X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=bwmon.py;h=91a79f382af7ab6d74e97c1ef12aed7a0e6bff10;hb=refs%2Fheads%2F1.8;hp=10cdd9543f2c730275cb4414d519b9bc36fbd173;hpb=9ce0d803975411f43ded4f8844c6eed314d670e1;p=nodemanager.git

diff --git a/bwmon.py b/bwmon.py
index 10cdd95..91a79f3 100644
--- a/bwmon.py
+++ b/bwmon.py
@@ -33,14 +33,18 @@ import database
 from sets import Set
 
 # Defaults
-debug = False
-verbose = False
+# Set DEBUG to True if you don't want to send emails
+DEBUG = False
+# Set ENABLE to False to set up buckets, but not limit.
+ENABLE = True
+
 datafile = "/var/lib/misc/bwmon.dat"
 
 try:
     sys.path.append("/etc/planetlab")
     from plc_config import *
 except:
+    DEBUG = True
     logger.log("bwmon: Warning: Configuration file /etc/planetlab/plc_config.py not found", 2)
     logger.log("bwmon: Running in DEBUG mode. Logging to file and not emailing.", 1)
 
@@ -48,14 +52,30 @@ except:
 seconds_per_day = 24 * 60 * 60
 bits_per_byte = 8
 
+dev_default = tools.get_default_if()
 # Burst to line rate (or node cap). Set by NM. in KBit/s
-default_MaxRate = int(bwlimit.get_bwcap() / 1000)
+default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000)
 default_Maxi2Rate = int(bwlimit.bwmax / 1000)
 # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G
 # 5.4 Gbyte per day max allowed transferred per recording period
-default_MaxKByte = 5662310
+# 5.4 Gbytes per day is approx 512k/s for 24hrs (approx because the original math was wrong,
+# but it's better to keep a higher byte total and keep people happy than to correct
+# the problem and piss people off.)
+# default_MaxKByte = 5662310
+
+# -- 6/1/09
+# llp wants to double these, so we use the following
+# 1mbit * 24hrs * 60mins * 60secs = bits/day
+# 1000000 * 24 * 60 * 60 / (1024 * 8)
+default_MaxKByte = 10546875
+
 # 16.4 Gbyte per day max allowed transferred per recording period to I2
-default_Maxi2KByte = 17196646
+# default_Maxi2KByte = 17196646
+
+# -- 6/1/09
+# 3Mb/s for 24hrs a day (30.17 gigs)
+default_Maxi2KByte = 31640625
+
 # Default share quanta
 default_Share = 1
 
@@ -195,7 +215,7 @@ class Slice:
         self.emailed = False
         self.capped = False
 
-        self.updateSliceAttributes(rspec)
+        self.updateSliceTags(rspec)
         bwlimit.set(xid = self.xid,
                     minrate = self.MinRate * 1000,
                     maxrate = self.MaxRate * 1000,
@@ -206,7 +226,7 @@ class Slice:
     def __repr__(self):
         return self.name
 
-    def updateSliceAttributes(self, rspec):
+    def updateSliceTags(self, rspec):
         '''
         Use rspecs from GetSlivers to PLC to populate slice object.  Also
         do some sanity checking.
@@ -221,7 +241,7 @@ class Slice:
             self.MinRate = MinRate
             logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate))
 
-        MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000))
+        MaxRate = int(rspec.get('net_max_rate', default_MaxRate))
         if MaxRate != self.MaxRate:
             self.MaxRate = MaxRate
             logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate))
@@ -231,7 +251,7 @@ class Slice:
             self.Mini2Rate = Mini2Rate
             logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate))
 
-        Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000))
+        Maxi2Rate = int(rspec.get('net_i2_max_rate', default_Maxi2Rate))
         if Maxi2Rate != self.Maxi2Rate:
             self.Maxi2Rate = Maxi2Rate
             logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate))
@@ -267,21 +287,23 @@ class Slice:
             logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share))
 
 
-    def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
+    def reset(self, runningrates, rspec):
         """
         Begin a new recording period. Remove caps by restoring limits
        to their default values.
         """
-
+        # Cache share for later comparison
+        self.Share = runningrates.get('share', 1)
+
         # Query Node Manager for max rate overrides
-        self.updateSliceAttributes(rspec)
+        self.updateSliceTags(rspec)
 
         # Reset baseline time
         self.time = time.time()
 
         # Reset baseline byte counts
-        self.bytes = usedbytes
-        self.i2bytes = usedi2bytes
+        self.bytes = runningrates.get('usedbytes', 0)
+        self.i2bytes = runningrates.get('usedi2bytes', 0)
 
         # Reset email
         self.emailed = False
@@ -289,13 +311,20 @@ class Slice:
         self.capped = False
         # Reset rates.
         maxrate = self.MaxRate * 1000
+        minrate = self.MinRate * 1000
         maxi2rate = self.Maxi2Rate * 1000
-        if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate):
+        mini2rate = self.Mini2Rate * 1000
+
+        if (maxrate != runningrates.get('maxrate', 0)) or \
+           (minrate != runningrates.get('minrate', 0)) or \
+           (maxi2rate != runningrates.get('maxexemptrate', 0)) or \
+           (mini2rate != runningrates.get('minexemptrate', 0)) or \
+           (self.Share != runningrates.get('share', 0)):
             logger.log("bwmon: %s reset to %s/%s" % \
                 (self.name,
                  bwlimit.format_tc_rate(maxrate),
                  bwlimit.format_tc_rate(maxi2rate)), 1)
-            bwlimit.set(xid = self.xid,
+            bwlimit.set(xid = self.xid, dev = dev_default,
                 minrate = self.MinRate * 1000,
                 maxrate = self.MaxRate * 1000,
                 maxexemptrate = self.Maxi2Rate * 1000,
@@ -314,7 +343,7 @@ class Slice:
                   'date': time.asctime(time.gmtime()) + " GMT",
                   'period': format_period(period)}
 
-        if new_maxrate != self.MaxRate:
+        if new_maxrate != (self.MaxRate * 1000):
             # Format template parameters for low bandwidth message
             params['class'] = "low bandwidth"
             params['bytes'] = format_bytes(usedbytes - self.bytes)
@@ -325,7 +354,7 @@ class Slice:
             message += template % params
             logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
 
-        if new_maxexemptrate != self.Maxi2Rate:
+        if new_maxexemptrate != (self.Maxi2Rate * 1000):
             # Format template parameters for high bandwidth message
             params['class'] = "high bandwidth"
             params['bytes'] = format_bytes(usedi2bytes - self.i2bytes)
@@ -336,25 +365,31 @@ class Slice:
             logger.log("bwmon: ** %(slice)s %(class)s capped at %(new_maxrate)s/s " % params)
 
         # Notify slice
-        if message and self.emailed == False:
+        if self.emailed == False:
             subject = "pl_mom capped bandwidth of slice %(slice)s on %(hostname)s" % params
-            if debug:
+            if DEBUG:
                 logger.log("bwmon: "+ subject)
                 logger.log("bwmon: "+ message + (footer %
                    params))
             else:
                 self.emailed = True
+                logger.log("bwmon: Emailing %s" % self.name)
                 slicemail(self.name, subject, message + (footer % params))
 
-    def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec):
+    def update(self, runningrates, rspec):
         """
         Update byte counts and check if byte thresholds have been exceeded. If
         exceeded, cap to remaining bytes in limit over remaining time in period.
         Recalculate every time module runs.
         """
-
+        # Cache share for later comparison
+        runningrates['share'] = self.Share
         # Query Node Manager for max rate overrides
-        self.updateSliceAttributes(rspec)
+        self.updateSliceTags(rspec)
+
+        usedbytes = runningrates['usedbytes']
+        usedi2bytes = runningrates['usedi2bytes']
 
         # Check limits.
         if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)):
@@ -362,17 +397,17 @@ class Slice:
             maxbyte = self.MaxKByte * 1024
             bytesused = usedbytes - self.bytes
             timeused = int(time.time() - self.time)
-            # Calcuate new rate.
+            # Calculate new rate, in bit/s
             new_maxrate = int(((maxbyte - bytesused) * 8)/(period - timeused))
             # Never go under MinRate
             if new_maxrate < (self.MinRate * 1000):
                 new_maxrate = self.MinRate * 1000
             # State information. I'm capped.
-            self.capped = True
+            self.capped += True
         else:
             # Sanity Check
             new_maxrate = self.MaxRate * 1000
-            self.capped = False
+            self.capped += False
 
         if usedi2bytes >= (self.i2bytes + (self.Threshi2KByte * 1024)):
             maxi2byte = self.Maxi2KByte * 1024
@@ -384,14 +419,21 @@ class Slice:
             if new_maxi2rate < (self.Mini2Rate * 1000):
                 new_maxi2rate = self.Mini2Rate * 1000
             # State information. I'm capped.
-            self.capped = True
+            self.capped += True
         else:
             # Sanity
             new_maxi2rate = self.Maxi2Rate * 1000
-            self.capped = False
-
-        # Apply parameters
-        bwlimit.set(xid = self.xid,
+            self.capped += False
+
+        # Check running values against newly calculated values so as not to run tc
+        # unnecessarily
+        if (runningrates['maxrate'] != new_maxrate) or \
+           (runningrates['minrate'] != self.MinRate * 1000) or \
+           (runningrates['maxexemptrate'] != new_maxi2rate) or \
+           (runningrates['minexemptrate'] != self.Mini2Rate * 1000) or \
+           (runningrates['share'] != self.Share):
+            # Apply parameters
+            bwlimit.set(xid = self.xid,
                     minrate = self.MinRate * 1000,
                     maxrate = new_maxrate,
                     minexemptrate = self.Mini2Rate * 1000,
@@ -399,7 +441,7 @@ class Slice:
                     share = self.Share)
 
         # Notify slice
-        if self.capped == True and self.emailed == False:
+        if self.capped == True:
             self.notify(new_maxrate, new_maxi2rate, usedbytes, usedi2bytes)
 
@@ -447,8 +489,7 @@ def sync(nmdbcopy):
            default_Maxi2Rate, \
            default_MaxKByte,\
            default_Maxi2KByte,\
-           default_Share,\
-           verbose
+           default_Share
 
     # All slices
     names = []
@@ -482,12 +523,12 @@ def sync(nmdbcopy):
     # to use defaults.
     if root_xid not in slices.keys():
         slices[root_xid] = Slice(root_xid, "root", {})
-        slices[root_xid].reset(0, 0, 0, 0, {})
+        slices[root_xid].reset({}, {})
 
     # Used by bwlimit. pass {} since there is no rspec (like above).
     if default_xid not in slices.keys():
         slices[default_xid] = Slice(default_xid, "default", {})
-        slices[default_xid].reset(0, 0, 0, 0, {})
+        slices[default_xid].reset({}, {})
 
     live = {}
     # Get running slivers that should be on this node (from plc). {xid: name}
@@ -509,7 +550,7 @@ def sync(nmdbcopy):
     # Reset tc counts.
     for nohtbslice in nohtbslices:
         if live.has_key(nohtbslice):
-            slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
+            slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] )
         else:
             logger.log("bwmon: Removing abandoned slice %s from dat."
                % nohtbslice)
             del slices[nohtbslice]
@@ -542,7 +583,7 @@ def sync(nmdbcopy):
             # _rspec is the computed rspec: NM retrieved data from PLC, computed loans
             # and made a dict of computed values.
             slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
-            slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
+            slices[newslice].reset( {}, live[newslice]['_rspec'] )
         # Double check time for dead slice in deaddb is within 24hr recording period.
         elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
             deadslice = deaddb[live[newslice]['name']]
@@ -550,20 +591,18 @@ def sync(nmdbcopy):
             slices[newslice] = deadslice['slice']
             slices[newslice].xid = newslice
             # Start the HTB
-            slices[newslice].reset(deadslice['slice'].MaxRate,
-                deadslice['slice'].Maxi2Rate,
-                deadslice['htb']['usedbytes'],
-                deadslice['htb']['usedi2bytes'],
-                live[newslice]['_rspec'])
+            newvals = {"maxrate": deadslice['slice'].MaxRate * 1000,
+                       "minrate": deadslice['slice'].MinRate * 1000,
+                       "maxexemptrate": deadslice['slice'].Maxi2Rate * 1000,
+                       "usedbytes": deadslice['htb']['usedbytes'] * 1000,
+                       "usedi2bytes": deadslice['htb']['usedi2bytes'],
+                       "share":deadslice['htb']['share']}
+            slices[newslice].reset(newvals, live[newslice]['_rspec'])
             # Bring up to date
-            slices[newslice].update(deadslice['slice'].MaxRate,
-                deadslice['slice'].Maxi2Rate,
-                deadslice['htb']['usedbytes'],
-                deadslice['htb']['usedi2bytes'],
-                deadslice['htb']['share'],
-                live[newslice]['_rspec'])
+            slices[newslice].update(newvals, live[newslice]['_rspec'])
             # Since the slice has been reinitialized, remove from dead database.
             del deaddb[deadslice['slice'].name]
+            del newvals
         else:
             logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name'])
@@ -598,6 +637,7 @@ def sync(nmdbcopy):
     kernelhtbs = gethtbs(root_xid, default_xid)
     logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__(), 2)
 
+    # Update all byte limits on all slices
     for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
         if xid == root_xid or xid == default_xid: continue
@@ -611,29 +651,52 @@ def sync(nmdbcopy):
             # that the byte counters have overflowed (or, more
             # likely, the node was restarted or the HTB buckets
             # were re-initialized).
-            slice.reset(kernelhtbs[xid]['maxrate'], \
-                kernelhtbs[xid]['maxexemptrate'], \
-                kernelhtbs[xid]['usedbytes'], \
-                kernelhtbs[xid]['usedi2bytes'], \
-                live[xid]['_rspec'])
-        else:
+            slice.reset(kernelhtbs[xid], live[xid]['_rspec'])
+        elif ENABLE:
             logger.log("bwmon: Updating slice %s" % slice.name, 2)
             # Update byte counts
-            slice.update(kernelhtbs[xid]['maxrate'], \
-                kernelhtbs[xid]['maxexemptrate'], \
-                kernelhtbs[xid]['usedbytes'], \
-                kernelhtbs[xid]['usedi2bytes'], \
-                kernelhtbs[xid]['share'],
-                live[xid]['_rspec'])
+            slice.update(kernelhtbs[xid], live[xid]['_rspec'])
 
     logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2)
     f = open(datafile, "w")
     pickle.dump((version, slices, deaddb), f)
     f.close()
 
+# Doesn't use the generic default interface because this runs as its own thread.
+# Changing the config variable will not have an effect since GetSlivers: pass
+def getDefaults(nmdbcopy):
+    '''
+    Get defaults from default slice's slice attributes.
+ ''' + status = True + # default slice + dfltslice = nmdbcopy.get(PLC_SLICE_PREFIX+"_default") + if dfltslice: + if dfltslice['rspec']['net_max_rate'] == -1: + allOff() + status = False + return status + + +def allOff(): + """ + Turn off all slice HTBs + """ + # Get/set special slice IDs + root_xid = bwlimit.get_xid("root") + default_xid = bwlimit.get_xid("default") + kernelhtbs = gethtbs(root_xid, default_xid) + if len(kernelhtbs): + logger.log("bwlimit: Disabling all running HTBs.") + for htb in kernelhtbs.keys(): bwlimit.off(htb) + + lock = threading.Event() def run(): - """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting.""" + """ + When run as a thread, wait for event, lock db, deep copy it, release it, + run bwmon.GetSlivers(), then go back to waiting. + """ logger.log("bwmon: Thread started", 2) while True: lock.wait() @@ -641,7 +704,11 @@ def run(): database.db_lock.acquire() nmdbcopy = copy.deepcopy(database.db) database.db_lock.release() - try: sync(nmdbcopy) + try: + if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev %s" % dev_default)) > 0: + # class show to check if net:InitNodeLimit:bwlimit.init has run. + sync(nmdbcopy) + else: logger.log("bwmon: BW limits DISABLED.") except: logger.log_exc() lock.clear()
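Note on the new defaults near the top of this patch: the replacement values for default_MaxKByte and default_Maxi2KByte follow directly from the sustained rates named in the comments (1 Mbit/s and 3 Mbit/s over a full day, converted to KBytes). A standalone sanity check of that arithmetic, in the Python 2 used by bwmon.py (not part of the patch):

    # Verify the daily KByte caps implied by the comments in the patch.
    seconds_per_day = 24 * 60 * 60
    # 1 Mbit/s sustained for a day, in KBytes (8 bits per byte, 1024 bytes per KByte)
    print 1000000 * seconds_per_day / (8 * 1024)    # 10546875 == default_MaxKByte
    # 3 Mbit/s sustained for a day for Internet2 traffic
    print 3000000 * seconds_per_day / (8 * 1024)    # 31640625 == default_Maxi2KByte (~30.17 GiB/day)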
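The reset()/update() changes above replace the old positional rate and byte-count arguments with a single runningrates dict, which sync() fills from the kernel HTB state (gethtbs()/bwlimit.get()). A minimal illustrative sketch of the dict shape and of the cap recalculation update() performs once ThreshKByte is exceeded; the key names come from the patch, the example values are invented:

    # Illustrative only: rates are in bit/s, byte counters in bytes.
    runningrates = {'minrate': 8000, 'maxrate': 512000,
                    'minexemptrate': 8000, 'maxexemptrate': 3000000,
                    'usedbytes': 6 * 1024 ** 3, 'usedi2bytes': 0,
                    'share': 1}

    # Cap recalculation: spread the remaining byte budget over the rest of the
    # recording period, never dropping below the slice's MinRate.
    period = 24 * 60 * 60                  # length of the recording period, seconds
    maxbyte = 10546875 * 1024              # MaxKByte * 1024
    bytesused = runningrates['usedbytes']  # bytes counted since reset() (usedbytes - self.bytes)
    timeused = 3 * 60 * 60                 # example: three hours into the period
    new_maxrate = int(((maxbyte - bytesused) * 8) / (period - timeused))  # new cap, bit/s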