(no commit message)
[nodemanager.git] / bwmon.py
index b8ca5e2..ee5d41b 100644 (file)
--- a/bwmon.py
+++ b/bwmon.py
@@ -1,31 +1,34 @@
 #!/usr/bin/python
 #
-# Average bandwidth monitoring script. Run periodically via cron(8) to
+# Average bandwidth monitoring script. Run periodically via NM db.sync to
 # enforce a soft limit on daily bandwidth usage for each slice. If a
-# slice is found to have exceeded its daily bandwidth usage when the
-# script is run, its instantaneous rate will be capped at the desired
-# average rate. Thus, in the worst case, a slice will only be able to
-# send a little more than twice its average daily limit.
+# slice is found to have transmitted 80% of its daily byte limit usage,
+# its instantaneous rate will be capped at the bytes remaning in the limit
+# over the time remaining in the recording period.
 #
 # Two separate limits are enforced, one for destinations exempt from
-# the node bandwidth cap, and the other for all other destinations.
+# the node bandwidth cap (i.e. Internet2), and the other for all other destinations.
 #
 # Mark Huang <mlhuang@cs.princeton.edu>
 # Andy Bavier <acb@cs.princeton.edu>
 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
-# Copyright (C) 2004-2006 The Trustees of Princeton University
+# Copyright (C) 2004-2008 The Trustees of Princeton University
 #
-# $Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $
+# $Id$
 #
 
 import os
 import sys
 import time
 import pickle
-
 import socket
-import bwlimit
 import logger
+import copy
+import threading
+import tools
+
+import bwlimit
+import database
 
 from sets import Set
 
@@ -44,7 +47,7 @@ seconds_per_day = 24 * 60 * 60
 bits_per_byte = 8
 
 # Defaults
-debug = False
+debug = True
 verbose = False
 datafile = "/var/lib/misc/bwmon.dat"
 #nm = None
@@ -126,6 +129,10 @@ def format_period(seconds):
         return "%.0f seconds" % seconds
 
 def slicemail(slice, subject, body):
+    '''
+    Front end to sendmail.  Sends email to slice alias with given subject and body.
+    '''
+
     sendmail = os.popen("/usr/sbin/sendmail -N never -t -f%s" % PLC_MAIL_SUPPORT_ADDRESS, "w")
 
     # PLC has a separate list for pl_mom messages
@@ -169,13 +176,15 @@ class Slice:
     time - beginning of recording period in UNIX seconds
     bytes - low bandwidth bytes transmitted at the beginning of the recording period
     i2bytes - high bandwidth bytes transmitted at the beginning of the recording period (for I2 -F)
-    ByteMax - total volume of data allowed
-    ByteThresh - After thresh, cap node to (maxbyte - bytes)/(time left in period)
-    ExemptByteMax - Same as above, but for i2.
-    ExemptByteThresh - i2 ByteThresh
-    maxrate - max_rate slice attribute. 
-    maxexemptrate - max_exempt_rate slice attribute.
-    self.emailed = did we email during this recording period
+    MaxKByte - total volume of data allowed
+    ThreshKbyte - After thresh, cap node to (maxkbyte - bytes)/(time left in period)
+    Maxi2KByte - same as MaxKByte, but for i2 
+    Threshi2Kbyte - same as Threshi2KByte, but for i2 
+    MaxRate - max_rate slice attribute. 
+    Maxi2Rate - max_exempt_rate slice attribute.
+    Share - Used by Sirius to loan min rates
+    Sharei2 - Used by Sirius to loan min rates for i2
+    self.emailed - did slice recv email during this recording period
 
     """
 
@@ -209,7 +218,10 @@ class Slice:
         return self.name
 
     def updateSliceAttributes(self, rspec):
-        # Get attributes
+        '''
+        Use respects from GetSlivers to PLC to populate slice object.  Also
+        do some sanity checking.
+        '''
 
         # Sanity check plus policy decision for MinRate:
         # Minrate cant be greater than 25% of MaxRate or NodeCap.
@@ -301,8 +313,9 @@ class Slice:
 
     def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec):
         """
-        Update byte counts and check if byte limits have been
-        exceeded. 
+        Update byte counts and check if byte thresholds have been
+        exceeded. If exceeded, cap to  remaining bytes in limit over remaining in period.  
+        Recalculate every time module runs.
         """
     
         # Query Node Manager for max rate overrides
@@ -386,7 +399,43 @@ class Slice:
                 self.emailed = True
                 slicemail(self.name, subject, message + (footer % params))
 
-def GetSlivers(db):
+def gethtbs(root_xid, default_xid):
+    """
+    Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
+    Turn off HTBs without names.
+    """
+    livehtbs = {}
+    for params in bwlimit.get():
+        (xid, share,
+         minrate, maxrate,
+         minexemptrate, maxexemptrate,
+         usedbytes, usedi2bytes) = params
+        
+        name = bwlimit.get_slice(xid)
+
+        if (name is None) \
+        and (xid != root_xid) \
+        and (xid != default_xid):
+            # Orphaned (not associated with a slice) class
+            name = "%d?" % xid
+            logger.log("bwmon:  Found orphaned HTB %s. Removing." %name)
+            bwlimit.off(xid)
+
+        livehtbs[xid] = {'share': share,
+            'minrate': minrate,
+            'maxrate': maxrate,
+            'maxexemptrate': maxexemptrate,
+            'minexemptrate': minexemptrate,
+            'usedbytes': usedbytes,
+            'name': name, 
+            'usedi2bytes': usedi2bytes}
+
+    return livehtbs
+
+def sync(nmdbcopy):
+    """
+    Syncs tc, db, and bwmon.dat.  Then, starts new slices, kills old ones, and updates byte accounts for each running slice.  Sends emails and caps those that went over their limit.
+    """
     # Defaults
     global datafile, \
         period, \
@@ -402,7 +451,6 @@ def GetSlivers(db):
 
     # All slices
     names = []
-
     # Incase the limits have changed. 
     default_MaxRate = int(bwlimit.get_bwcap() / 1000)
     default_Maxi2Rate = int(bwlimit.bwmax / 1000)
@@ -414,15 +462,16 @@ def GetSlivers(db):
     try:
         f = open(datafile, "r+")
         logger.log("bwmon:  Loading %s" % datafile)
-        (version, slices) = pickle.load(f)
+        (version, slices, deaddb) = pickle.load(f)
         f.close()
         # Check version of data file
-        if version != "$Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $":
+        if version != "$Id$":
             logger.log("bwmon:  Not using old version '%s' data file %s" % (version, datafile))
             raise Exception
     except Exception:
-        version = "$Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $"
+        version = "$Id$"
         slices = {}
+        deaddb = {}
 
     # Get/set special slice IDs
     root_xid = bwlimit.get_xid("root")
@@ -441,107 +490,149 @@ def GetSlivers(db):
 
     live = {}
     # Get running slivers that should be on this node (from plc). {xid: name}
-    for sliver in db.keys():
-        live[bwlimit.get_xid(sliver)] = sliver
-
-    # Setup new slices.
-    # live.xids - runing(slices).xids = new.xids
-    newslicesxids = []
-    for plcxid in live.keys():
-        if plcxid not in slices.keys():
-            newslicesxids.append(plcxid)
-
-    #newslicesxids = Set(live.keys()) - Set(slices.keys())
-    for newslicexid in newslicesxids:
+    # db keys on name, bwmon keys on xid.  db doesnt have xid either.
+    for plcSliver in nmdbcopy.keys():
+        live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
+
+    logger.log("bwmon:  Found %s instantiated slices" % live.keys().__len__())
+    logger.log("bwmon:  Found %s slices in dat file" % slices.values().__len__())
+
+    # Get actual running values from tc.
+    # Update slice totals and bandwidth. {xid: {values}}
+    kernelhtbs = gethtbs(root_xid, default_xid)
+    logger.log("bwmon:  Found %s running HTBs" % kernelhtbs.keys().__len__())
+
+    # The dat file has HTBs for slices, but the HTBs aren't running
+    nohtbslices =  Set(slices.keys()) - Set(kernelhtbs.keys())
+    logger.log( "bwmon:  Found %s slices in dat but not running." % nohtbslices.__len__() )
+    # Reset tc counts.
+    for nohtbslice in nohtbslices:
+        if live.has_key(nohtbslice): 
+            slices[nohtbslice].reset( 0, 0, 0, 0, live[nohtbslice]['_rspec'] )
+
+    # The dat file doesnt have HTB for the slice, but slice is running and
+    # HTB exists
+    slicesnodat = Set(kernelhtbs.keys()) - Set(slices.keys())
+    logger.log( "bwmon: Found %s slices with HTBs but not in dat" % slicesnodat.__len__() )
+    for slicenodat in slicesnodat:
+        slices[slicenodat] = Slice(slicenodat, 
+                                live[slicenodat]['name'], 
+                                live[slicenodat]['_rspec'])
+
+    # Get new slices.
+    # Slices in GetSlivers but not running HTBs
+    newslicesxids = Set(live.keys()) - Set(kernelhtbs.keys())
+    logger.log("bwmon:  Found %s new slices" % newslicesxids.__len__())
+       
+    # Setup new slices
+    for newslice in newslicesxids:
         # Delegated slices dont have xids (which are uids) since they haven't been
         # instantiated yet.
-        if newslicexid != None and db[live[newslicexid]].has_key('_rspec') == True:
-            logger.log("bwmon: New Slice %s" % live[newslicexid])
-            # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
-            # and made a dict of computed values.
-            rspec = db[live[newslicexid]]['_rspec']
-            slices[newslicexid] = Slice(newslicexid, live[newslicexid], rspec)
-            slices[newslicexid].reset(0, 0, 0, 0, rspec)
+        if newslice != None and live[newslice].has_key('_rspec') == True:
+            if live[newslice]['name'] not in deaddb.keys():
+                logger.log( "bwmon: New Slice %s" % live[newslice]['name'] )
+                # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
+                # and made a dict of computed values.
+                slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
+                slices[newslice].reset( 0, 0, 0, 0, live[newslice]['_rspec'] )
+            # Double check time for dead slice in deaddb is within 24hr recording period.
+            elif (time.time() <= (deaddb[live[newslice]['name']]['slice'].time + period)):
+                deadslice = deaddb[live[newslice]['name']]
+                logger.log("bwmon: Reinstantiating deleted slice %s" % live[newslice]['name'])
+                slices[newslice] = deadslice['slice']
+                slices[newslice].xid = newslice
+                # Start the HTB
+                slices[newslice].reset(deadslice['slice'].MaxRate,
+                                    deadslice['slice'].Maxi2Rate,
+                                    deadslice['htb']['usedbytes'],
+                                    deadslice['htb']['usedi2bytes'],
+                                    live[newslice]['_rspec'])
+                # Bring up to date
+                slices[newslice].update(deadslice['slice'].MaxRate, 
+                                    deadslice['slice'].Maxi2Rate, 
+                                    deadslice['htb']['usedbytes'], 
+                                    deadslice['htb']['usedi2bytes'], 
+                                    live[newslice]['_rspec'])
+                # Since the slice has been reinitialed, remove from dead database.
+                del deaddb[deadslice]
         else:
-            logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated.  Skipping." % live[newslicexid])
+            logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated.  Skipping." % live[newslice]['name'])
 
-    # ...mlhuang's abortion....
-    # Get actual running values from tc.
-    # Update slice totals and bandwidth.
-    for params in bwlimit.get():
-        (xid, share,
-         minrate, maxrate,
-         minexemptrate, maxexemptrate,
-         usedbytes, usedi2bytes) = params
-        
-        # Ignore root and default buckets
+    # Move dead slices that exist in the pickle file, but
+    # aren't instantiated by PLC into the dead dict until
+    # recording period is over.  This is to avoid the case where a slice is dynamically created
+    # and destroyed then recreated to get around byte limits.
+    dead = Set(slices.keys()) - Set(live.keys())
+    logger.log("bwmon:  Found %s dead slices" % (dead.__len__() - 2))
+    for xid in dead:
         if xid == root_xid or xid == default_xid:
             continue
-
-        name = bwlimit.get_slice(xid)
-        if name is None:
-            # Orphaned (not associated with a slice) class
-            name = "%d?" % xid
-            bwlimit.off(xid)
-
+        logger.log("bwmon:  removing dead slice  %s " % xid)
+        if slices.has_key(xid):
+            # add slice (by name) to deaddb
+            deaddb[slices[xid].name] = {'slice': slices[xid], 'htb': kernelhtbs[xid]}
+            del slices[xid]
+        if kernelhtbs.has_key(xid): bwlimit.off(xid)
+       
+       # Clean up deaddb
+       for (deadslicexid, deadslice) in deaddb.iteritems():
+               if (time.time() >= (deadslice.time() + period)):
+                       logger.log("bwmon: Removing dead slice %s from dat." % deadslice.name)
+                       del deaddb[deadslicexid]
+
+    # Get actual running values from tc since we've added and removed buckets.
+    # Update slice totals and bandwidth. {xid: {values}}
+    kernelhtbs = gethtbs(root_xid, default_xid)
+    logger.log("bwmon:  now %s running HTBs" % kernelhtbs.keys().__len__())
+
+    for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
+        if xid == root_xid or xid == default_xid: continue
         if names and name not in names:
             continue
-        #slices is populated from the pickle file
-        #xid is populated from bwlimit (read from /etc/passwd) 
-        if slices.has_key(xid):
-            slice = slices[xid]
-            # Old slices werent being instanciated correctly because
-            # the HTBs were still pleasent, but the slice in bwmon would
-            # have the byte counts set to 0.  The next time update was run
-            # the real byte count would be sent to update, causing the bw cap.
  
-            if time.time() >= (slice.time + period) or \
-               usedbytes < slice.bytes or \
-               usedi2bytes < slice.i2bytes or \
-               xid in newslicesxids:
-                # Reset to defaults every 24 hours or if it appears
-                # that the byte counters have overflowed (or, more
-                # likely, the node was restarted or the HTB buckets
-                # were re-initialized).
-                slice.reset(maxrate, \
-                    maxexemptrate, \
-                    usedbytes, \
-                    usedi2bytes, \
-                    db[slice.name]['_rspec'])
-            else:
-                # Update byte counts
-                slice.update(maxrate, \
-                    maxexemptrate, \
-                    usedbytes, \
-                    usedi2bytes, \
-                    db[slice.name]['_rspec'])
+        if (time.time() >= (slice.time + period)) or \
+            (kernelhtbs[xid]['usedbytes'] < slice.bytes) or \
+            (kernelhtbs[xid]['usedi2bytes'] < slice.i2bytes):
+            # Reset to defaults every 24 hours or if it appears
+            # that the byte counters have overflowed (or, more
+            # likely, the node was restarted or the HTB buckets
+            # were re-initialized).
+            slice.reset(kernelhtbs[xid]['maxrate'], \
+                kernelhtbs[xid]['maxexemptrate'], \
+                kernelhtbs[xid]['usedbytes'], \
+                kernelhtbs[xid]['usedi2bytes'], \
+                live[xid]['_rspec'])
         else:
-            # Just in case.  Probably (hopefully) this will never happen.
-            # New slice, initialize state
-            logger.log("bwmon: Deleting orphaned slice xid %s" % xid)
-            bwlimit.off(xid)
-
-    # Delete dead slices
-    dead = Set(slices.keys()) - Set(live.keys())
-    for xid in dead:
-        if xid == root_xid or xid == default_xid:
-            continue
-        del slices[xid]
-        bwlimit.off(xid)
-
-    logger.log("bwmon:  Saving %s" % datafile)
+            if debug:  logger.log("bwmon: Updating slice %s" % slice.name)
+            # Update byte counts
+            slice.update(kernelhtbs[xid]['maxrate'], \
+                kernelhtbs[xid]['maxexemptrate'], \
+                kernelhtbs[xid]['usedbytes'], \
+                kernelhtbs[xid]['usedi2bytes'], \
+                live[xid]['_rspec'])
+
+    logger.log("bwmon:  Saving %s slices in %s" % (slices.keys().__len__(),datafile))
     f = open(datafile, "w")
-    pickle.dump((version, slices), f)
+    pickle.dump((version, slices, deaddb), f)
     f.close()
 
-
-#def GetSlivers(data):
-#   for sliver in data['slivers']:
-#       if sliver.has_key('attributes'):
-#          print sliver
-#           for attribute in sliver['attributes']:
-#               if attribute['name'] == "KByteThresh": print attribute['value']
-
-#def start(options, config):
-#    pass
+lock = threading.Event()
+def run():
+    """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
+    if debug:  logger.log("bwmon:  Thread started")
+    while True:
+        lock.wait()
+        if debug: logger.log("bwmon:  Event received.  Running.")
+        database.db_lock.acquire()
+        nmdbcopy = copy.deepcopy(database.db)
+        database.db_lock.release()
+        try:  sync(nmdbcopy)
+        except: logger.log_exc()
+        lock.clear()
+
+def start(*args):
+    tools.as_daemon_thread(run)
+
+def GetSlivers(*args):
+    pass