X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=bwmon.py;h=ce33e19bf8fafb94ac21961a84babbc0a229438b;hb=refs%2Fheads%2F1.7;hp=0e9f78e31bbca5b5744926095d33e8f54281467b;hpb=4c5cb74ef18fde013d440f6d2243aa71978c7b3d;p=nodemanager.git diff --git a/bwmon.py b/bwmon.py index 0e9f78e..ce33e19 100644 --- a/bwmon.py +++ b/bwmon.py @@ -33,16 +33,20 @@ import database from sets import Set # Defaults -debug = False -verbose = False +# Set DEBUG to True if you don't want to send emails +DEBUG = False +# Set ENABLE to False to setup buckets, but not limit. +ENABLE = True + datafile = "/var/lib/misc/bwmon.dat" try: sys.path.append("/etc/planetlab") from plc_config import * except: - logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found") - logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.") + DEBUG = True + logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2) + logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1) # Constants seconds_per_day = 24 * 60 * 60 @@ -51,9 +55,6 @@ bits_per_byte = 8 # Burst to line rate (or node cap). Set by NM. in KBit/s default_MaxRate = int(bwlimit.get_bwcap() / 1000) default_Maxi2Rate = int(bwlimit.bwmax / 1000) -# Min rate 8 bits/s -default_MinRate = 0 -default_Mini2Rate = 0 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G # 5.4 Gbyte per day max allowed transfered per recording period default_MaxKByte = 5662310 @@ -186,13 +187,13 @@ class Slice: self.bytes = 0 self.i2bytes = 0 self.MaxRate = default_MaxRate - self.MinRate = default_MinRate + self.MinRate = bwlimit.bwmin / 1000 self.Maxi2Rate = default_Maxi2Rate - self.Mini2Rate = default_Mini2Rate + self.Mini2Rate = bwlimit.bwmin / 1000 self.MaxKByte = default_MaxKByte - self.ThreshKByte = (.8 * self.MaxKByte) + self.ThreshKByte = int(.8 * self.MaxKByte) self.Maxi2KByte = default_Maxi2KByte - self.Threshi2KByte = (.8 * self.Maxi2KByte) + self.Threshi2KByte = int(.8 * self.Maxi2KByte) self.Share = default_Share self.Sharei2 = default_Share self.emailed = False @@ -217,24 +218,24 @@ class Slice: # Sanity check plus policy decision for MinRate: # Minrate cant be greater than 25% of MaxRate or NodeCap. - MinRate = int(rspec.get("net_min_rate", default_MinRate)) + MinRate = int(rspec.get("net_min_rate", bwlimit.bwmin / 1000)) if MinRate > int(.25 * default_MaxRate): MinRate = int(.25 * default_MaxRate) if MinRate != self.MinRate: self.MinRate = MinRate logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate)) - MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000)) + MaxRate = int(rspec.get('net_max_rate', default_MaxRate)) if MaxRate != self.MaxRate: self.MaxRate = MaxRate logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate)) - Mini2Rate = int(rspec.get('net_i2_min_rate', default_Mini2Rate)) + Mini2Rate = int(rspec.get('net_i2_min_rate', bwlimit.bwmin / 1000)) if Mini2Rate != self.Mini2Rate: self.Mini2Rate = Mini2Rate logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate)) - Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000)) + Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate)) if Maxi2Rate != self.Maxi2Rate: self.Maxi2Rate = Maxi2Rate logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate)) @@ -275,7 +276,6 @@ class Slice: Begin a new recording period. Remove caps by restoring limits to their default values. """ - # Query Node Manager for max rate overrides self.updateSliceAttributes(rspec) @@ -297,7 +297,7 @@ class Slice: logger.log("bwmon: %s reset to %s/%s" % \ (self.name, bwlimit.format_tc_rate(maxrate), - bwlimit.format_tc_rate(maxi2rate))) + bwlimit.format_tc_rate(maxi2rate)), 1) bwlimit.set(xid = self.xid, minrate = self.MinRate * 1000, maxrate = self.MaxRate * 1000, @@ -317,7 +317,7 @@ class Slice: 'date': time.asctime(time.gmtime()) + " GMT", 'period': format_period(period)} - if new_maxrate != self.MaxRate: + if new_maxrate != (self.MaxRate * 1000): # Format template parameters for low bandwidth message params['class'] = "low bandwidth" params['bytes'] = format_bytes(usedbytes - self.bytes) @@ -328,85 +328,62 @@ class Slice: message += template % params logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params) - if new_maxexemptrate != self.Maxi2Rate: + if new_maxexemptrate != (self.Maxi2Rate * 1000): # Format template parameters for high bandwidth message params['class'] = "high bandwidth" params['bytes'] = format_bytes(usedi2bytes - self.i2bytes) params['limit'] = format_bytes(self.Maxi2KByte * 1024) - params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate) + params['new_maxrate'] = bwlimit.format_tc_rate(new_maxexemptrate) message += template % params logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params) # Notify slice - if message and self.emailed == False: + if self.emailed == False: subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params - if debug: + if DEBUG: logger.log("bwmon: "+ subject) logger.log("bwmon: "+ message + (footer % params)) else: self.emailed = True + logger.log("bwmon: Emailing %s" % self.name) slicemail(self.name, subject, message + (footer % params)) def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec): """ Update byte counts and check if byte thresholds have been - exceeded. If exceeded, cap to remaining bytes in limit over remaining in period. + exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period. Recalculate every time module runs. """ - + + # copy self.Min* and self.*share values for comparison later. + runningMinRate = self.MinRate + runningMini2Rate = self.Mini2Rate + runningshare = self.Share + runningsharei2 = self.Sharei2 + # Query Node Manager for max rate overrides self.updateSliceAttributes(rspec) - # Check shares for Sirius loans. - if runningshare != self.share: - logger.log("bwmon: Updating share to %s" % self.share) - bwlimit.set(xid = self.xid, - minrate = self.MinRate * 1000, - maxrate = self.MaxRate * 1000, - maxexemptrate = self.Maxi2Rate * 1000, - minexemptrate = self.Mini2Rate * 1000, - share = self.Share) - - # Prepare message parameters from the template - #message = "" - #params = {'slice': self.name, 'hostname': socket.gethostname(), - # 'since': time.asctime(time.gmtime(self.time)) + " GMT", - # 'until': time.asctime(time.gmtime(self.time + period)) + " GMT", - # 'date': time.asctime(time.gmtime()) + " GMT", - # 'period': format_period(period)} - # Check limits. if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)): sum = self.bytes + (self.ThreshKByte * 1024) maxbyte = self.MaxKByte * 1024 bytesused = usedbytes - self.bytes timeused = int(time.time() - self.time) - # Calcuate new rate. + # Calcuate new rate. in bit/s new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused)) # Never go under MinRate if new_maxrate < (self.MinRate * 1000): new_maxrate = self.MinRate * 1000 # State information. I'm capped. - self.capped = True + self.capped += True else: # Sanity Check new_maxrate = self.MaxRate * 1000 - self.capped = False - - ## Format template parameters for low bandwidth message - #params['class'] = "low bandwidth" - #params['bytes'] = format_bytes(usedbytes - self.bytes) - #params['limit'] = format_bytes(self.MaxKByte * 1024) - #params['thresh'] = format_bytes(self.ThreshKByte * 1024) - #params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate) - - # Cap low bandwidth burst rate - #if new_maxrate != runningmaxrate: - # message += template % params - # logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params) - + self.capped += False + if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)): maxi2byte = self.Maxi2KByte * 1024 i2bytesused = usedi2bytes - self.i2bytes @@ -417,37 +394,31 @@ class Slice: if new_maxi2rate < (self.Mini2Rate * 1000): new_maxi2rate = self.Mini2Rate * 1000 # State information. I'm capped. - self.capped = True + self.capped += True else: # Sanity new_maxi2rate = self.Maxi2Rate * 1000 - self.capped = False - - # Format template parameters for high bandwidth message - #params['class'] = "high bandwidth" - #params['bytes'] = format_bytes(usedi2bytes - self.i2bytes) - #params['limit'] = format_bytes(self.Maxi2KByte * 1024) - #params['new_maxexemptrate'] = bwlimit.format_tc_rate(new_maxi2rate) - - # Cap high bandwidth burst rate - #if new_maxi2rate != runningmaxi2rate: - # message += template % params - # logger.log("bwmon: %(slice)s %(class)s capped at %(new_maxexemptrate)s/s" % params) - - # Apply parameters - if new_maxrate != runningmaxrate or new_maxi2rate != runningmaxi2rate: - bwlimit.set(xid = self.xid, maxrate = new_maxrate, maxexemptrate = new_maxi2rate) + self.capped += False + + # Check running values against newly calculated values so as not to run tc + # unnecessarily + if (runningmaxrate != new_maxrate) or \ + (runningMinRate != self.MinRate) or \ + (runningmaxi2rate != new_maxi2rate) or \ + (runningMini2Rate != self.Mini2Rate) or \ + (runningshare != self.Share): + # Apply parameters + bwlimit.set(xid = self.xid, + minrate = self.MinRate * 1000, + maxrate = new_maxrate, + minexemptrate = self.Mini2Rate * 1000, + maxexemptrate = new_maxi2rate, + share = self.Share) # Notify slice - if self.capped == True and self.emailed == False: - self.notify(newmaxrate, newmaxexemptrate, usedbytes, usedi2bytes) - # subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params - # if debug: - # logger.log("bwmon: "+ subject) - # logger.log("bwmon: "+ message + (footer % params)) - # else: - # self.emailed = True - # slicemail(self.name, subject, message + (footer % params)) + if self.capped == True: + self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes) + def gethtbs(root_xid, default_xid): """ @@ -468,7 +439,7 @@ def gethtbs(root_xid, default_xid): and (xid != default_xid): # Orphaned (not associated with a slice) class name = "%d?" % xid - logger.log("bwmon: Found orphaned HTB %s. Removing." %name) + logger.log("bwmon: Found orphaned HTB %s. Removing." %name, 1) bwlimit.off(xid) livehtbs[xid] = {'share': share, @@ -491,13 +462,9 @@ def sync(nmdbcopy): period, \ default_MaxRate, \ default_Maxi2Rate, \ - default_MinRate, \ default_MaxKByte,\ - default_ThreshKByte,\ default_Maxi2KByte,\ - default_Threshi2KByte,\ - default_Share,\ - verbose + default_Share # All slices names = [] @@ -507,11 +474,11 @@ def sync(nmdbcopy): # Incase default isn't set yet. if default_MaxRate == -1: - default_MaxRate = 1000000 + default_MaxRate = 10000000 try: f = open(datafile, "r+") - logger.log("bwmon: Loading %s" % datafile) + logger.log("bwmon: Loading %s" % datafile, 2) (version, slices, deaddb) = pickle.load(f) f.close() # Check version of data file @@ -544,25 +511,28 @@ def sync(nmdbcopy): for plcSliver in nmdbcopy.keys(): live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver] - logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__()) - logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__()) + logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__(), 2) + logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__(), 2) # Get actual running values from tc. # Update slice totals and bandwidth. {xid: {values}} kernelhtbs = gethtbs(root_xid, default_xid) - logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__()) + logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__(), 2) # The dat file has HTBs for slices, but the HTBs aren't running nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys()) - logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__() ) + logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__(), 2) # Reset tc counts. for nohtbslice in nohtbslices: if live.has_key(nohtbslice): slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] ) + else: + logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice) + del slices[nohtbslice] # The dat file doesnt have HTB for the slice but kern has HTB slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys()) - logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__() ) + logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__(), 2) for slicenodat in slicesnodat: # But slice is running if live.has_key(slicenodat): @@ -571,12 +541,11 @@ def sync(nmdbcopy): slices[slicenodat] = Slice(slicenodat, live[slicenodat]['name'], live[slicenodat]['_rspec']) - else: bwlimit.off(slicenodat) # Abandoned. it doesnt exist at PLC or the dat # Get new slices. # Slices in GetSlivers but not running HTBs newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys()) - logger.log("bwmon: Found %s new slices" % newslicesxids.__len__()) + logger.log("bwmon: Found %s new slices" % newslicesxids.__len__(), 2) # Setup new slices for newslice in newslicesxids: @@ -610,39 +579,42 @@ def sync(nmdbcopy): deadslice['htb']['share'], live[newslice]['_rspec']) # Since the slice has been reinitialed, remove from dead database. - del deaddb[deadslice] + del deaddb[deadslice['slice'].name] else: - logger.log("bwmon Slice %s doesn't have xid. Must be delegated."\ - "Skipping." % live[newslice]['name']) + logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name']) # Move dead slices that exist in the pickle file, but # aren't instantiated by PLC into the dead dict until # recording period is over. This is to avoid the case where a slice is dynamically created # and destroyed then recreated to get around byte limits. deadxids = Set(slices.keys()) - Set(live.keys()) - logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2)) + logger.log("bwmon: Found %s dead slices" % (deadxids.__len__() - 2), 2) for deadxid in deadxids: if deadxid == root_xid or deadxid == default_xid: continue - logger.log("bwmon: removing dead slice %s " % deadxid) - if slices.has_key(deadxid): + logger.log("bwmon: removing dead slice %s " % deadxid) + if slices.has_key(deadxid) and kernelhtbs.has_key(deadxid): # add slice (by name) to deaddb + logger.log("bwmon: Saving bandwidth totals for %s." % slices[deadxid].name) deaddb[slices[deadxid].name] = {'slice': slices[deadxid], 'htb': kernelhtbs[deadxid]} del slices[deadxid] if kernelhtbs.has_key(deadxid): + logger.log("bwmon: Removing HTB for %s." % deadxid, 2) bwlimit.off(deadxid) - - # Clean up deaddb - for (deadslice, deadhtb) in deaddb.iteritems(): - if (time.time() >= (deadslice.time() + period)): - logger.log("bwmon: Removing dead slice %s from dat." % deadslice.name) - del deaddb[deadslice.name] + + # Clean up deaddb + for deadslice in deaddb.keys(): + if (time.time() >= (deaddb[deadslice]['slice'].time + period)): + logger.log("bwmon: Removing dead slice %s from dat." \ + % deaddb[deadslice]['slice'].name) + del deaddb[deadslice] # Get actual running values from tc since we've added and removed buckets. # Update slice totals and bandwidth. {xid: {values}} kernelhtbs = gethtbs(root_xid, default_xid) - logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__()) + logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2) + # Update all byte limites on all slices for (xid, slice) in slices.iteritems(): # Monitor only the specified slices if xid == root_xid or xid == default_xid: continue @@ -661,8 +633,8 @@ def sync(nmdbcopy): kernelhtbs[xid]['usedbytes'], \ kernelhtbs[xid]['usedi2bytes'], \ live[xid]['_rspec']) - else: - if debug: logger.log("bwmon: Updating slice %s" % slice.name) + elif ENABLE: + logger.log("bwmon: Updating slice %s" % slice.name, 2) # Update byte counts slice.update(kernelhtbs[xid]['maxrate'], \ kernelhtbs[xid]['maxexemptrate'], \ @@ -671,22 +643,57 @@ def sync(nmdbcopy): kernelhtbs[xid]['share'], live[xid]['_rspec']) - logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile)) + logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2) f = open(datafile, "w") pickle.dump((version, slices, deaddb), f) f.close() + +def getDefaults(nmdbcopy): + ''' + Get defaults from default slice's slice attributes. + ''' + status = True + # default slice + dfltslice = nmdbcopy.get(PLC_SLICE_PREFIX+"_default") + if dfltslice: + if dfltslice['rspec']['net_max_rate'] == -1: + allOff() + status = False + return status + + +def allOff(): + """ + Turn off all slice HTBs + """ + # Get/set special slice IDs + root_xid = bwlimit.get_xid("root") + default_xid = bwlimit.get_xid("default") + kernelhtbs = gethtbs(root_xid, default_xid) + if len(kernelhtbs): + logger.log("bwlimit: Disabling all running HTBs.") + for htb in kernelhtbs.keys(): bwlimit.off(htb) + + lock = threading.Event() def run(): - """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting.""" - if debug: logger.log("bwmon: Thread started") + """ + When run as a thread, wait for event, lock db, deep copy it, release it, + run bwmon.GetSlivers(), then go back to waiting. + """ + logger.log("bwmon: Thread started", 2) while True: lock.wait() - if debug: logger.log("bwmon: Event received. Running.") + logger.log("bwmon: Event received. Running.", 2) database.db_lock.acquire() nmdbcopy = copy.deepcopy(database.db) database.db_lock.release() - try: sync(nmdbcopy) + try: + if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev eth0")) > 0: + # class show to check if net:InitNodeLimit:bwlimit.init has run. + sync(nmdbcopy) + else: logger.log("bwmon: BW limits DISABLED.") except: logger.log_exc() lock.clear()