Added ReCreate. Also added try catch to api eval of rpc method.
[nodemanager.git] / bwmon.py
index c2c9ffe..882b8f4 100644 (file)
--- a/bwmon.py
+++ b/bwmon.py
 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
 # Copyright (C) 2004-2006 The Trustees of Princeton University
 #
-# $Id: bwmon.py,v 1.1.2.8 2007/04/25 22:20:58 faiyaza Exp $
+# $Id: bwmon.py,v 1.1.2.11 2007/06/26 18:03:55 faiyaza Exp $
 #
 
 import os
 import sys
 import time
 import pickle
-
 import socket
-import bwlimit
 import logger
+import copy
+import threading
+import tools
+
+import bwlimit
+import database
 
 from sets import Set
 
@@ -386,7 +390,43 @@ class Slice:
                 self.emailed = True
                 slicemail(self.name, subject, message + (footer % params))
 
-def GetSlivers(db):
+def gethtbs(root_xid, default_xid):
+    """
+    Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
+    Turn off HTBs without names.
+    """
+    livehtbs = {}
+    for params in bwlimit.get():
+        (xid, share,
+         minrate, maxrate,
+         minexemptrate, maxexemptrate,
+         usedbytes, usedi2bytes) = params
+        
+        name = bwlimit.get_slice(xid)
+
+        if (name is None) \
+        and (xid != root_xid) \
+        and (xid != default_xid):
+            # Orphaned (not associated with a slice) class
+            name = "%d?" % xid
+            logger.log("bwmon:  Found orphaned HTB %s. Removing." %name)
+            bwlimit.off(xid)
+
+        livehtbs[xid] = {'share': share,
+            'minrate': minrate,
+            'maxrate': maxrate,
+            'maxexemptrate': maxexemptrate,
+            'minexemptrate': minexemptrate,
+            'usedbytes': usedbytes,
+            'name': name, 
+            'usedi2bytes': usedi2bytes}
+
+    return livehtbs
+
+def sync(nmdbcopy):
+    """
+    Syncs tc, db, and bwmon.dat.  Then, starts new slices, kills old ones, and updates byte accounts for each running slice.  Sends emails and caps those that went over their limit.
+    """
     # Defaults
     global datafile, \
         period, \
@@ -402,7 +442,6 @@ def GetSlivers(db):
 
     # All slices
     names = []
-
     # Incase the limits have changed. 
     default_MaxRate = int(bwlimit.get_bwcap() / 1000)
     default_Maxi2Rate = int(bwlimit.bwmax / 1000)
@@ -417,11 +456,11 @@ def GetSlivers(db):
         (version, slices) = pickle.load(f)
         f.close()
         # Check version of data file
-        if version != "$Id: bwmon.py,v 1.1.2.8 2007/04/25 22:20:58 faiyaza Exp $":
+        if version != "$Id: bwmon.py,v 1.1.2.11 2007/06/26 18:03:55 faiyaza Exp $":
             logger.log("bwmon:  Not using old version '%s' data file %s" % (version, datafile))
             raise Exception
     except Exception:
-        version = "$Id: bwmon.py,v 1.1.2.8 2007/04/25 22:20:58 faiyaza Exp $"
+        version = "$Id: bwmon.py,v 1.1.2.11 2007/06/26 18:03:55 faiyaza Exp $"
         slices = {}
 
     # Get/set special slice IDs
@@ -441,107 +480,114 @@ def GetSlivers(db):
 
     live = {}
     # Get running slivers that should be on this node (from plc). {xid: name}
-    for sliver in db.keys():
-        live[bwlimit.get_xid(sliver)] = sliver
+    # db keys on name, bwmon keys on xid.  db doesnt have xid either.
+    for plcSliver in nmdbcopy.keys():
+        live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
 
-    # Setup new slices.
-    # live.xids - runing(slices).xids = new.xids
-    newslicesxids = []
-    for plcxid in live.keys():
-        if plcxid not in slices.keys():
-            newslicesxids.append(plcxid)
+    logger.log("bwmon:  Found %s instantiated slices" % live.keys().__len__())
+    logger.log("bwmon:  Found %s slices in dat file" % slices.values().__len__())
+
+    # Get actual running values from tc.
+    # Update slice totals and bandwidth. {xid: {values}}
+    livehtbs = gethtbs(root_xid, default_xid)
+    logger.log("bwmon:  Found %s running HTBs" % livehtbs.keys().__len__())
 
-    #newslicesxids = Set(live.keys()) - Set(slices.keys())
-    for newslicexid in newslicesxids:
+    # Get new slices.
+    # live.xids - runing(slices).xids = new.xids
+    #newslicesxids = Set(live.keys()) - Set(slices.keys()) 
+    newslicesxids = Set(live.keys()) - Set(livehtbs.keys())
+    logger.log("bwmon:  Found %s new slices" % newslicesxids.__len__())
+
+    # Incase we rebooted and need to keep track of already running htbs 
+    norecxids =  Set(livehtbs.keys()) - Set(slices.keys())
+    logger.log("bwmon:  Found %s slices that have htbs but not in dat." % norecxids.__len__())
+       # Reset tc counts.
+    for norecxid in norecxids:
+        slices[norecxid] = Slice(norecxid, live[norecxid]['name'], live[norecxid]['_rspec'])
+        slices[norecxid].reset(livehtbs[norecxid]['maxrate'], 
+                            livehtbs[norecxid]['maxexemptrate'], 
+                            livehtbs[norecxid]['usedbytes'], 
+                            livehtbs[norecxid]['usedi2bytes'], 
+                            live[norecxid]['_rspec'])
+        
+    # Setup new slices
+    for newslice in newslicesxids:
         # Delegated slices dont have xids (which are uids) since they haven't been
         # instantiated yet.
-        if newslicexid != None and db[live[newslicexid]].has_key('_rspec') == True:
-            logger.log("bwmon: New Slice %s" % live[newslicexid])
+        if newslice != None and live[newslice].has_key('_rspec') == True:
+            logger.log("bwmon: New Slice %s" % live[newslice]['name'])
             # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
             # and made a dict of computed values.
-            rspec = db[live[newslicexid]]['_rspec']
-            slices[newslicexid] = Slice(newslicexid, live[newslicexid], rspec)
-            slices[newslicexid].reset(0, 0, 0, 0, rspec)
+            slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
+            slices[newslice].reset(0, 0, 0, 0, live[newslice]['_rspec'])
         else:
-            logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated.  Skipping." % live[newslicexid])
+            logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated.  Skipping." % live[newslice]['name'])
 
-    # ...mlhuang's abortion....
-    # Get actual running values from tc.
-    # Update slice totals and bandwidth.
-    for params in bwlimit.get():
-        (xid, share,
-         minrate, maxrate,
-         minexemptrate, maxexemptrate,
-         usedbytes, usedi2bytes) = params
-        
-        # Ignore root and default buckets
+    # Delete dead slices.
+    # First delete dead slices that exist in the pickle file, but
+    # aren't instantiated by PLC.
+    dead = Set(slices.keys()) - Set(live.keys())
+    logger.log("bwmon:  Found %s dead slices" % (dead.__len__() - 2))
+    for xid in dead:
         if xid == root_xid or xid == default_xid:
             continue
+        logger.log("bwmon:  removing dead slice  %s " % xid)
+        if slices.has_key(xid): del slices[xid]
+        if livehtbs.has_key(xid): bwlimit.off(xid)
 
-        name = bwlimit.get_slice(xid)
-        if name is None:
-            # Orphaned (not associated with a slice) class
-            name = "%d?" % xid
-            bwlimit.off(xid)
+    # Get actual running values from tc since we've added and removed buckets.
+    # Update slice totals and bandwidth. {xid: {values}}
+    livehtbs = gethtbs(root_xid, default_xid)
+    logger.log("bwmon:  now %s running HTBs" % livehtbs.keys().__len__())
 
+    for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
+        if xid == root_xid or xid == default_xid: continue
         if names and name not in names:
             continue
-        #slices is populated from the pickle file
-        #xid is populated from bwlimit (read from /etc/passwd) 
-        if slices.has_key(xid):
-            slice = slices[xid]
-            # Old slices werent being instanciated correctly because
-            # the HTBs were still pleasent, but the slice in bwmon would
-            # have the byte counts set to 0.  The next time update was run
-            # the real byte count would be sent to update, causing the bw cap.
  
-            if time.time() >= (slice.time + period) or \
-               usedbytes < slice.bytes or \
-               usedi2bytes < slice.i2bytes or \
-               xid in newslicesxids:
-                # Reset to defaults every 24 hours or if it appears
-                # that the byte counters have overflowed (or, more
-                # likely, the node was restarted or the HTB buckets
-                # were re-initialized).
-                slice.reset(maxrate, \
-                    maxexemptrate, \
-                    usedbytes, \
-                    usedi2bytes, \
-                    db[slice.name]['_rspec'])
-            else:
-                # Update byte counts
-                slice.update(maxrate, \
-                    maxexemptrate, \
-                    usedbytes, \
-                    usedi2bytes, \
-                    db[slice.name]['_rspec'])
+        if (time.time() >= (slice.time + period)) or \
+        (livehtbs[xid]['usedbytes'] < slice.bytes) or \
+        (livehtbs[xid]['usedi2bytes'] < slice.i2bytes):
+            # Reset to defaults every 24 hours or if it appears
+            # that the byte counters have overflowed (or, more
+            # likely, the node was restarted or the HTB buckets
+            # were re-initialized).
+            slice.reset(livehtbs[xid]['maxrate'], \
+                livehtbs[xid]['maxexemptrate'], \
+                livehtbs[xid]['usedbytes'], \
+                livehtbs[xid]['usedi2bytes'], \
+                live[xid]['_rspec'])
         else:
-            # Just in case.  Probably (hopefully) this will never happen.
-            # New slice, initialize state
-            logger.log("bwmon: Deleting orphaned slice xid %s" % xid)
-            bwlimit.off(xid)
-
-    # Delete dead slices
-    dead = Set(slices.keys()) - Set(live.keys())
-    for xid in dead:
-        if xid == root_xid or xid == default_xid:
-            continue
-        del slices[xid]
-        bwlimit.off(xid)
-
-    logger.log("bwmon:  Saving %s" % datafile)
+            if debug:  logger.log("bwmon: Updating slice %s" % slice.name)
+            # Update byte counts
+            slice.update(livehtbs[xid]['maxrate'], \
+                livehtbs[xid]['maxexemptrate'], \
+                livehtbs[xid]['usedbytes'], \
+                livehtbs[xid]['usedi2bytes'], \
+                live[xid]['_rspec'])
+    
+    logger.log("bwmon:  Saving %s slices in %s" % (slices.keys().__len__(),datafile))
     f = open(datafile, "w")
     pickle.dump((version, slices), f)
     f.close()
 
-
-#def GetSlivers(data):
-#   for sliver in data['slivers']:
-#       if sliver.has_key('attributes'):
-#          print sliver
-#           for attribute in sliver['attributes']:
-#               if attribute['name'] == "KByteThresh": print attribute['value']
-
-#def start(options, config):
-#    pass
+lock = threading.Event()
+def run():
+    """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
+    if debug:  logger.log("bwmon:  Thread started")
+    while True:
+        lock.wait()
+        if debug: logger.log("bwmon:  Event received.  Running.")
+        database.db_lock.acquire()
+        nmdbcopy = copy.deepcopy(database.db)
+        database.db_lock.release()
+        try:  sync(nmdbcopy)
+        except: logger.log_exc()
+        lock.clear()
+
+def start(*args):
+    tools.as_daemon_thread(run)
+
+def GetSlivers(*args):
+    pass