From 73ac7b040f2f78973727c592dd3facfccb618a6c Mon Sep 17 00:00:00 2001 From: Faiyaz Ahmed Date: Fri, 15 Jun 2007 20:29:26 +0000 Subject: [PATCH] * BWmon is now event driven and handles reboots. Also got rid of ALL legacy code. * everything else is to support delegation. --- api.py | 23 +++--- bwmon.py | 222 +++++++++++++++++++++++++++++++--------------------- database.py | 7 +- nm.py | 2 +- sm.py | 28 ++++--- 5 files changed, 165 insertions(+), 117 deletions(-) diff --git a/api.py b/api.py index 937fb78..96fafe7 100644 --- a/api.py +++ b/api.py @@ -54,6 +54,7 @@ def Ticket(tkt): data = ticket.verify(tkt) if data != None: deliver_ticket(data) + logger.log('Got ticket') except Exception, err: raise xmlrpclib.Fault(102, 'Ticket error: ' + str(err)) @@ -81,14 +82,6 @@ def Destroy(rec): """Destroy(sliver_name): destroy a non-PLC-instantiated sliver""" if rec['instantiation'] == 'delegated': accounts.get(rec['name']).ensure_destroyed() -@export_to_api(1) -def ReCreate(rec): - """ReCreate(sliver_name): destroy then recreate - and start sliver regardless of instantiation.""" - accounts.get(rec['name']).ensure_destroyed() - accounts.get(rec['name']).ensure_created(rec) - accounts.get(rec['name']).start() - @export_to_api(1) def Start(rec): """Start(sliver_name): run start scripts belonging to the specified sliver""" @@ -142,7 +135,9 @@ class APIRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): api_method_list.sort() raise xmlrpclib.Fault(100, 'Invalid API method %s. Valid choices are %s' % (method_name, ', '.join(api_method_list))) expected_nargs = nargs_dict[method_name] - if len(args) != expected_nargs: raise xmlrpclib.Fault(101, 'Invalid argument count: got %d, expecting %d.' % (len(args), expected_nargs)) + if len(args) != expected_nargs: + raise xmlrpclib.Fault(101, 'Invalid argument count: got %d, expecting %d.' % (len(args), + expected_nargs)) else: # Figure out who's calling. # XXX - these ought to be imported directly from some .h file @@ -151,11 +146,15 @@ class APIRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): ucred = self.request.getsockopt(socket.SOL_SOCKET, SO_PEERCRED, sizeof_struct_ucred) xid = struct.unpack('3i', ucred)[2] caller_name = pwd.getpwuid(xid)[0] - if method_name not in ('Help', 'Ticket', 'GetXIDs', 'GetSSHKeys'): + if method_name not in ('ReCreate', 'Help', 'Ticket', 'GetXIDs', 'GetSSHKeys'): target_name = args[0] target_rec = database.db.get(target_name) - if not (target_rec and target_rec['type'].startswith('sliver.')): raise xmlrpclib.Fault(102, 'Invalid argument: the first argument must be a sliver name.') - if not (caller_name in (args[0], 'root') or (caller_name, method_name) in target_rec['delegations'] or (caller_name == 'utah_elab_delegate' and target_name.startswith('utah_'))): raise xmlrpclib.Fault(108, 'Permission denied.') + if not (target_rec and target_rec['type'].startswith('sliver.')): + raise xmlrpclib.Fault(102, 'Invalid argument: the first argument must be a sliver name.') + if not (caller_name, method_name) in target_rec['delegations']: + # or (caller_name == 'utah_elab_delegate' and target_name.startswith('utah_'))): + raise xmlrpclib.Fault(108, 'Permission denied.') + result = method(target_rec, *args[1:]) else: result = method(*args) if result == None: result = 1 diff --git a/bwmon.py b/bwmon.py index b8ca5e2..af70370 100644 --- a/bwmon.py +++ b/bwmon.py @@ -15,17 +15,21 @@ # Faiyaz Ahmed # Copyright (C) 2004-2006 The Trustees of Princeton University # -# $Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $ +# $Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $ # import os import sys import time import pickle - import socket -import bwlimit import logger +import copy +import threading +import tools + +import bwlimit +import database from sets import Set @@ -386,7 +390,45 @@ class Slice: self.emailed = True slicemail(self.name, subject, message + (footer % params)) -def GetSlivers(db): +def gethtbs(root_xid, default_xid): + """ + Return dict {xid: {*rates}} of running htbs as reported by tc that have names. + Turn off HTBs without names. + """ + livehtbs = {} + for params in bwlimit.get(): + (xid, share, + minrate, maxrate, + minexemptrate, maxexemptrate, + usedbytes, usedi2bytes) = params + + name = bwlimit.get_slice(xid) + + + + if (name is None) \ + and (xid != root_xid) \ + and (xid != default_xid): + # Orphaned (not associated with a slice) class + name = "%d?" % xid + logger.log("bwmon: Found orphaned HTB %s. Removing." %name) + bwlimit.off(xid) + + livehtbs[xid] = {'share': share, + 'minrate': minrate, + 'maxrate': maxrate, + 'maxexemptrate': maxexemptrate, + 'minexemptrate': minexemptrate, + 'usedbytes': usedbytes, + 'name': name, + 'usedi2bytes': usedi2bytes} + + return livehtbs + +def sync(nmdbcopy): + """ + Syncs tc, db, and bwmon.dat. Then, starts new slices, kills old ones, and updates byte accounts for each running slice. Sends emails and caps those that went over their limit. + """ # Defaults global datafile, \ period, \ @@ -402,7 +444,6 @@ def GetSlivers(db): # All slices names = [] - # Incase the limits have changed. default_MaxRate = int(bwlimit.get_bwcap() / 1000) default_Maxi2Rate = int(bwlimit.bwmax / 1000) @@ -417,11 +458,11 @@ def GetSlivers(db): (version, slices) = pickle.load(f) f.close() # Check version of data file - if version != "$Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $": + if version != "$Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $": logger.log("bwmon: Not using old version '%s' data file %s" % (version, datafile)) raise Exception except Exception: - version = "$Id: bwmon.py,v 1.18 2007/04/25 22:19:59 faiyaza Exp $" + version = "$Id: bwmon.py,v 1.1.2.9 2007/04/26 19:09:05 faiyaza Exp $" slices = {} # Get/set special slice IDs @@ -441,107 +482,108 @@ def GetSlivers(db): live = {} # Get running slivers that should be on this node (from plc). {xid: name} - for sliver in db.keys(): - live[bwlimit.get_xid(sliver)] = sliver + # db keys on name, bwmon keys on xid. db doesnt have xid either. + for plcSliver in nmdbcopy.keys(): + live[bwlimit.get_xid(plcSliver)] = nmdbcopy[plcSliver] - # Setup new slices. - # live.xids - runing(slices).xids = new.xids - newslicesxids = [] - for plcxid in live.keys(): - if plcxid not in slices.keys(): - newslicesxids.append(plcxid) + logger.log("bwmon: Found %s instantiated slices" % live.keys().__len__()) + logger.log("bwmon: Found %s slices in dat file" % slices.values().__len__()) + + # Get actual running values from tc. + # Update slice totals and bandwidth. {xid: {values}} + livehtbs = gethtbs(root_xid, default_xid) + logger.log("bwmon: Found %s running HTBs" % livehtbs.keys().__len__()) - #newslicesxids = Set(live.keys()) - Set(slices.keys()) - for newslicexid in newslicesxids: + # Get new slices. + # live.xids - runing(slices).xids = new.xids + #newslicesxids = Set(live.keys()) - Set(slices.keys()) + newslicesxids = Set(live.keys()) - Set(livehtbs.keys()) + logger.log("bwmon: Found %s new slices" % newslicesxids.__len__()) + + # Incase we rebooted and need to bring up the htbs that are in the db but + # not known to tc. + #nohtbxids = Set(slices.keys()) - Set(livehtbs.keys()) + #logger.log("bwmon: Found %s slices that should have htbs but dont." % nohtbxids.__len__()) + #newslicesxids.update(nohtbxids) + + # Setup new slices + for newslice in newslicesxids: # Delegated slices dont have xids (which are uids) since they haven't been # instantiated yet. - if newslicexid != None and db[live[newslicexid]].has_key('_rspec') == True: - logger.log("bwmon: New Slice %s" % live[newslicexid]) + if newslice != None and live[newslice].has_key('_rspec') == True: + logger.log("bwmon: New Slice %s" % live[newslice]['name']) # _rspec is the computed rspec: NM retrieved data from PLC, computed loans # and made a dict of computed values. - rspec = db[live[newslicexid]]['_rspec'] - slices[newslicexid] = Slice(newslicexid, live[newslicexid], rspec) - slices[newslicexid].reset(0, 0, 0, 0, rspec) + slices[newslice] = Slice(newslice, live[newslice]['name'], live[newslice]['_rspec']) + slices[newslice].reset(0, 0, 0, 0, live[newslice]['_rspec']) else: - logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslicexid]) + logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslice]['name']) - # ...mlhuang's abortion.... - # Get actual running values from tc. - # Update slice totals and bandwidth. - for params in bwlimit.get(): - (xid, share, - minrate, maxrate, - minexemptrate, maxexemptrate, - usedbytes, usedi2bytes) = params - - # Ignore root and default buckets + # Delete dead slices. + # First delete dead slices that exist in the pickle file, but + # aren't instantiated by PLC. + dead = Set(slices.keys()) - Set(live.keys()) + logger.log("bwmon: Found %s dead slices" % (dead.__len__() - 2)) + for xid in dead: if xid == root_xid or xid == default_xid: continue + logger.log("bwmon: removing dead slice %s " % xid) + if slices.has_key(xid): del slices[xid] + if livehtbs.has_key(xid): bwlimit.off(xid) - name = bwlimit.get_slice(xid) - if name is None: - # Orphaned (not associated with a slice) class - name = "%d?" % xid - bwlimit.off(xid) + # Get actual running values from tc since we've added and removed buckets. + # Update slice totals and bandwidth. {xid: {values}} + livehtbs = gethtbs(root_xid, default_xid) + logger.log("bwmon: now %s running HTBs" % livehtbs.keys().__len__()) + for (xid, slice) in slices.iteritems(): # Monitor only the specified slices + if xid == root_xid or xid == default_xid: continue if names and name not in names: continue - #slices is populated from the pickle file - #xid is populated from bwlimit (read from /etc/passwd) - if slices.has_key(xid): - slice = slices[xid] - # Old slices werent being instanciated correctly because - # the HTBs were still pleasent, but the slice in bwmon would - # have the byte counts set to 0. The next time update was run - # the real byte count would be sent to update, causing the bw cap. - if time.time() >= (slice.time + period) or \ - usedbytes < slice.bytes or \ - usedi2bytes < slice.i2bytes or \ - xid in newslicesxids: - # Reset to defaults every 24 hours or if it appears - # that the byte counters have overflowed (or, more - # likely, the node was restarted or the HTB buckets - # were re-initialized). - slice.reset(maxrate, \ - maxexemptrate, \ - usedbytes, \ - usedi2bytes, \ - db[slice.name]['_rspec']) - else: - # Update byte counts - slice.update(maxrate, \ - maxexemptrate, \ - usedbytes, \ - usedi2bytes, \ - db[slice.name]['_rspec']) + if (time.time() >= (slice.time + period)) or \ + (livehtbs[xid]['usedbytes'] < slice.bytes) or \ + (livehtbs[xid]['usedi2bytes'] < slice.i2bytes): + # Reset to defaults every 24 hours or if it appears + # that the byte counters have overflowed (or, more + # likely, the node was restarted or the HTB buckets + # were re-initialized). + slice.reset(livehtbs[xid]['maxrate'], \ + livehtbs[xid]['maxexemptrate'], \ + livehtbs[xid]['usedbytes'], \ + livehtbs[xid]['usedi2bytes'], \ + live[xid]['_rspec']) else: - # Just in case. Probably (hopefully) this will never happen. - # New slice, initialize state - logger.log("bwmon: Deleting orphaned slice xid %s" % xid) - bwlimit.off(xid) - - # Delete dead slices - dead = Set(slices.keys()) - Set(live.keys()) - for xid in dead: - if xid == root_xid or xid == default_xid: - continue - del slices[xid] - bwlimit.off(xid) - - logger.log("bwmon: Saving %s" % datafile) + if debug: logger.log("bwmon: Updating slice %s" % slice.name) + # Update byte counts + slice.update(livehtbs[xid]['maxrate'], \ + livehtbs[xid]['maxexemptrate'], \ + livehtbs[xid]['usedbytes'], \ + livehtbs[xid]['usedi2bytes'], \ + live[xid]['_rspec']) + + logger.log("bwmon: Saving %s slices in %s" % (slices.keys().__len__(),datafile)) f = open(datafile, "w") pickle.dump((version, slices), f) f.close() - -#def GetSlivers(data): -# for sliver in data['slivers']: -# if sliver.has_key('attributes'): -# print sliver -# for attribute in sliver['attributes']: -# if attribute['name'] == "KByteThresh": print attribute['value'] - -#def start(options, config): -# pass +lock = threading.Event() +def run(): + """When run as a thread, wait for event, lock db, deep copy it, release it, run bwmon.GetSlivers(), then go back to waiting.""" + if debug: logger.log("bwmon: Thread started") + while True: + lock.wait() + if debug: logger.log("bwmon: Event received. Running.") + database.db_lock.acquire() + nmdbcopy = copy.deepcopy(database.db) + database.db_lock.release() + try: sync(nmdbcopy) + except: logger.log_exc() + lock.clear() + +def start(*args): + tools.as_daemon_thread(run) + +def GetSlivers(*args): + pass diff --git a/database.py b/database.py index 7b95ed2..be304d3 100644 --- a/database.py +++ b/database.py @@ -102,10 +102,11 @@ class Database(dict): if name not in self: accounts.get(name).ensure_destroyed() for name, rec in self.iteritems(): if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec) + if rec['instantiation'] == 'nm-controller': accounts.get(name).ensure_created(rec) - try: bwmon.GetSlivers(self) - except: logger.log_exc() - + #try: bwmon.GetSlivers(self) + #except: logger.log_exc() + bwmon.lock.set() # request a database dump global dump_requested dump_requested = True diff --git a/nm.py b/nm.py index 46f23f5..243a782 100644 --- a/nm.py +++ b/nm.py @@ -55,7 +55,7 @@ def run(): print "Warning while writing PID file:", err # Load and start modules - for module in ['net', 'proper', 'conf_files', 'sm']: + for module in ['net', 'proper', 'conf_files', 'sm', 'bwmon']: try: m = __import__(module) m.start(options, config) diff --git a/sm.py b/sm.py index 87b2307..7084ef0 100644 --- a/sm.py +++ b/sm.py @@ -112,15 +112,15 @@ def GetSlivers(data, fullupdate=True): DEFAULT_ALLOCATION['net_max_rate'] = network['bwlimit'] / 1000 ### Emulab-specific hack begins here - emulabdelegate = { - 'instantiation': 'plc-instantiated', - 'keys': '''ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA5Rimz6osRvlAUcaxe0YNfGsLL4XYBN6H30V3l/0alZOSXbGOgWNdEEdohwbh9E8oYgnpdEs41215UFHpj7EiRudu8Nm9mBI51ARHA6qF6RN+hQxMCB/Pxy08jDDBOGPefINq3VI2DRzxL1QyiTX0jESovrJzHGLxFTB3Zs+Y6CgmXcnI9i9t/zVq6XUAeUWeeXA9ADrKJdav0SxcWSg+B6F1uUcfUd5AHg7RoaccTldy146iF8xvnZw0CfGRCq2+95AU9rbMYS6Vid8Sm+NS+VLaAyJaslzfW+CAVBcywCOlQNbLuvNmL82exzgtl6fVzutRFYLlFDwEM2D2yvg4BQ== root@boss.emulab.net''', - 'name': 'utah_elab_delegate', - 'timestamp': data['timestamp'], - 'type': 'delegate', - 'vref': None - } - database.db.deliver_record(emulabdelegate) +# emulabdelegate = { +# 'instantiation': 'plc-instantiated', +# 'keys': '''ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA5Rimz6osRvlAUcaxe0YNfGsLL4XYBN6H30V3l/0alZOSXbGOgWNdEEdohwbh9E8oYgnpdEs41215UFHpj7EiRudu8Nm9mBI51ARHA6qF6RN+hQxMCB/Pxy08jDDBOGPefINq3VI2DRzxL1QyiTX0jESovrJzHGLxFTB3Zs+Y6CgmXcnI9i9t/zVq6XUAeUWeeXA9ADrKJdav0SxcWSg+B6F1uUcfUd5AHg7RoaccTldy146iF8xvnZw0CfGRCq2+95AU9rbMYS6Vid8Sm+NS+VLaAyJaslzfW+CAVBcywCOlQNbLuvNmL82exzgtl6fVzutRFYLlFDwEM2D2yvg4BQ== root@boss.emulab.net''', + # 'name': 'utah_elab_delegate', + # 'timestamp': data['timestamp'], + # 'type': 'delegate', + # 'vref': None + # } + # database.db.deliver_record(emulabdelegate) ### Emulab-specific hack ends here @@ -143,14 +143,20 @@ def GetSlivers(data, fullupdate=True): keys = rec.pop('keys') rec.setdefault('keys', '\n'.join([key_struct['key'] for key_struct in keys])) - rec.setdefault('type', attr_dict.get('type', 'sliver.VServer')) + # Handle nm controller here + rec.setdefault('type', attr_dict.get('type', 'sliver.VServer')) + if rec['instantiation'] == 'nm-controller': + # type isn't returned by GetSlivers() for whatever reason. We're overloading + # instantiation here, but i suppose its the ssame thing when you think about it. -FA + rec['type'] = 'delegate' + rec.setdefault('vref', attr_dict.get('vref', 'default')) is_id = attr_dict.get('plc_initscript_id') if is_id is not None and is_id in initscripts_by_id: rec['initscript'] = initscripts_by_id[is_id] else: rec['initscript'] = '' - rec.setdefault('delegations', []) # XXX - delegation not yet supported + rec.setdefault('delegations', []) # extract the implied rspec rspec = {} -- 2.43.0