X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=bwmon.py;h=91a79f382af7ab6d74e97c1ef12aed7a0e6bff10;hb=a4c294316df5642055aed39b0f54b61bee1c30e3;hp=63a2d34cbcb06ffc122b485ddb9050d2b292fa51;hpb=68abd8116ec262c638073b46ab622065abaed341;p=nodemanager.git diff --git a/bwmon.py b/bwmon.py index 63a2d34..91a79f3 100644 --- a/bwmon.py +++ b/bwmon.py @@ -52,14 +52,30 @@ except: seconds_per_day = 24 * 60 * 60 bits_per_byte = 8 +dev_default = tools.get_default_if() # Burst to line rate (or node cap). Set by NM. in KBit/s -default_MaxRate = int(bwlimit.get_bwcap() / 1000) +default_MaxRate = int(bwlimit.get_bwcap(dev_default) / 1000) default_Maxi2Rate = int(bwlimit.bwmax / 1000) # 5.4 Gbyte per day. 5.4 * 1024 k * 1024M * 1024G # 5.4 Gbyte per day max allowed transfered per recording period -default_MaxKByte = 5662310 +# 5.4 Gbytes per day is aprox 512k/s for 24hrs (approx because original math was wrong +# but its better to keep a higher byte total and keep people happy than correct +# the problem and piss people off. +# default_MaxKByte = 5662310 + +# -- 6/1/09 +# llp wants to double these, so we use the following +# 1mbit * 24hrs * 60mins * 60secs = bits/day +# 1000000 * 24 * 60 * 60 / (1024 * 8) +default_MaxKByte = 10546875 + # 16.4 Gbyte per day max allowed transfered per recording period to I2 -default_Maxi2KByte = 17196646 +# default_Maxi2KByte = 17196646 + +# -- 6/1/09 +# 3Mb/s for 24hrs a day (30.17 gigs) +default_Maxi2KByte = 31640625 + # Default share quanta default_Share = 1 @@ -199,7 +215,7 @@ class Slice: self.emailed = False self.capped = False - self.updateSliceAttributes(rspec) + self.updateSliceTags(rspec) bwlimit.set(xid = self.xid, minrate = self.MinRate * 1000, maxrate = self.MaxRate * 1000, @@ -210,7 +226,7 @@ class Slice: def __repr__(self): return self.name - def updateSliceAttributes(self, rspec): + def updateSliceTags(self, rspec): ''' Use respects from GetSlivers to PLC to populate slice object. Also do some sanity checking. @@ -271,20 +287,23 @@ class Slice: logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.i2Share)) - def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec): + def reset(self, runningrates, rspec): """ Begin a new recording period. Remove caps by restoring limits to their default values. """ + # Cache share for later comparison + self.Share = runningrates.get('share', 1) + # Query Node Manager for max rate overrides - self.updateSliceAttributes(rspec) + self.updateSliceTags(rspec) # Reset baseline time self.time = time.time() # Reset baseline byte coutns - self.bytes = usedbytes - self.i2bytes = usedi2bytes + self.bytes = runningrates.get('usedbytes', 0) + self.i2bytes = runningrates.get('usedi2bytes', 0) # Reset email self.emailed = False @@ -292,13 +311,20 @@ class Slice: self.capped = False # Reset rates. maxrate = self.MaxRate * 1000 + minrate = self.MinRate * 1000 maxi2rate = self.Maxi2Rate * 1000 - if (self.MaxRate != runningmaxrate) or (self.Maxi2Rate != runningmaxi2rate): + mini2rate = self.Mini2Rate * 1000 + + if (maxrate != runningrates.get('maxrate', 0)) or \ + (minrate != runningrates.get('maxrate', 0)) or \ + (maxi2rate != runningrates.get('maxexemptrate', 0)) or \ + (mini2rate != runningrates.get('minexemptrate', 0)) or \ + (self.Share != runningrates.get('share', 0)): logger.log("bwmon: %s reset to %s/%s" % \ (self.name, bwlimit.format_tc_rate(maxrate), bwlimit.format_tc_rate(maxi2rate)), 1) - bwlimit.set(xid = self.xid, + bwlimit.set(xid = self.xid, dev = dev_default, minrate = self.MinRate * 1000, maxrate = self.MaxRate * 1000, maxexemptrate = self.Maxi2Rate * 1000, @@ -350,21 +376,20 @@ class Slice: slicemail(self.name, subject, message + (footer % params)) - def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, runningshare, rspec): + def update(self, runningrates, rspec): """ Update byte counts and check if byte thresholds have been exceeded. If exceeded, cap to remaining bytes in limit over remaining time in period. Recalculate every time module runs. """ - - # copy self.Min* and self.*share values for comparison later. - runningMinRate = self.MinRate - runningMini2Rate = self.Mini2Rate - runningshare = self.Share - runningsharei2 = self.Sharei2 + # cache share for later comparison + runningrates['share'] = self.Share # Query Node Manager for max rate overrides - self.updateSliceAttributes(rspec) + self.updateSliceTags(rspec) + + usedbytes = runningrates['usedbytes'] + usedi2bytes = runningrates['usedi2bytes'] # Check limits. if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)): @@ -402,11 +427,11 @@ class Slice: # Check running values against newly calculated values so as not to run tc # unnecessarily - if (runningmaxrate != new_maxrate) or \ - (runningMinRate != self.MinRate) or \ - (runningmaxi2rate != new_maxi2rate) or \ - (runningMini2Rate != self.Mini2Rate) or \ - (runningshare != self.Share): + if (runningrates['maxrate'] != new_maxrate) or \ + (runningrates['minrate'] != self.MinRate * 1000) or \ + (runningrates['maxexemptrate'] != new_maxi2rate) or \ + (runningrates['minexemptrate'] != self.Mini2Rate * 1000) or \ + (runningrates['share'] != self.Share): # Apply parameters bwlimit.set(xid = self.xid, minrate = self.MinRate * 1000, @@ -498,12 +523,12 @@ def sync(nmdbcopy): # to use defaults. if root_xid not in slices.keys(): slices[root_xid] = Slice(root_xid, "root", {}) - slices[root_xid].reset(0, 0, 0, 0, {}) + slices[root_xid].reset({}, {}) # Used by bwlimit. pass {} since there is no rspec (like above). if default_xid not in slices.keys(): slices[default_xid] = Slice(default_xid, "default", {}) - slices[default_xid].reset(0, 0, 0, 0, {}) + slices[default_xid].reset({}, {}) live = {} # Get running slivers that should be on this node (from plc). {xid: name} @@ -525,7 +550,7 @@ def sync(nmdbcopy): # Reset tc counts. for nohtbslice in nohtbslices: if live.has_key(nohtbslice): - slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] ) + slices[nohtbslice].reset( {}, live[nohtbslice]['_rspec'] ) else: logger.log("bwmon: Removing abondoned slice %s from dat." % nohtbslice) del slices[nohtbslice] @@ -558,7 +583,7 @@ def sync(nmdbcopy): # _rspec is the computed rspec: NM retrieved data from PLC, computed loans # and made a dict of computed values. slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec']) - slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] ) + slices[newslice].reset( {}, live[newslice]['_rspec'] ) # Double check time for dead slice in deaddb is within 24hr recording period. elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)): deadslice = deaddb[live[newslice]['name']] @@ -566,20 +591,18 @@ def sync(nmdbcopy): slices[newslice] = deadslice['slice'] slices[newslice].xid = newslice # Start the HTB - slices[newslice].reset(deadslice['slice'].MaxRate, - deadslice['slice'].Maxi2Rate, - deadslice['htb']['usedbytes'], - deadslice['htb']['usedi2bytes'], - live[newslice]['_rspec']) + newvals = {"maxrate": deadslice['slice'].MaxRate * 1000, + "minrate": deadslice['slice'].MinRate * 1000, + "maxexemptrate": deadslice['slice'].Maxi2Rate * 1000, + "usedbytes": deadslice['htb']['usedbytes'] * 1000, + "usedi2bytes": deadslice['htb']['usedi2bytes'], + "share":deadslice['htb']['share']} + slices[newslice].reset(newvals, live[newslice]['_rspec']) # Bring up to date - slices[newslice].update(deadslice['slice'].MaxRate, - deadslice['slice'].Maxi2Rate, - deadslice['htb']['usedbytes'], - deadslice['htb']['usedi2bytes'], - deadslice['htb']['share'], - live[newslice]['_rspec']) + slices[newslice].update(newvals, live[newslice]['_rspec']) # Since the slice has been reinitialed, remove from dead database. del deaddb[deadslice['slice'].name] + del newvals else: logger.log("bwmon: Slice %s doesn't have xid. Skipping." % live[newslice]['name']) @@ -628,29 +651,52 @@ def sync(nmdbcopy): # that the byte counters have overflowed (or, more # likely, the node was restarted or the HTB buckets # were re-initialized). - slice.reset(kernelhtbs[xid]['maxrate'], \ - kernelhtbs[xid]['maxexemptrate'], \ - kernelhtbs[xid]['usedbytes'], \ - kernelhtbs[xid]['usedi2bytes'], \ - live[xid]['_rspec']) + slice.reset(kernelhtbs[xid], live[xid]['_rspec']) elif ENABLE: logger.log("bwmon: Updating slice %s" % slice.name, 2) # Update byte counts - slice.update(kernelhtbs[xid]['maxrate'], \ - kernelhtbs[xid]['maxexemptrate'], \ - kernelhtbs[xid]['usedbytes'], \ - kernelhtbs[xid]['usedi2bytes'], \ - kernelhtbs[xid]['share'], - live[xid]['_rspec']) + slice.update(kernelhtbs[xid], live[xid]['_rspec']) logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile), 2) f = open(datafile, "w") pickle.dump((version, slices, deaddb), f) f.close() +# doesnt use generic default interface because this runs as its own thread. +# changing the config variable will not have an effect since GetSlivers: pass +def getDefaults(nmdbcopy): + ''' + Get defaults from default slice's slice attributes. + ''' + status = True + # default slice + dfltslice = nmdbcopy.get(PLC_SLICE_PREFIX+"_default") + if dfltslice: + if dfltslice['rspec']['net_max_rate'] == -1: + allOff() + status = False + return status + + +def allOff(): + """ + Turn off all slice HTBs + """ + # Get/set special slice IDs + root_xid = bwlimit.get_xid("root") + default_xid = bwlimit.get_xid("default") + kernelhtbs = gethtbs(root_xid, default_xid) + if len(kernelhtbs): + logger.log("bwlimit: Disabling all running HTBs.") + for htb in kernelhtbs.keys(): bwlimit.off(htb) + + lock = threading.Event() def run(): - """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting.""" + """ + When run as a thread, wait for event, lock db, deep copy it, release it, + run bwmon.GetSlivers(), then go back to waiting. + """ logger.log("bwmon: Thread started", 2) while True: lock.wait() @@ -658,7 +704,11 @@ def run(): database.db_lock.acquire() nmdbcopy = copy.deepcopy(database.db) database.db_lock.release() - try: sync(nmdbcopy) + try: + if getDefaults(nmdbcopy) and len(bwlimit.tc("class show dev %s" % dev_default)) > 0: + # class show to check if net:InitNodeLimit:bwlimit.init has run. + sync(nmdbcopy) + else: logger.log("bwmon: BW limits DISABLED.") except: logger.log_exc() lock.clear()