X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=bwmon.py;h=579bae3ab241deb04b2609f0e65a16997200eb9e;hb=84a90a8a44a9c8265d95578411e3f0a8fc80becc;hp=28c734186289ea1aae0c5f8cd47edcb2006e2b44;hpb=e3eb033d2d04d2705d3222e6560725bb5945084a;p=nodemanager.git diff --git a/bwmon.py b/bwmon.py index 28c7341..579bae3 100644 --- a/bwmon.py +++ b/bwmon.py @@ -15,23 +15,33 @@ # Faiyaz Ahmed # Copyright (C) 2004-2006 The Trustees of Princeton University # -# $Id: bwmon.py,v 1.2 2007/02/07 18:12:02 faiyaza Exp $ +# $Id: bwmon.py,v 1.1.2.10 2007/06/25 17:47:10 faiyaza Exp $ # import os import sys import time import pickle -import database +import socket +import logger +import copy +import threading +import tools -#import socket -#import xmlrpclib import bwlimit +import database from sets import Set -# Utility functions -#from pl_mom import * +try: + sys.path.append("/etc/planetlab") + from plc_config import * +except: + logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found") + PLC_NAME = "PlanetLab" + PLC_SLICE_PREFIX = "pl" + PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org" + PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org" # Constants seconds_per_day = 24 * 60 * 60 @@ -39,15 +49,16 @@ bits_per_byte = 8 # Defaults debug = False -verbose = 0 +verbose = False datafile = "/var/lib/misc/bwmon.dat" #nm = None -# Burst to line rate (or node cap). Set by NM. -default_MaxRate = bwlimit.get_bwcap() -default_Maxi2Rate = bwlimit.bwmax +# Burst to line rate (or node cap). Set by NM. in KBit/s +default_MaxRate = int(bwlimit.get_bwcap() / 1000) +default_Maxi2Rate = int(bwlimit.bwmax / 1000) # Min rate 8 bits/s -default_MinRate = 8 +default_MinRate = 0 +default_Mini2Rate = 0 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G # 5.4 Gbyte per day max allowed transfered per recording period default_MaxKByte = 5662310 @@ -81,6 +92,78 @@ footer = \ %(date)s %(hostname)s bwcap %(slice)s """.lstrip() +def format_bytes(bytes, si = True): + """ + Formats bytes into a string + """ + if si: + kilo = 1000. + else: + # Officially, a kibibyte + kilo = 1024. + + if bytes >= (kilo * kilo * kilo): + return "%.1f GB" % (bytes / (kilo * kilo * kilo)) + elif bytes >= 1000000: + return "%.1f MB" % (bytes / (kilo * kilo)) + elif bytes >= 1000: + return "%.1f KB" % (bytes / kilo) + else: + return "%.0f bytes" % bytes + +def format_period(seconds): + """ + Formats a period in seconds into a string + """ + + if seconds == (24 * 60 * 60): + return "day" + elif seconds == (60 * 60): + return "hour" + elif seconds > (24 * 60 * 60): + return "%.1f days" % (seconds / 24. / 60. / 60.) + elif seconds > (60 * 60): + return "%.1f hours" % (seconds / 60. / 60.) + elif seconds > (60): + return "%.1f minutes" % (seconds / 60.) + else: + return "%.0f seconds" % seconds + +def slicemail(slice, subject, body): + sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w") + + # PLC has a separate list for pl_mom messages + if PLC_MAIL_SUPPORT_ADDRESS == "support@planet-lab.org": + to = ["pl-mom@planet-lab.org"] + else: + to = [PLC_MAIL_SUPPORT_ADDRESS] + + if slice is not None and slice != "root": + to.append(PLC_MAIL_SLICE_ADDRESS.replace("SLICE", slice)) + + header = {'from': "%s Support <%s>" % (PLC_NAME, PLC_MAIL_SUPPORT_ADDRESS), + 'to': ", ".join(to), + 'version': sys.version.split(" ")[0], + 'subject': subject} + + # Write headers + sendmail.write( +""" +Content-type: text/plain +From: %(from)s +Reply-To: %(from)s +To: %(to)s +X-Mailer: Python/%(version)s +Subject: %(subject)s + +""".lstrip() % header) + + # Write body + sendmail.write(body) + # Done + sendmail.close() + + class Slice: """ Stores the last recorded bandwidth parameters of a slice. @@ -100,62 +183,101 @@ class Slice: """ - def __init__(self, xid, name, maxrate, maxi2rate, bytes, i2bytes, data): + def __init__(self, xid, name, rspec): self.xid = xid self.name = name self.time = 0 self.bytes = 0 self.i2bytes = 0 self.MaxRate = default_MaxRate - self.MinRate = default_MinRate + self.MinRate = default_MinRate self.Maxi2Rate = default_Maxi2Rate + self.Mini2Rate = default_Mini2Rate self.MaxKByte = default_MaxKByte self.ThreshKByte = default_ThreshKByte self.Maxi2KByte = default_Maxi2KByte self.Threshi2KByte = default_Threshi2KByte self.Share = default_Share + self.Sharei2 = default_Share self.emailed = False - # Get real values where applicable - self.reset(maxrate, maxi2rate, bytes, i2bytes, data) + self.updateSliceAttributes(rspec) + bwlimit.set(xid = self.xid, + minrate = self.MinRate * 1000, + maxrate = self.MaxRate * 1000, + maxexemptrate = self.Maxi2Rate * 1000, + minexemptrate = self.Mini2Rate * 1000, + share = self.Share) def __repr__(self): return self.name - @database.synchronized - def updateSliceAttributes(self, data): - for sliver in data['slivers']: - if sliver['name'] == self.name: - for attribute in sliver['attributes']: - if attribute['name'] == 'net_min_rate': - self.MinRate = attribute['value'] - elif attribute['name'] == 'net_max_rate': - self.MaxRate = attribute['value'] - elif attribute['name'] == 'net_i2_min_rate': - self.Mini2Rate = attribute['value'] - elif attribute['name'] == 'net_i2_max_rate': - self.Maxi2Rate = attribute['value'] - elif attribute['name'] == 'net_max_kbyte': - self.MaxKbyte = attribute['value'] - elif attribute['name'] == 'net_i2_max_kbyte': - self.Maxi2KByte = attribute['value'] - elif attribute['name'] == 'net_thresh_kbyte': - self.ThreshKByte = attribute['value'] - elif attribute['name'] == 'net_i2_thresh_kbyte': - self.Threshi2KByte = attribute['value'] - elif attribute['name'] == 'net_share': - self.Share = attribute['value'] - elif attribute['name'] == 'net_i2_share': - self.Sharei2 = attribute['value'] - - def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, data): + def updateSliceAttributes(self, rspec): + # Get attributes + + # Sanity check plus policy decision for MinRate: + # Minrate cant be greater than 25% of MaxRate or NodeCap. + MinRate = int(rspec.get("net_min_rate", default_MinRate)) + if MinRate > int(.25 * default_MaxRate): + MinRate = int(.25 * default_MaxRate) + if MinRate != self.MinRate: + self.MinRate = MinRate + logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate)) + + MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000)) + if MaxRate != self.MaxRate: + self.MaxRate = MaxRate + logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate)) + + Mini2Rate = int(rspec.get('net_i2_min_rate', default_Mini2Rate)) + if Mini2Rate != self.Mini2Rate: + self.Mini2Rate = Mini2Rate + logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate)) + + Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000)) + if Maxi2Rate != self.Maxi2Rate: + self.Maxi2Rate = Maxi2Rate + logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate)) + + MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte)) + if MaxKByte != self.MaxKByte: + self.MaxKByte = MaxKByte + logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte)) + + Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte)) + if Maxi2KByte != self.Maxi2KByte: + self.Maxi2KByte = Maxi2KByte + logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte)) + + ThreshKByte = int(rspec.get('net_thresh_kbyte', default_ThreshKByte)) + if ThreshKByte != self.ThreshKByte: + self.ThreshKByte = ThreshKByte + logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte)) + + Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', default_Threshi2KByte)) + if Threshi2KByte != self.Threshi2KByte: + self.Threshi2KByte = Threshi2KByte + logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte)) + + Share = int(rspec.get('net_share', default_Share)) + if Share != self.Share: + self.Share = Share + logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share)) + + Sharei2 = int(rspec.get('net_i2_share', default_Share)) + if Sharei2 != self.Sharei2: + self.Sharei2 = Sharei2 + logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share)) + + + def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec): """ Begin a new recording period. Remove caps by restoring limits to their default values. """ # Query Node Manager for max rate overrides - self.updateSliceAttributes(data) + self.updateSliceAttributes(rspec) # Reset baseline time self.time = time.time() @@ -166,28 +288,29 @@ class Slice: # Reset email self.emailed = False - + maxrate = self.MaxRate * 1000 + maxi2rate = self.Maxi2Rate * 1000 # Reset rates. if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate): - print "%s reset to %s/%s" % \ + logger.log("bwmon: %s reset to %s/%s" % \ (self.name, - bwlimit.format_tc_rate(self.MaxRate), - bwlimit.format_tc_rate(self.Maxi2Rate)) + bwlimit.format_tc_rate(maxrate), + bwlimit.format_tc_rate(maxi2rate))) bwlimit.set(xid = self.xid, - minrate = self.MinRate, - maxrate = self.MaxRate, - maxexemptrate = self.Maxi2Rate, - minexemptrate = self.Mini2Rate, + minrate = self.MinRate * 1000, + maxrate = self.MaxRate * 1000, + maxexemptrate = self.Maxi2Rate * 1000, + minexemptrate = self.Mini2Rate * 1000, share = self.Share) - def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, data): + def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec): """ Update byte counts and check if byte limits have been exceeded. """ # Query Node Manager for max rate overrides - self.updateSliceAttributes(data) + self.updateSliceAttributes(rspec) # Prepare message parameters from the template message = "" @@ -197,58 +320,61 @@ class Slice: 'date': time.asctime(time.gmtime()) + " GMT", 'period': format_period(period)} - if usedi2bytes >= (self.usedbytes + self.ByteThresh): + if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)): + if verbose: + logger.log("bwmon: %s over thresh %s" \ + % (self.name, format_bytes(self.ThreshKByte * 1024))) + sum = self.bytes + (self.ThreshKByte * 1024) maxbyte = self.MaxKByte * 1024 - bytesused = bytes - self.bytes + bytesused = usedbytes - self.bytes timeused = int(time.time() - self.time) new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused)) - if new_maxrate < self.MinRate: - new_maxrate = self.MinRate + if new_maxrate < (self.MinRate * 1000): + new_maxrate = self.MinRate * 1000 else: - new_maxrate = self.MaxRate + new_maxrate = self.MaxRate * 1000 # Format template parameters for low bandwidth message params['class'] = "low bandwidth" params['bytes'] = format_bytes(usedbytes - self.bytes) - params['maxrate'] = bwlimit.format_tc_rate(runningmaxrate) - params['limit'] = format_bytes(self.MaxKByte) + params['limit'] = format_bytes(self.MaxKByte * 1024) + params['thresh'] = format_bytes(self.ThreshKByte * 1024) params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate) if verbose: - print "%(slice)s %(class)s " \ - "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % \ - params + logger.log("bwmon: %(slice)s %(class)s " \ + "%(bytes)s of %(limit)s max %(thresh)s thresh (%(new_maxrate)s/s maxrate)" % \ + params) # Cap low bandwidth burst rate if new_maxrate != runningmaxrate: message += template % params - print "%(slice)s %(class)s capped at %(new_maxrate)s/s " % params + logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params) - if usedi2bytes >= (self.i2bytes + self.Threshi2KBytes): + if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)): maxi2byte = self.Maxi2KByte * 1024 - i2bytesused = i2bytes - self.i2bytes + i2bytesused = usedi2bytes - self.i2bytes timeused = int(time.time() - self.time) new_maxi2rate = int(((maxi2byte - i2bytesused) * 8)/(period - timeused)) - if new_maxi2rate < self.Mini2Rate: - new_maxi2rate = self.Mini2Rate + if new_maxi2rate < (self.Mini2Rate * 1000): + new_maxi2rate = self.Mini2Rate * 1000 else: - new_maxi2rate = self.Maxi2Rate + new_maxi2rate = self.Maxi2Rate * 1000 # Format template parameters for high bandwidth message params['class'] = "high bandwidth" params['bytes'] = format_bytes(usedi2bytes - self.i2bytes) - params['maxrate'] = bwlimit.format_tc_rate(runningmaxi2rate) - params['limit'] = format_bytes(self.Maxi2KByte) + params['limit'] = format_bytes(self.Maxi2KByte * 1024) params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate) if verbose: - print "%(slice)s %(class)s " \ - "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % params + logger.log("bwmon: %(slice)s %(class)s " \ + "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % params) # Cap high bandwidth burst rate if new_maxi2rate != runningmaxi2rate: message += template % params - print "%(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params + logger.log("bwmon: %(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params) # Apply parameters if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate: @@ -258,107 +384,205 @@ class Slice: if message and self.emailed == False: subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params if debug: - print subject - print message + (footer % params) + logger.log("bwmon: "+ subject) + logger.log("bwmon: "+ message + (footer % params)) else: self.emailed = True slicemail(self.name, subject, message + (footer % params)) -def GetSlivers(data): +def gethtbs(root_xid, default_xid): + """ + Return dict {xid: {*rates}} of running htbs as reported by tc that have names. + Turn off HTBs without names. + """ + livehtbs = {} + for params in bwlimit.get(): + (xid, share, + minrate, maxrate, + minexemptrate, maxexemptrate, + usedbytes, usedi2bytes) = params + + name = bwlimit.get_slice(xid) + + + + if (name is None) \ + and (xid != root_xid) \ + and (xid != default_xid): + # Orphaned (not associated with a slice) class + name = "%d?" % xid + logger.log("bwmon: Found orphaned HTB %s. Removing." %name) + bwlimit.off(xid) + + livehtbs[xid] = {'share': share, + 'minrate': minrate, + 'maxrate': maxrate, + 'maxexemptrate': maxexemptrate, + 'minexemptrate': minexemptrate, + 'usedbytes': usedbytes, + 'name': name, + 'usedi2bytes': usedi2bytes} + + return livehtbs + +def sync(nmdbcopy): + """ + Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit. + """ # Defaults global datafile, \ period, \ - default_MaxRate, \ - default_Maxi2Rate, \ - default_MinRate, \ - default_MaxKByte,\ - default_ThreshKByte,\ + default_MaxRate, \ + default_Maxi2Rate, \ + default_MinRate, \ + default_MaxKByte,\ + default_ThreshKByte,\ default_Maxi2KByte,\ default_Threshi2KByte,\ - default_Share + default_Share,\ + verbose # All slices names = [] + # Incase the limits have changed. + default_MaxRate = int(bwlimit.get_bwcap() / 1000) + default_Maxi2Rate = int(bwlimit.bwmax / 1000) + + # Incase default isn't set yet. + if default_MaxRate == -1: + default_MaxRate = 1000000 try: f = open(datafile, "r+") - if verbose: - print "Loading %s" % datafile + logger.log("bwmon: Loading %s" % datafile) (version, slices) = pickle.load(f) f.close() # Check version of data file - if version != "$Id: bwmon.py,v 1.2 2007/02/07 18:12:02 faiyaza Exp $": - print "Not using old version '%s' data file %s" % (version, datafile) + if version != "$Id: bwmon.py,v 1.1.2.10 2007/06/25 17:47:10 faiyaza Exp $": + logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile)) raise Exception except Exception: - version = "$Id: bwmon.py,v 1.2 2007/02/07 18:12:02 faiyaza Exp $" + version = "$Id: bwmon.py,v 1.1.2.10 2007/06/25 17:47:10 faiyaza Exp $" slices = {} - # Get special slice IDs + # Get/set special slice IDs root_xid = bwlimit.get_xid("root") default_xid = bwlimit.get_xid("default") - live = [] - # Get actuall running values from tc. - for params in bwlimit.get(): - (xid, share, - minrate, maxrate, - minexemptrate, maxexemptrate, - bytes, i2bytes) = params - live.append(xid) + # Since root is required for sanity, its not in the API/plc database, so pass {} + # to use defaults. + if root_xid not in slices.keys(): + slices[root_xid] = Slice(root_xid, "root", {}) + slices[root_xid].reset(0, 0, 0, 0, {}) + + # Used by bwlimit. pass {} since there is no rspec (like above). + if default_xid not in slices.keys(): + slices[default_xid] = Slice(default_xid, "default", {}) + slices[default_xid].reset(0, 0, 0, 0, {}) + + live = {} + # Get running slivers that should be on this node (from plc). {xid: name} + # db keys on name, bwmon keys on xid. db doesnt have xid either. + for plcSliver in nmdbcopy.keys(): + live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver] + + logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__()) + logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__()) + + # Get actual running values from tc. + # Update slice totals and bandwidth. {xid: {values}} + livehtbs = gethtbs(root_xid, default_xid) + logger.log("bwmon: Found %s running HTBs" % livehtbs.keys().__len__()) + + # Get new slices. + # live.xids - runing(slices).xids = new.xids + #newslicesxids = Set(live.keys()) - Set(slices.keys()) + newslicesxids = Set(live.keys()) - Set(livehtbs.keys()) + logger.log("bwmon: Found %s new slices" % newslicesxids.__len__()) + + # Incase we upgraded nm and need to keep track of already running htbs + norecxids = Set(livehtbs.keys()) - Set(slices.keys()) + logger.log("bwmon: Found %s slices that have htbs but not in dat." % norecxids.__len__()) + newslicesxids.update(norecxids) + + # Setup new slices + for newslice in newslicesxids: + # Delegated slices dont have xids (which are uids) since they haven't been + # instantiated yet. + if newslice != None and live[newslice].has_key('_rspec') == True: + logger.log("bwmon: New Slice %s" % live[newslice]['name']) + # _rspec is the computed rspec: NM retrieved data from PLC, computed loans + # and made a dict of computed values. + slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec']) + slices[newslice].reset(0, 0, 0, 0, live[newslice]['_rspec']) + else: + logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslice]['name']) - # Ignore root and default buckets + # Delete dead slices. + # First delete dead slices that exist in the pickle file, but + # aren't instantiated by PLC. + dead = Set(slices.keys()) - Set(live.keys()) + logger.log("bwmon: Found %s dead slices" % (dead.__len__() - 2)) + for xid in dead: if xid == root_xid or xid == default_xid: continue + logger.log("bwmon: removing dead slice %s " % xid) + if slices.has_key(xid): del slices[xid] + if livehtbs.has_key(xid): bwlimit.off(xid) - name = bwlimit.get_slice(xid) - if name is None: - # Orphaned (not associated with a slice) class - name = "%d?" % xid - bwlimit.off(xid) + # Get actual running values from tc since we've added and removed buckets. + # Update slice totals and bandwidth. {xid: {values}} + livehtbs = gethtbs(root_xid, default_xid) + logger.log("bwmon: now %s running HTBs" % livehtbs.keys().__len__()) + for (xid, slice) in slices.iteritems(): # Monitor only the specified slices + if xid == root_xid or xid == default_xid: continue if names and name not in names: continue - #slices is populated from the pickle file - #xid is populated from bwlimit (read from /etc/passwd) - if slices.has_key(xid): - slice = slices[xid] - if time.time() >= (slice.time + period) or \ - bytes < slice.bytes or i2bytes < slice.i2bytes: - # Reset to defaults every 24 hours or if it appears - # that the byte counters have overflowed (or, more - # likely, the node was restarted or the HTB buckets - # were re-initialized). - slice.reset(maxrate, maxexemptrate, bytes, i2bytes, data) - else: - # Update byte counts - slice.update(maxrate, maxexemptrate, bytes, i2bytes, data) + + if (time.time() >= (slice.time + period)) or \ + (livehtbs[xid]['usedbytes'] < slice.bytes) or \ + (livehtbs[xid]['usedi2bytes'] < slice.i2bytes): + # Reset to defaults every 24 hours or if it appears + # that the byte counters have overflowed (or, more + # likely, the node was restarted or the HTB buckets + # were re-initialized). + slice.reset(livehtbs[xid]['maxrate'], \ + livehtbs[xid]['maxexemptrate'], \ + livehtbs[xid]['usedbytes'], \ + livehtbs[xid]['usedi2bytes'], \ + live[xid]['_rspec']) else: - # New slice, initialize state - slice = slices[xid] = Slice(xid, name, maxrate, maxexemptrate, bytes, i2bytes, data) - - # Delete dead slices - dead = Set(slices.keys()) - Set(live) - for xid in dead: - del slices[xid] - bwlimit.off(xid) - - print "Saving %s" % datafile + if debug: logger.log("bwmon: Updating slice %s" % slice.name) + # Update byte counts + slice.update(livehtbs[xid]['maxrate'], \ + livehtbs[xid]['maxexemptrate'], \ + livehtbs[xid]['usedbytes'], \ + livehtbs[xid]['usedi2bytes'], \ + live[xid]['_rspec']) + + logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile)) f = open(datafile, "w") pickle.dump((version, slices), f) f.close() - -#def GetSlivers(data): -# for sliver in data['slivers']: -# if sliver.has_key('attributes'): -# print sliver -# for attribute in sliver['attributes']: -# if attribute['name'] == "KByteThresh": print attribute['value'] - -def start(options, config): +lock = threading.Event() +def run(): + """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting.""" + if debug: logger.log("bwmon: Thread started") + while True: + lock.wait() + if debug: logger.log("bwmon: Event received. Running.") + database.db_lock.acquire() + nmdbcopy = copy.deepcopy(database.db) + database.db_lock.release() + try: sync(nmdbcopy) + except: logger.log_exc() + lock.clear() + +def start(*args): + tools.as_daemon_thread(run) + +def GetSlivers(*args): pass - -if __name__ == '__main__': - main()