* BWmon is now event driven and handles reboots. Also got rid of ALL legacy code.
authorFaiyaz Ahmed <faiyaza@cs.princeton.edu>
Fri, 15 Jun 2007 20:29:26 +0000 (20:29 +0000)
committerFaiyaz Ahmed <faiyaza@cs.princeton.edu>
Fri, 15 Jun 2007 20:29:26 +0000 (20:29 +0000)
* everything else is to support delegation.

api.py
bwmon.py
database.py
nm.py
sm.py

diff --git a/api.py b/api.py
index 937fb78..96fafe7 100644 (file)
--- a/api.py
+++ b/api.py
@@ -54,6 +54,7 @@ def Ticket(tkt):
         data = ticket.verify(tkt)
         if data != None:
             deliver_ticket(data)
+        logger.log('Got ticket')
     except Exception, err:
         raise xmlrpclib.Fault(102, 'Ticket error: ' + str(err))
 
@@ -81,14 +82,6 @@ def Destroy(rec):
     """Destroy(sliver_name): destroy a non-PLC-instantiated sliver"""
     if rec['instantiation'] == 'delegated': accounts.get(rec['name']).ensure_destroyed()
 
-@export_to_api(1)
-def ReCreate(rec):
-    """ReCreate(sliver_name): destroy then recreate 
-    and start sliver regardless of instantiation."""
-    accounts.get(rec['name']).ensure_destroyed()
-    accounts.get(rec['name']).ensure_created(rec)
-    accounts.get(rec['name']).start()
-
 @export_to_api(1)
 def Start(rec):
     """Start(sliver_name): run start scripts belonging to the specified sliver"""
@@ -142,7 +135,9 @@ class APIRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
             api_method_list.sort()
             raise xmlrpclib.Fault(100, 'Invalid API method %s.  Valid choices are %s' % (method_name, ', '.join(api_method_list)))
         expected_nargs = nargs_dict[method_name]
-        if len(args) != expected_nargs: raise xmlrpclib.Fault(101, 'Invalid argument count: got %d, expecting %d.' % (len(args), expected_nargs))
+        if len(args) != expected_nargs: 
+            raise xmlrpclib.Fault(101, 'Invalid argument count: got %d, expecting %d.' % (len(args), 
+            expected_nargs))
         else:
             # Figure out who's calling.
             # XXX - these ought to be imported directly from some .h file
@@ -151,11 +146,15 @@ class APIRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
             ucred = self.request.getsockopt(socket.SOL_SOCKET, SO_PEERCRED, sizeof_struct_ucred)
             xid = struct.unpack('3i', ucred)[2]
             caller_name = pwd.getpwuid(xid)[0]
-            if method_name not in ('Help', 'Ticket', 'GetXIDs', 'GetSSHKeys'):
+            if method_name not in ('ReCreate', 'Help', 'Ticket', 'GetXIDs', 'GetSSHKeys'):
                 target_name = args[0]
                 target_rec = database.db.get(target_name)
-                if not (target_rec and target_rec['type'].startswith('sliver.')): raise xmlrpclib.Fault(102, 'Invalid argument: the first argument must be a sliver name.')
-                if not (caller_name in (args[0], 'root') or (caller_name, method_name) in target_rec['delegations'] or (caller_name == 'utah_elab_delegate' and target_name.startswith('utah_'))): raise xmlrpclib.Fault(108, 'Permission denied.')
+                if not (target_rec and target_rec['type'].startswith('sliver.')): 
+                    raise xmlrpclib.Fault(102, 'Invalid argument: the first argument must be a sliver name.')
+                if not (caller_name, method_name) in target_rec['delegations']:
+               # or (caller_name == 'utah_elab_delegate' and target_name.startswith('utah_'))): 
+                    raise xmlrpclib.Fault(108, 'Permission denied.')
+
                 result = method(target_rec, *args[1:])
             else: result = method(*args)
             if result == None: result = 1
index b8ca5e2..af70370 100644 (file)
--- a/bwmon.py
+++ b/bwmon.py
 # Faiyaz Ahmed <faiyaza@cs.princeton.edu>
 # Copyright (C) 2004-2006 The Trustees of Princeton University
 #
-# $Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $
+# $Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $
 #
 
 import os
 import sys
 import time
 import pickle
-
 import socket
-import bwlimit
 import logger
+import copy
+import threading
+import tools
+
+import bwlimit
+import database
 
 from sets import Set
 
@@ -386,7 +390,45 @@ class Slice:
                 self.emailed = True
                 slicemail(self.name, subject, message + (footer % params))
 
-def GetSlivers(db):
+def gethtbs(root_xid, default_xid):
+    """
+    Return dict {xid: {*rates}} of running htbs as reported by tc that have names.
+       Turn off HTBs without names.
+    """
+    livehtbs = {}
+    for params in bwlimit.get():
+        (xid, share,
+         minrate, maxrate,
+         minexemptrate, maxexemptrate,
+         usedbytes, usedi2bytes) = params
+        
+        name = bwlimit.get_slice(xid)
+
+        
+        
+        if (name is None) \
+               and (xid != root_xid) \
+               and (xid != default_xid):
+            # Orphaned (not associated with a slice) class
+            name = "%d?" % xid
+            logger.log("bwmon:  Found orphaned HTB %s. Removing." %name)
+            bwlimit.off(xid)
+
+        livehtbs[xid] = {'share': share,
+            'minrate': minrate,
+            'maxrate': maxrate,
+            'maxexemptrate': maxexemptrate,
+            'minexemptrate': minexemptrate,
+            'usedbytes': usedbytes,
+            'name': name, 
+            'usedi2bytes': usedi2bytes}
+
+    return livehtbs
+
+def sync(nmdbcopy):
+    """
+    Syncs tc, db, and bwmon.dat.  Then, starts new slices, kills old ones, and updates byte accounts for each running slice.  Sends emails and caps those that went over their limit.
+    """
     # Defaults
     global datafile, \
         period, \
@@ -402,7 +444,6 @@ def GetSlivers(db):
 
     # All slices
     names = []
-
     # Incase the limits have changed. 
     default_MaxRate = int(bwlimit.get_bwcap() / 1000)
     default_Maxi2Rate = int(bwlimit.bwmax / 1000)
@@ -417,11 +458,11 @@ def GetSlivers(db):
         (version, slices) = pickle.load(f)
         f.close()
         # Check version of data file
-        if version != "$Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $":
+        if version != "$Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $":
             logger.log("bwmon:  Not using old version '%s' data file %s" % (version, datafile))
             raise Exception
     except Exception:
-        version = "$Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $"
+        version = "$Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $"
         slices = {}
 
     # Get/set special slice IDs
@@ -441,107 +482,108 @@ def GetSlivers(db):
 
     live = {}
     # Get running slivers that should be on this node (from plc). {xid: name}
-    for sliver in db.keys():
-        live[bwlimit.get_xid(sliver)] = sliver
+    # db keys on name, bwmon keys on xid.  db doesnt have xid either.
+    for plcSliver in nmdbcopy.keys():
+        live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver]
 
-    # Setup new slices.
-    # live.xids - runing(slices).xids = new.xids
-    newslicesxids = []
-    for plcxid in live.keys():
-        if plcxid not in slices.keys():
-            newslicesxids.append(plcxid)
+    logger.log("bwmon:  Found %s instantiated slices" % live.keys().__len__())
+    logger.log("bwmon:  Found %s slices in dat file" % slices.values().__len__())
+
+    # Get actual running values from tc.
+    # Update slice totals and bandwidth. {xid: {values}}
+    livehtbs = gethtbs(root_xid, default_xid)
+    logger.log("bwmon:  Found %s running HTBs" % livehtbs.keys().__len__())
 
-    #newslicesxids = Set(live.keys()) - Set(slices.keys())
-    for newslicexid in newslicesxids:
+    # Get new slices.
+    # live.xids - runing(slices).xids = new.xids
+    #newslicesxids = Set(live.keys()) - Set(slices.keys()) 
+    newslicesxids = Set(live.keys()) - Set(livehtbs.keys())
+    logger.log("bwmon:  Found %s new slices" % newslicesxids.__len__())
+
+    # Incase we rebooted and need to bring up the htbs that are in the db but 
+    # not known to tc.
+    #nohtbxids = Set(slices.keys()) - Set(livehtbs.keys())
+    #logger.log("bwmon:  Found %s slices that should have htbs but dont." % nohtbxids.__len__())
+    #newslicesxids.update(nohtbxids)
+        
+    # Setup new slices
+    for newslice in newslicesxids:
         # Delegated slices dont have xids (which are uids) since they haven't been
         # instantiated yet.
-        if newslicexid != None and db[live[newslicexid]].has_key('_rspec') == True:
-            logger.log("bwmon: New Slice %s" % live[newslicexid])
+        if newslice != None and live[newslice].has_key('_rspec') == True:
+            logger.log("bwmon: New Slice %s" % live[newslice]['name'])
             # _rspec is the computed rspec:  NM retrieved data from PLC, computed loans
             # and made a dict of computed values.
-            rspec = db[live[newslicexid]]['_rspec']
-            slices[newslicexid] = Slice(newslicexid, live[newslicexid], rspec)
-            slices[newslicexid].reset(0, 0, 0, 0, rspec)
+            slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec'])
+            slices[newslice].reset(0, 0, 0, 0, live[newslice]['_rspec'])
         else:
-            logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated.  Skipping." % live[newslicexid])
+            logger.log("bwmon  Slice %s doesn't have xid.  Must be delegated.  Skipping." % live[newslice]['name'])
 
-    # ...mlhuang's abortion....
-    # Get actual running values from tc.
-    # Update slice totals and bandwidth.
-    for params in bwlimit.get():
-        (xid, share,
-         minrate, maxrate,
-         minexemptrate, maxexemptrate,
-         usedbytes, usedi2bytes) = params
-        
-        # Ignore root and default buckets
+    # Delete dead slices.
+    # First delete dead slices that exist in the pickle file, but
+    # aren't instantiated by PLC.
+    dead = Set(slices.keys()) - Set(live.keys())
+    logger.log("bwmon:  Found %s dead slices" % (dead.__len__() - 2))
+    for xid in dead:
         if xid == root_xid or xid == default_xid:
             continue
+        logger.log("bwmon:  removing dead slice  %s " % xid)
+        if slices.has_key(xid): del slices[xid]
+        if livehtbs.has_key(xid): bwlimit.off(xid)
 
-        name = bwlimit.get_slice(xid)
-        if name is None:
-            # Orphaned (not associated with a slice) class
-            name = "%d?" % xid
-            bwlimit.off(xid)
+    # Get actual running values from tc since we've added and removed buckets.
+    # Update slice totals and bandwidth. {xid: {values}}
+    livehtbs = gethtbs(root_xid, default_xid)
+    logger.log("bwmon:  now %s running HTBs" % livehtbs.keys().__len__())
 
+    for (xid, slice) in slices.iteritems():
         # Monitor only the specified slices
+        if xid == root_xid or xid == default_xid: continue
         if names and name not in names:
             continue
-        #slices is populated from the pickle file
-        #xid is populated from bwlimit (read from /etc/passwd) 
-        if slices.has_key(xid):
-            slice = slices[xid]
-            # Old slices werent being instanciated correctly because
-            # the HTBs were still pleasent, but the slice in bwmon would
-            # have the byte counts set to 0.  The next time update was run
-            # the real byte count would be sent to update, causing the bw cap.
  
-            if time.time() >= (slice.time + period) or \
-               usedbytes < slice.bytes or \
-               usedi2bytes < slice.i2bytes or \
-               xid in newslicesxids:
-                # Reset to defaults every 24 hours or if it appears
-                # that the byte counters have overflowed (or, more
-                # likely, the node was restarted or the HTB buckets
-                # were re-initialized).
-                slice.reset(maxrate, \
-                    maxexemptrate, \
-                    usedbytes, \
-                    usedi2bytes, \
-                    db[slice.name]['_rspec'])
-            else:
-                # Update byte counts
-                slice.update(maxrate, \
-                    maxexemptrate, \
-                    usedbytes, \
-                    usedi2bytes, \
-                    db[slice.name]['_rspec'])
+        if (time.time() >= (slice.time + period)) or \
+        (livehtbs[xid]['usedbytes'] < slice.bytes) or \
+        (livehtbs[xid]['usedi2bytes'] < slice.i2bytes):
+            # Reset to defaults every 24 hours or if it appears
+            # that the byte counters have overflowed (or, more
+            # likely, the node was restarted or the HTB buckets
+            # were re-initialized).
+            slice.reset(livehtbs[xid]['maxrate'], \
+                livehtbs[xid]['maxexemptrate'], \
+                livehtbs[xid]['usedbytes'], \
+                livehtbs[xid]['usedi2bytes'], \
+                live[xid]['_rspec'])
         else:
-            # Just in case.  Probably (hopefully) this will never happen.
-            # New slice, initialize state
-            logger.log("bwmon: Deleting orphaned slice xid %s" % xid)
-            bwlimit.off(xid)
-
-    # Delete dead slices
-    dead = Set(slices.keys()) - Set(live.keys())
-    for xid in dead:
-        if xid == root_xid or xid == default_xid:
-            continue
-        del slices[xid]
-        bwlimit.off(xid)
-
-    logger.log("bwmon:  Saving %s" % datafile)
+            if debug:  logger.log("bwmon: Updating slice %s" % slice.name)
+            # Update byte counts
+            slice.update(livehtbs[xid]['maxrate'], \
+                livehtbs[xid]['maxexemptrate'], \
+                livehtbs[xid]['usedbytes'], \
+                livehtbs[xid]['usedi2bytes'], \
+                live[xid]['_rspec'])
+    
+    logger.log("bwmon:  Saving %s slices in %s" % (slices.keys().__len__(),datafile))
     f = open(datafile, "w")
     pickle.dump((version, slices), f)
     f.close()
 
-
-#def GetSlivers(data):
-#   for sliver in data['slivers']:
-#       if sliver.has_key('attributes'):
-#          print sliver
-#           for attribute in sliver['attributes']:
-#               if attribute['name'] == "KByteThresh": print attribute['value']
-
-#def start(options, config):
-#    pass
+lock = threading.Event()
+def run():
+    """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting."""
+    if debug:  logger.log("bwmon:  Thread started")
+    while True:
+        lock.wait()
+        if debug: logger.log("bwmon:  Event received.  Running.")
+        database.db_lock.acquire()
+        nmdbcopy = copy.deepcopy(database.db)
+        database.db_lock.release()
+        try:  sync(nmdbcopy)
+        except: logger.log_exc()
+        lock.clear()
+
+def start(*args):
+    tools.as_daemon_thread(run)
+
+def GetSlivers(*args):
+    pass
index 7b95ed2..be304d3 100644 (file)
@@ -102,10 +102,11 @@ class Database(dict):
             if name not in self: accounts.get(name).ensure_destroyed()
         for name, rec in self.iteritems():
             if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec)
+            if rec['instantiation'] == 'nm-controller': accounts.get(name).ensure_created(rec)
 
-        try: bwmon.GetSlivers(self)
-        except: logger.log_exc()
-
+        #try: bwmon.GetSlivers(self)
+        #except: logger.log_exc()
+        bwmon.lock.set()
         # request a database dump
         global dump_requested
         dump_requested = True
diff --git a/nm.py b/nm.py
index 46f23f5..243a782 100644 (file)
--- a/nm.py
+++ b/nm.py
@@ -55,7 +55,7 @@ def run():
             print "Warning while writing PID file:", err
 
         # Load and start modules
-        for module in ['net', 'proper', 'conf_files', 'sm']:
+        for module in ['net', 'proper', 'conf_files', 'sm', 'bwmon']:
             try:
                 m = __import__(module)
                 m.start(options, config)
diff --git a/sm.py b/sm.py
index 87b2307..7084ef0 100644 (file)
--- a/sm.py
+++ b/sm.py
@@ -112,15 +112,15 @@ def GetSlivers(data, fullupdate=True):
                 DEFAULT_ALLOCATION['net_max_rate'] = network['bwlimit'] / 1000
 
 ### Emulab-specific hack begins here
-    emulabdelegate = {
-        'instantiation': 'plc-instantiated',
-        'keys': '''ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA5Rimz6osRvlAUcaxe0YNfGsLL4XYBN6H30V3l/0alZOSXbGOgWNdEEdohwbh9E8oYgnpdEs41215UFHpj7EiRudu8Nm9mBI51ARHA6qF6RN+hQxMCB/Pxy08jDDBOGPefINq3VI2DRzxL1QyiTX0jESovrJzHGLxFTB3Zs+Y6CgmXcnI9i9t/zVq6XUAeUWeeXA9ADrKJdav0SxcWSg+B6F1uUcfUd5AHg7RoaccTldy146iF8xvnZw0CfGRCq2+95AU9rbMYS6Vid8Sm+NS+VLaAyJaslzfW+CAVBcywCOlQNbLuvNmL82exzgtl6fVzutRFYLlFDwEM2D2yvg4BQ== root@boss.emulab.net''',
-        'name': 'utah_elab_delegate',
-        'timestamp': data['timestamp'],
-        'type': 'delegate',
-        'vref': None
-        }
-    database.db.deliver_record(emulabdelegate)
+#    emulabdelegate = {
+#        'instantiation': 'plc-instantiated',
+#        'keys': '''ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA5Rimz6osRvlAUcaxe0YNfGsLL4XYBN6H30V3l/0alZOSXbGOgWNdEEdohwbh9E8oYgnpdEs41215UFHpj7EiRudu8Nm9mBI51ARHA6qF6RN+hQxMCB/Pxy08jDDBOGPefINq3VI2DRzxL1QyiTX0jESovrJzHGLxFTB3Zs+Y6CgmXcnI9i9t/zVq6XUAeUWeeXA9ADrKJdav0SxcWSg+B6F1uUcfUd5AHg7RoaccTldy146iF8xvnZw0CfGRCq2+95AU9rbMYS6Vid8Sm+NS+VLaAyJaslzfW+CAVBcywCOlQNbLuvNmL82exzgtl6fVzutRFYLlFDwEM2D2yvg4BQ== root@boss.emulab.net''',
#       'name': 'utah_elab_delegate',
#       'timestamp': data['timestamp'],
#       'type': 'delegate',
#       'vref': None
#       }
#   database.db.deliver_record(emulabdelegate)
 ### Emulab-specific hack ends here
 
 
@@ -143,14 +143,20 @@ def GetSlivers(data, fullupdate=True):
         keys = rec.pop('keys')
         rec.setdefault('keys', '\n'.join([key_struct['key'] for key_struct in keys]))
 
-        rec.setdefault('type', attr_dict.get('type', 'sliver.VServer'))
+               # Handle nm controller here
+               rec.setdefault('type', attr_dict.get('type', 'sliver.VServer'))
+               if rec['instantiation'] == 'nm-controller':
+               # type isn't returned by GetSlivers() for whatever reason.  We're overloading
+               # instantiation here, but i suppose its the ssame thing when you think about it. -FA
+                       rec['type'] = 'delegate'
+
         rec.setdefault('vref', attr_dict.get('vref', 'default'))
         is_id = attr_dict.get('plc_initscript_id')
         if is_id is not None and is_id in initscripts_by_id:
             rec['initscript'] = initscripts_by_id[is_id]
         else:
             rec['initscript'] = ''
-        rec.setdefault('delegations', [])  # XXX - delegation not yet supported
+        rec.setdefault('delegations', [])
 
         # extract the implied rspec
         rspec = {}