X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=bwmon.py;h=7baee4c220077f791f5b27997c85ab806a952044;hb=b4a5dd8ec367eca5f04b7cf4b8daa31458e6da6c;hp=26a14f8a19bcab2d829ad9a33930995bf7b37dcc;hpb=0c9f6590a6f97823553cc557e25cd6a381cb5adb;p=nodemanager.git diff --git a/bwmon.py b/bwmon.py index 26a14f8..7baee4c 100644 --- a/bwmon.py +++ b/bwmon.py @@ -33,14 +33,18 @@ import database from sets import Set # Defaults -debug = False -verbose = False +# Set DEBUG to True if you don't want to send emails +DEBUG = False +# Set ENABLE to False to setup buckets, but not limit. +ENABLE = True + datafile = "/var/lib/misc/bwmon.dat" try: sys.path.append("/etc/planetlab") from plc_config import * except: + DEBUG = True logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2) logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1) @@ -221,7 +225,7 @@ class Slice: self.MinRate = MinRate logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate)) - MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000)) + MaxRate = int(rspec.get('net_max_rate', default_MaxRate)) if MaxRate != self.MaxRate: self.MaxRate = MaxRate logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate)) @@ -231,7 +235,7 @@ class Slice: self.Mini2Rate = Mini2Rate logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate)) - Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000)) + Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate)) if Maxi2Rate != self.Maxi2Rate: self.Maxi2Rate = Maxi2Rate logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate)) @@ -267,12 +271,14 @@ class Slice: logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share)) - def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec): + def reset(self, runningrates, rspec): """ Begin a new recording period. Remove caps by restoring limits to their default values. """ - + # Cache share for later comparison + self.Share = runningrates.get('share', 1) + # Query Node Manager for max rate overrides self.updateSliceAttributes(rspec) @@ -280,8 +286,8 @@ class Slice: self.time = time.time() # Reset baseline byte coutns - self.bytes = usedbytes - self.i2bytes = usedi2bytes + self.bytes = runningrates.get('usedbytes', 0) + self.i2bytes = runningrates.get('usedi2bytes', 0) # Reset email self.emailed = False @@ -289,8 +295,15 @@ class Slice: self.capped = False # Reset rates. maxrate = self.MaxRate * 1000 + minrate = self.MinRate * 1000 maxi2rate = self.Maxi2Rate * 1000 - if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate): + mini2rate = self.Mini2Rate * 1000 + + if (maxrate != runningrates.get('maxrate', 0)) or \ + (minrate != runningrates.get('maxrate', 0)) or \ + (maxi2rate != runningrates.get('maxexemptrate', 0)) or \ + (mini2rate != runningrates.get('minexemptrate', 0)) or \ + (self.Share != runningrates.get('share', 0)): logger.log("bwmon: %s reset to %s/%s" % \ (self.name, bwlimit.format_tc_rate(maxrate), @@ -338,7 +351,7 @@ class Slice: # Notify slice if self.emailed == False: subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params - if debug: + if DEBUG: logger.log("bwmon: "+ subject) logger.log("bwmon: "+ message + (footer % params)) else: @@ -347,23 +360,28 @@ class Slice: slicemail(self.name, subject, message + (footer % params)) - def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec): + def update(self, runningrates, rspec): """ Update byte counts and check if byte thresholds have been exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period. Recalculate every time module runs. """ - + # cache share for later comparison + runningrates['share'] = self.Share + # Query Node Manager for max rate overrides self.updateSliceAttributes(rspec) + usedbytes = runningrates['usedbytes'] + usedi2bytes = runningrates['usedi2bytes'] + # Check limits. if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)): sum = self.bytes + (self.ThreshKByte * 1024) maxbyte = self.MaxKByte * 1024 bytesused = usedbytes - self.bytes timeused = int(time.time() - self.time) - # Calcuate new rate. + # Calcuate new rate. in bit/s new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused)) # Never go under MinRate if new_maxrate < (self.MinRate * 1000): @@ -391,8 +409,15 @@ class Slice: new_maxi2rate = self.Maxi2Rate * 1000 self.capped += False - # Apply parameters - bwlimit.set(xid = self.xid, + # Check running values against newly calculated values so as not to run tc + # unnecessarily + if (runningrates['maxrate'] != new_maxrate) or \ + (runningrates['minrate'] != self.MinRate * 1000) or \ + (runningrates['maxexemptrate'] != new_maxi2rate) or \ + (runningrates['minexemptrate'] != self.Mini2Rate * 1000) or \ + (runningrates['share'] != self.Share): + # Apply parameters + bwlimit.set(xid = self.xid, minrate = self.MinRate * 1000, maxrate = new_maxrate, minexemptrate = self.Mini2Rate * 1000, @@ -448,8 +473,7 @@ def sync(nmdbcopy): default_Maxi2Rate, \ default_MaxKByte,\ default_Maxi2KByte,\ - default_Share,\ - verbose + default_Share # All slices names = [] @@ -483,12 +507,12 @@ def sync(nmdbcopy): # to use defaults. if root_xid not in slices.keys(): slices[root_xid] = Slice(root_xid, "root", {}) - slices[root_xid].reset(0, 0, 0, 0, {}) + slices[root_xid].reset({}, {}) # Used by bwlimit. pass {} since there is no rspec (like above). if default_xid not in slices.keys(): slices[default_xid] = Slice(default_xid, "default", {}) - slices[default_xid].reset(0, 0, 0, 0, {}) + slices[default_xid].reset({}, {}) live = {} # Get running slivers that should be on this node (from plc). {xid: name} @@ -510,7 +534,7 @@ def sync(nmdbcopy): # Reset tc counts. for nohtbslice in nohtbslices: if live.has_key(nohtbslice): - slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] ) + slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] ) else: logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice) del slices[nohtbslice] @@ -543,7 +567,7 @@ def sync(nmdbcopy): # _rspec is the computed rspec: NM retrieved data from PLC, computed loans # and made a dict of computed values. slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec']) - slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] ) + slices[newslice].reset( {}, live[newslice]['_rspec'] ) # Double check time for dead slice in deaddb is within 24hr recording period. elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)): deadslice = deaddb[live[newslice]['name']] @@ -551,20 +575,18 @@ def sync(nmdbcopy): slices[newslice] = deadslice['slice'] slices[newslice].xid = newslice # Start the HTB - slices[newslice].reset(deadslice['slice'].MaxRate, - deadslice['slice'].Maxi2Rate, - deadslice['htb']['usedbytes'], - deadslice['htb']['usedi2bytes'], - live[newslice]['_rspec']) + newvals = {"maxrate": deadslice['slice'].MaxRate * 1000, + "minrate": deadslice['slice'].MinRate * 1000, + "maxexemptrate": deadslice['slice'].Maxi2Rate * 1000, + "usedbytes": deadslice['htb']['usedbytes'] * 1000, + "usedi2bytes": deadslice['htb']['usedi2bytes'], + "share":deadslice['htb']['share']} + slices[newslice].reset(newvals, live[newslice]['_rspec']) # Bring up to date - slices[newslice].update(deadslice['slice'].MaxRate, - deadslice['slice'].Maxi2Rate, - deadslice['htb']['usedbytes'], - deadslice['htb']['usedi2bytes'], - deadslice['htb']['share'], - live[newslice]['_rspec']) + slices[newslice].update(newvals, live[newslice]['_rspec']) # Since the slice has been reinitialed, remove from dead database. del deaddb[deadslice['slice'].name] + del newvals else: logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name']) @@ -599,6 +621,7 @@ def sync(nmdbcopy): kernelhtbs = gethtbs(root_xid, default_xid) logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2) + # Update all byte limites on all slices for (xid, slice) in slices.iteritems(): # Monitor only the specified slices if xid == root_xid or xid == default_xid: continue @@ -612,20 +635,11 @@ def sync(nmdbcopy): # that the byte counters have overflowed (or, more # likely, the node was restarted or the HTB buckets # were re-initialized). - slice.reset(kernelhtbs[xid]['maxrate'], \ - kernelhtbs[xid]['maxexemptrate'], \ - kernelhtbs[xid]['usedbytes'], \ - kernelhtbs[xid]['usedi2bytes'], \ - live[xid]['_rspec']) - else: + slice.reset(kernelhtbs[xid], live[xid]['_rspec']) + elif ENABLE: logger.log("bwmon: Updating slice %s" % slice.name, 2) # Update byte counts - slice.update(kernelhtbs[xid]['maxrate'], \ - kernelhtbs[xid]['maxexemptrate'], \ - kernelhtbs[xid]['usedbytes'], \ - kernelhtbs[xid]['usedi2bytes'], \ - kernelhtbs[xid]['share'], - live[xid]['_rspec']) + slice.update(kernelhtbs[xid], live[xid]['_rspec']) logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2) f = open(datafile, "w")