X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=bwmon.py;h=ee5d41b2dc3afef272a7615594274abac6e982a2;hb=5c20310810597b3601deca0daf6a3f9cee246fdd;hp=2806713c7dcee9525904b37d8d2423185300d388;hpb=78431edeac05e3ab1ca8022514f24b173da5286a;p=nodemanager.git diff --git a/bwmon.py b/bwmon.py index 2806713..ee5d41b 100644 --- a/bwmon.py +++ b/bwmon.py @@ -1,31 +1,34 @@ #!/usr/bin/python # -# Average bandwidth monitoring script. Run periodically via cron(8) to +# Average bandwidth monitoring script. Run periodically via NM db.sync to # enforce a soft limit on daily bandwidth usage for each slice. If a -# slice is found to have exceeded its daily bandwidth usage when the -# script is run, its instantaneous rate will be capped at the desired -# average rate. Thus, in the worst case, a slice will only be able to -# send a little more than twice its average daily limit. +# slice is found to have transmitted 80% of its daily byte limit usage, +# its instantaneous rate will be capped at the bytes remaning in the limit +# over the time remaining in the recording period. # # Two separate limits are enforced, one for destinations exempt from -# the node bandwidth cap, and the other for all other destinations. +# the node bandwidth cap (i.e. Internet2), and the other for all other destinations. # # Mark Huang # Andy Bavier # Faiyaz Ahmed -# Copyright (C) 2004-2006 The Trustees of Princeton University +# Copyright (C) 2004-2008 The Trustees of Princeton University # -# $Id: bwmon.py,v 1.12 2007/03/06 20:46:54 faiyaza Exp $ +# $Id$ # import os import sys import time import pickle - import socket -import bwlimit import logger +import copy +import threading +import tools + +import bwlimit +import database from sets import Set @@ -44,7 +47,7 @@ seconds_per_day = 24 * 60 * 60 bits_per_byte = 8 # Defaults -debug = False +debug = True verbose = False datafile = "/var/lib/misc/bwmon.dat" #nm = None @@ -126,6 +129,10 @@ def format_period(seconds): return "%.0f seconds" % seconds def slicemail(slice, subject, body): + ''' + Front end to sendmail. Sends email to slice alias with given subject and body. + ''' + sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w") # PLC has a separate list for pl_mom messages @@ -169,13 +176,15 @@ class Slice: time - beginning of recording period in UNIX seconds bytes - low bandwidth bytes transmitted at the beginning of the recording period i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F) - ByteMax - total volume of data allowed - ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period) - ExemptByteMax - Same as above, but for i2. - ExemptByteThresh - i2 ByteThresh - maxrate - max_rate slice attribute. - maxexemptrate - max_exempt_rate slice attribute. - self.emailed = did we email during this recording period + MaxKByte - total volume of data allowed + ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period) + Maxi2KByte - same as MaxKByte, but for i2 + Threshi2Kbyte - same as Threshi2KByte, but for i2 + MaxRate - max_rate slice attribute. + Maxi2Rate - max_exempt_rate slice attribute. + Share - Used by Sirius to loan min rates + Sharei2 - Used by Sirius to loan min rates for i2 + self.emailed - did slice recv email during this recording period """ @@ -209,7 +218,10 @@ class Slice: return self.name def updateSliceAttributes(self, rspec): - # Get attributes + ''' + Use respects from GetSlivers to PLC to populate slice object. Also + do some sanity checking. + ''' # Sanity check plus policy decision for MinRate: # Minrate cant be greater than 25% of MaxRate or NodeCap. @@ -301,8 +313,9 @@ class Slice: def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec): """ - Update byte counts and check if byte limits have been - exceeded. + Update byte counts and check if byte thresholds have been + exceeded. If exceeded, cap to remaining bytes in limit over remaining in period. + Recalculate every time module runs. """ # Query Node Manager for max rate overrides @@ -317,6 +330,9 @@ class Slice: 'period': format_period(period)} if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)): + if verbose: + logger.log("bwmon: %s over thresh %s" \ + % (self.name, format_bytes(self.ThreshKByte * 1024))) sum = self.bytes + (self.ThreshKByte * 1024) maxbyte = self.MaxKByte * 1024 bytesused = usedbytes - self.bytes @@ -331,11 +347,12 @@ class Slice: params['class'] = "low bandwidth" params['bytes'] = format_bytes(usedbytes - self.bytes) params['limit'] = format_bytes(self.MaxKByte * 1024) + params['thresh'] = format_bytes(self.ThreshKByte * 1024) params['new_maxrate'] = bwlimit.format_tc_rate(new_maxrate) if verbose: logger.log("bwmon: %(slice)s %(class)s " \ - "%(bytes)s of %(limit)s (%(new_maxrate)s/s maxrate)" % \ + "%(bytes)s of %(limit)s max %(thresh)s thresh (%(new_maxrate)s/s maxrate)" % \ params) # Cap low bandwidth burst rate @@ -382,7 +399,43 @@ class Slice: self.emailed = True slicemail(self.name, subject, message + (footer % params)) -def GetSlivers(db): +def gethtbs(root_xid, default_xid): + """ + Return dict {xid: {*rates}} of running htbs as reported by tc that have names. + Turn off HTBs without names. + """ + livehtbs = {} + for params in bwlimit.get(): + (xid, share, + minrate, maxrate, + minexemptrate, maxexemptrate, + usedbytes, usedi2bytes) = params + + name = bwlimit.get_slice(xid) + + if (name is None) \ + and (xid != root_xid) \ + and (xid != default_xid): + # Orphaned (not associated with a slice) class + name = "%d?" % xid + logger.log("bwmon: Found orphaned HTB %s. Removing." %name) + bwlimit.off(xid) + + livehtbs[xid] = {'share': share, + 'minrate': minrate, + 'maxrate': maxrate, + 'maxexemptrate': maxexemptrate, + 'minexemptrate': minexemptrate, + 'usedbytes': usedbytes, + 'name': name, + 'usedi2bytes': usedi2bytes} + + return livehtbs + +def sync(nmdbcopy): + """ + Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit. + """ # Defaults global datafile, \ period, \ @@ -398,7 +451,6 @@ def GetSlivers(db): # All slices names = [] - # Incase the limits have changed. default_MaxRate = int(bwlimit.get_bwcap() / 1000) default_Maxi2Rate = int(bwlimit.bwmax / 1000) @@ -410,15 +462,16 @@ def GetSlivers(db): try: f = open(datafile, "r+") logger.log("bwmon: Loading %s" % datafile) - (version, slices) = pickle.load(f) + (version, slices, deaddb) = pickle.load(f) f.close() # Check version of data file - if version != "$Id: bwmon.py,v 1.12 2007/03/06 20:46:54 faiyaza Exp $": + if version != "$Id$": logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile)) raise Exception except Exception: - version = "$Id: bwmon.py,v 1.12 2007/03/06 20:46:54 faiyaza Exp $" + version = "$Id$" slices = {} + deaddb = {} # Get/set special slice IDs root_xid = bwlimit.get_xid("root") @@ -437,98 +490,149 @@ def GetSlivers(db): live = {} # Get running slivers that should be on this node (from plc). {xid: name} - for sliver in db.keys(): - live[bwlimit.get_xid(sliver)] = sliver + # db keys on name, bwmon keys on xid. db doesnt have xid either. + for plcSliver in nmdbcopy.keys(): + live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver] - # Setup new slices. - # live.xids - runing.xids = new.xids - newslicesxids = Set(live.keys()) - Set(slices.keys()) - for newslicexid in newslicesxids: + logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__()) + logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__()) + + # Get actual running values from tc. + # Update slice totals and bandwidth. {xid: {values}} + kernelhtbs = gethtbs(root_xid, default_xid) + logger.log("bwmon: Found %s running HTBs" % kernelhtbs.keys().__len__()) + + # The dat file has HTBs for slices, but the HTBs aren't running + nohtbslices = Set(slices.keys()) - Set(kernelhtbs.keys()) + logger.log( "bwmon: Found %s slices in dat but not running." % nohtbslices.__len__() ) + # Reset tc counts. + for nohtbslice in nohtbslices: + if live.has_key(nohtbslice): + slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] ) + + # The dat file doesnt have HTB for the slice, but slice is running and + # HTB exists + slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys()) + logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__() ) + for slicenodat in slicesnodat: + slices[slicenodat] = Slice(slicenodat, + live[slicenodat]['name'], + live[slicenodat]['_rspec']) + + # Get new slices. + # Slices in GetSlivers but not running HTBs + newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys()) + logger.log("bwmon: Found %s new slices" % newslicesxids.__len__()) + + # Setup new slices + for newslice in newslicesxids: # Delegated slices dont have xids (which are uids) since they haven't been # instantiated yet. - if newslicexid != None and db[live[newslicexid]].has_key('_rspec') == True: - logger.log("bwmon: New Slice %s" % live[newslicexid]) - # _rspec is the computed rspec: NM retrieved data from PLC, computed loans - # and made a dict of computed values. - rspec = db[live[newslicexid]]['_rspec'] - slices[newslicexid] = Slice(newslicexid, live[newslicexid], rspec) - slices[newslicexid].reset(0, 0, 0, 0, rspec) + if newslice != None and live[newslice].has_key('_rspec') == True: + if live[newslice]['name'] not in deaddb.keys(): + logger.log( "bwmon: New Slice %s" % live[newslice]['name'] ) + # _rspec is the computed rspec: NM retrieved data from PLC, computed loans + # and made a dict of computed values. + slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec']) + slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] ) + # Double check time for dead slice in deaddb is within 24hr recording period. + elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)): + deadslice = deaddb[live[newslice]['name']] + logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name']) + slices[newslice] = deadslice['slice'] + slices[newslice].xid = newslice + # Start the HTB + slices[newslice].reset(deadslice['slice'].MaxRate, + deadslice['slice'].Maxi2Rate, + deadslice['htb']['usedbytes'], + deadslice['htb']['usedi2bytes'], + live[newslice]['_rspec']) + # Bring up to date + slices[newslice].update(deadslice['slice'].MaxRate, + deadslice['slice'].Maxi2Rate, + deadslice['htb']['usedbytes'], + deadslice['htb']['usedi2bytes'], + live[newslice]['_rspec']) + # Since the slice has been reinitialed, remove from dead database. + del deaddb[deadslice] else: - logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslicexid]) - # Get actual running values from tc. - # Update slice totals and bandwidth. - for params in bwlimit.get(): - (xid, share, - minrate, maxrate, - minexemptrate, maxexemptrate, - usedbytes, usedi2bytes) = params - - # Ignore root and default buckets + logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslice]['name']) + + # Move dead slices that exist in the pickle file, but + # aren't instantiated by PLC into the dead dict until + # recording period is over. This is to avoid the case where a slice is dynamically created + # and destroyed then recreated to get around byte limits. + dead = Set(slices.keys()) - Set(live.keys()) + logger.log("bwmon: Found %s dead slices" % (dead.__len__() - 2)) + for xid in dead: if xid == root_xid or xid == default_xid: continue - - name = bwlimit.get_slice(xid) - if name is None: - # Orphaned (not associated with a slice) class - name = "%d?" % xid - bwlimit.off(xid) - + logger.log("bwmon: removing dead slice %s " % xid) + if slices.has_key(xid): + # add slice (by name) to deaddb + deaddb[slices[xid].name] = {'slice': slices[xid], 'htb': kernelhtbs[xid]} + del slices[xid] + if kernelhtbs.has_key(xid): bwlimit.off(xid) + + # Clean up deaddb + for (deadslicexid, deadslice) in deaddb.iteritems(): + if (time.time() >= (deadslice.time() + period)): + logger.log("bwmon: Removing dead slice %s from dat." % deadslice.name) + del deaddb[deadslicexid] + + # Get actual running values from tc since we've added and removed buckets. + # Update slice totals and bandwidth. {xid: {values}} + kernelhtbs = gethtbs(root_xid, default_xid) + logger.log("bwmon: now %s running HTBs" % kernelhtbs.keys().__len__()) + + for (xid, slice) in slices.iteritems(): # Monitor only the specified slices + if xid == root_xid or xid == default_xid: continue if names and name not in names: continue - #slices is populated from the pickle file - #xid is populated from bwlimit (read from /etc/passwd) - if slices.has_key(xid): - slice = slices[xid] - if time.time() >= (slice.time + period) or \ - usedbytes < slice.bytes or usedi2bytes < slice.i2bytes: - # Reset to defaults every 24 hours or if it appears - # that the byte counters have overflowed (or, more - # likely, the node was restarted or the HTB buckets - # were re-initialized). - slice.reset(maxrate, \ - maxexemptrate, \ - usedbytes, \ - usedi2bytes, \ - db[slice.name]['_rspec']) - else: - # Update byte counts - slice.update(maxrate, \ - maxexemptrate, \ - usedbytes, \ - usedi2bytes, \ - db[slice.name]['_rspec']) + + if (time.time() >= (slice.time + period)) or \ + (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \ + (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes): + # Reset to defaults every 24 hours or if it appears + # that the byte counters have overflowed (or, more + # likely, the node was restarted or the HTB buckets + # were re-initialized). + slice.reset(kernelhtbs[xid]['maxrate'], \ + kernelhtbs[xid]['maxexemptrate'], \ + kernelhtbs[xid]['usedbytes'], \ + kernelhtbs[xid]['usedi2bytes'], \ + live[xid]['_rspec']) else: - # Just in case. Probably (hopefully) this will never happen. - # New slice, initialize state - logger.log("bwmon: New Slice %s" % name) - slice = slices[xid] = Slice(xid, name, db[slice.name]['_rspec']) - slice.reset(maxrate, \ - maxexemptrate, \ - usedbytes, \ - usedi2bytes, \ - db[slice.name]['_rspec']) - - # Delete dead slices - dead = Set(slices.keys()) - Set(live.keys()) - for xid in dead: - if xid == root_xid or xid == default_xid: - continue - del slices[xid] - bwlimit.off(xid) - - logger.log("bwmon: Saving %s" % datafile) + if debug: logger.log("bwmon: Updating slice %s" % slice.name) + # Update byte counts + slice.update(kernelhtbs[xid]['maxrate'], \ + kernelhtbs[xid]['maxexemptrate'], \ + kernelhtbs[xid]['usedbytes'], \ + kernelhtbs[xid]['usedi2bytes'], \ + live[xid]['_rspec']) + + logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile)) f = open(datafile, "w") - pickle.dump((version, slices), f) + pickle.dump((version, slices, deaddb), f) f.close() - -#def GetSlivers(data): -# for sliver in data['slivers']: -# if sliver.has_key('attributes'): -# print sliver -# for attribute in sliver['attributes']: -# if attribute['name'] == "KByteThresh": print attribute['value'] - -#def start(options, config): -# pass +lock = threading.Event() +def run(): + """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting.""" + if debug: logger.log("bwmon: Thread started") + while True: + lock.wait() + if debug: logger.log("bwmon: Event received. Running.") + database.db_lock.acquire() + nmdbcopy = copy.deepcopy(database.db) + database.db_lock.release() + try: sync(nmdbcopy) + except: logger.log_exc() + lock.clear() + +def start(*args): + tools.as_daemon_thread(run) + +def GetSlivers(*args): + pass