From 724fae2cbf404dbc9860dcf5e50674e8fba155f2 Mon Sep 17 00:00:00 2001 From: Faiyaz Ahmed Date: Mon, 23 Apr 2007 19:47:50 +0000 Subject: [PATCH] Merge from head. Includes fix for initscripts also includes bw loans --- NodeManager.spec | 4 +- bwmon.py | 192 +++++++++++++++++++++++++---------------------- conf_files.py | 2 +- database.py | 4 +- nm.py | 4 +- sliver_vs.py | 2 +- sm.py | 16 +++- tools.py | 7 +- 8 files changed, 132 insertions(+), 99 deletions(-) diff --git a/NodeManager.spec b/NodeManager.spec index 6fe9166..41a6653 100644 --- a/NodeManager.spec +++ b/NodeManager.spec @@ -1,7 +1,7 @@ Summary: PlanetLab Node Manager Name: NodeManager -Version: 1.2 -Release: 3%{?pldistro:.%{pldistro}}%{?date:.%{date}} +Version: 1.3 +Release: %{?pldistro:.%{pldistro}}%{?date:.%{date}} License: PlanetLab Group: System Environment/Daemons URL: http://cvs.planet-lab.org/cvs/NodeManager diff --git a/bwmon.py b/bwmon.py index d362454..de7715d 100644 --- a/bwmon.py +++ b/bwmon.py @@ -24,11 +24,11 @@ import time import pickle import socket -#import xmlrpclib import bwlimit import logger from sets import Set + try: sys.path.append("/etc/planetlab") from plc_config import * @@ -39,10 +39,6 @@ except: PLC_MAIL_SUPPORT_ADDRESS = "support@planet-lab.org" PLC_MAIL_SLICE_ADDRESS = "SLICE@slices.planet-lab.org" - -# Utility functions -#from pl_mom import * - # Constants seconds_per_day = 24 * 60 * 60 bits_per_byte = 8 @@ -183,7 +179,7 @@ class Slice: """ - def __init__(self, xid, name, data): + def __init__(self, xid, name, rspec): self.xid = xid self.name = name self.time = 0 @@ -198,9 +194,10 @@ class Slice: self.Maxi2KByte = default_Maxi2KByte self.Threshi2KByte = default_Threshi2KByte self.Share = default_Share + self.Sharei2 = default_Share self.emailed = False - self.updateSliceAttributes(data) + self.updateSliceAttributes(rspec) bwlimit.set(xid = self.xid, minrate = self.MinRate * 1000, maxrate = self.MaxRate * 1000, @@ -208,75 +205,75 @@ class Slice: minexemptrate = 
self.Mini2Rate * 1000, share = self.Share) - def __repr__(self): return self.name - def updateSliceAttributes(self, data): - # Incase the limits have changed. - if (self.MaxRate != default_MaxRate) or \ - (self.Maxi2Rate != default_Maxi2Rate): - self.MaxRate = int(bwlimit.get_bwcap() / 1000) - self.Maxi2Rate = int(bwlimit.bwmax / 1000) - + def updateSliceAttributes(self, rspec): # Get attributes - for sliver in data['slivers']: - if sliver['name'] == self.name: - for attribute in sliver['attributes']: - if attribute['name'] == 'net_min_rate': - logger.log("bwmon: Updating %s. Min Rate = %s" \ - %(self.name, self.MinRate)) - # To ensure min does not go above 25% of nodecap. - if int(attribute['value']) > int(.25 * default_MaxRate): - self.MinRate = int(.25 * default_MaxRate) - else: - self.MinRate = int(attribute['value']) - elif attribute['name'] == 'net_max_rate': - self.MaxRate = int(attribute['value']) - logger.log("bwmon: Updating %s. Max Rate = %s" \ - %(self.name, self.MaxRate)) - elif attribute['name'] == 'net_i2_min_rate': - self.Mini2Rate = int(attribute['value']) - logger.log("bwmon: Updating %s. Min i2 Rate = %s" \ - %(self.name, self.Mini2Rate)) - elif attribute['name'] == 'net_i2_max_rate': - self.Maxi2Rate = int(attribute['value']) - logger.log("bwmon: Updating %s. Max i2 Rate = %s" \ - %(self.name, self.Maxi2Rate)) - elif attribute['name'] == 'net_max_kbyte': - self.MaxKByte = int(attribute['value']) - logger.log("bwmon: Updating %s. Max KByte lim = %s" \ - %(self.name, self.MaxKByte)) - elif attribute['name'] == 'net_i2_max_kbyte': - self.Maxi2KByte = int(attribute['value']) - logger.log("bwmon: Updating %s. Max i2 KByte = %s" \ - %(self.name, self.Maxi2KByte)) - elif attribute['name'] == 'net_thresh_kbyte': - self.ThreshKByte = int(attribute['value']) - logger.log("bwmon: Updating %s. 
Thresh KByte = %s" \ - %(self.name, self.ThreshKByte)) - elif attribute['name'] == 'net_i2_thresh_kbyte': - self.Threshi2KByte = int(attribute['value']) - logger.log("bwmon: Updating %s. i2 Thresh KByte = %s" \ - %(self.name, self.Threshi2KByte)) - elif attribute['name'] == 'net_share': - self.Share = int(attribute['value']) - logger.log("bwmon: Updating %s. Net Share = %s" \ - %(self.name, self.Share)) - elif attribute['name'] == 'net_i2_share': - self.Sharei2 = int(attribute['value']) - logger.log("bwmon: Updating %s. Net i2 Share = %s" \ - %(self.name, self.i2Share)) - - - def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, data): + + # Sanity check plus policy decision for MinRate: + # Minrate cant be greater than 25% of MaxRate or NodeCap. + MinRate = int(rspec.get("net_min_rate", default_MinRate)) + if MinRate > int(.25 * default_MaxRate): + MinRate = int(.25 * default_MaxRate) + if MinRate != self.MinRate: + self.MinRate = MinRate + logger.log("bwmon: Updating %s: Min Rate = %s" %(self.name, self.MinRate)) + + MaxRate = int(rspec.get('net_max_rate', bwlimit.get_bwcap() / 1000)) + if MaxRate != self.MaxRate: + self.MaxRate = MaxRate + logger.log("bwmon: Updating %s: Max Rate = %s" %(self.name, self.MaxRate)) + + Mini2Rate = int(rspec.get('net_i2_min_rate', default_Mini2Rate)) + if Mini2Rate != self.Mini2Rate: + self.Mini2Rate = Mini2Rate + logger.log("bwmon: Updating %s: Min i2 Rate = %s" %(self.name, self.Mini2Rate)) + + Maxi2Rate = int(rspec.get('net_i2_max_rate', bwlimit.bwmax / 1000)) + if Maxi2Rate != self.Maxi2Rate: + self.Maxi2Rate = Maxi2Rate + logger.log("bwmon: Updating %s: Max i2 Rate = %s" %(self.name, self.Maxi2Rate)) + + MaxKByte = int(rspec.get('net_max_kbyte', default_MaxKByte)) + if MaxKByte != self.MaxKByte: + self.MaxKByte = MaxKByte + logger.log("bwmon: Updating %s: Max KByte lim = %s" %(self.name, self.MaxKByte)) + + Maxi2KByte = int(rspec.get('net_i2_max_kbyte', default_Maxi2KByte)) + if Maxi2KByte != 
self.Maxi2KByte: + self.Maxi2KByte = Maxi2KByte + logger.log("bwmon: Updating %s: Max i2 KByte = %s" %(self.name, self.Maxi2KByte)) + + ThreshKByte = int(rspec.get('net_thresh_kbyte', default_ThreshKByte)) + if ThreshKByte != self.ThreshKByte: + self.ThreshKByte = ThreshKByte + logger.log("bwmon: Updating %s: Thresh KByte = %s" %(self.name, self.ThreshKByte)) + + Threshi2KByte = int(rspec.get('net_i2_thresh_kbyte', default_Threshi2KByte)) + if Threshi2KByte != self.Threshi2KByte: + self.Threshi2KByte = Threshi2KByte + logger.log("bwmon: Updating %s: i2 Thresh KByte = %s" %(self.name, self.Threshi2KByte)) + + Share = int(rspec.get('net_share', default_Share)) + if Share != self.Share: + self.Share = Share + logger.log("bwmon: Updating %s: Net Share = %s" %(self.name, self.Share)) + + Sharei2 = int(rspec.get('net_i2_share', default_Share)) + if Sharei2 != self.Sharei2: + self.Sharei2 = Sharei2 + logger.log("bwmon: Updating %s: Net i2 Share = %s" %(self.name, self.Sharei2)) + + + def reset(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec): """ Begin a new recording period. Remove caps by restoring limits to their default values. """ # Query Node Manager for max rate overrides - self.updateSliceAttributes(data) + self.updateSliceAttributes(rspec) # Reset baseline time self.time = time.time() @@ -302,14 +299,14 @@ class Slice: minexemptrate = self.Mini2Rate * 1000, share = self.Share) - def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, data): + def update(self, runningmaxrate, runningmaxi2rate, usedbytes, usedi2bytes, rspec): """ Update byte counts and check if byte limits have been exceeded.
""" # Query Node Manager for max rate overrides - self.updateSliceAttributes(data) + self.updateSliceAttributes(rspec) # Prepare message parameters from the template message = "" @@ -320,7 +317,7 @@ class Slice: 'period': format_period(period)} if usedbytes >= (self.bytes + (self.ThreshKByte * 1024)): - sum = self.bytes + (self.ThreshKBytes * 1024) + sum = self.bytes + (self.ThreshKByte * 1024) maxbyte = self.MaxKByte * 1024 bytesused = usedbytes - self.bytes timeused = int(time.time() - self.time) @@ -385,7 +382,7 @@ class Slice: self.emailed = True slicemail(self.name, subject, message + (footer % params)) -def GetSlivers(data): +def GetSlivers(db): # Defaults global datafile, \ period, \ @@ -427,27 +424,35 @@ def GetSlivers(data): root_xid = bwlimit.get_xid("root") default_xid = bwlimit.get_xid("default") + # Since root is required for sanity, its not in the API/plc database, so pass {} + # to use defaults. if root_xid not in slices.keys(): - slices[root_xid] = Slice(root_xid, "root", data) - slices[root_xid].reset(0, 0, 0, 0, data) - + slices[root_xid] = Slice(root_xid, "root", {}) + slices[root_xid].reset(0, 0, 0, 0, {}) + + # Used by bwlimit. pass {} since there is no rspec (like above). if default_xid not in slices.keys(): - slices[default_xid] = Slice(default_xid, "default", data) - slices[default_xid].reset(0, 0, 0, 0, data) + slices[default_xid] = Slice(default_xid, "default", {}) + slices[default_xid].reset(0, 0, 0, 0, {}) live = {} - # Get running slivers. {xid: name} - for sliver in data['slivers']: - live[bwlimit.get_xid(sliver['name'])] = sliver['name'] + # Get running slivers that should be on this node (from plc). {xid: name} + for sliver in db.keys(): + live[bwlimit.get_xid(sliver)] = sliver # Setup new slices. 
# live.xids - runing.xids = new.xids newslicesxids = Set(live.keys()) - Set(slices.keys()) for newslicexid in newslicesxids: - if newslicexid != None: + # Delegated slices dont have xids (which are uids) since they haven't been + # instantiated yet. + if newslicexid != None and db[live[newslicexid]].has_key('_rspec') == True: logger.log("bwmon: New Slice %s" % live[newslicexid]) - slices[newslicexid] = Slice(newslicexid, live[newslicexid], data) - slices[newslicexid].reset(0, 0, 0, 0, data) + # _rspec is the computed rspec: NM retrieved data from PLC, computed loans + # and made a dict of computed values. + rspec = db[live[newslicexid]]['_rspec'] + slices[newslicexid] = Slice(newslicexid, live[newslicexid], rspec) + slices[newslicexid].reset(0, 0, 0, 0, rspec) else: logger.log("bwmon Slice %s doesn't have xid. Must be delegated. Skipping." % live[newslicexid]) # Get actual running values from tc. @@ -481,16 +486,28 @@ def GetSlivers(data): # that the byte counters have overflowed (or, more # likely, the node was restarted or the HTB buckets # were re-initialized). - slice.reset(maxrate, maxexemptrate, usedbytes, usedi2bytes, data) + slice.reset(maxrate, \ + maxexemptrate, \ + usedbytes, \ + usedi2bytes, \ + db[slice.name]['_rspec']) else: # Update byte counts - slice.update(maxrate, maxexemptrate, usedbytes, usedi2bytes, data) + slice.update(maxrate, \ + maxexemptrate, \ + usedbytes, \ + usedi2bytes, \ + db[slice.name]['_rspec']) else: # Just in case. Probably (hopefully) this will never happen. 
# New slice, initialize state logger.log("bwmon: New Slice %s" % name) - slice = slices[xid] = Slice(xid, name, data) - slice.reset(maxrate, maxexemptrate, usedbytes, usedi2bytes, data) + slice = slices[xid] = Slice(xid, name, db[name]['_rspec']) + slice.reset(maxrate, \ + maxexemptrate, \ + usedbytes, \ + usedi2bytes, \ + db[slice.name]['_rspec']) # Delete dead slices dead = Set(slices.keys()) - Set(live.keys()) @@ -513,6 +530,5 @@ def GetSlivers(data): # for attribute in sliver['attributes']: # if attribute['name'] == "KByteThresh": print attribute['value'] -def start(options, config): - pass - +#def start(options, config): +# pass diff --git a/conf_files.py b/conf_files.py index 5d1cb56..4903b3f 100644 --- a/conf_files.py +++ b/conf_files.py @@ -29,7 +29,7 @@ class conf_files: def system(self, cmd): if not self.noscripts and cmd: logger.log('conf_files: running command %s' % cmd) - return os.system(cmd) + return tools.fork_as(None, os.system, cmd) else: return 0 def update_conf_file(self, cf_rec): diff --git a/database.py b/database.py index 58dffbe..878a7ba 100644 --- a/database.py +++ b/database.py @@ -19,7 +19,7 @@ import time import accounts import logger import tools - +import bwmon # We enforce minimum allocations to keep the clueless from hosing their slivers. # Disallow disk loans because there's currently no way to punish slivers over quota.
@@ -103,6 +103,8 @@ class Database(dict): for name, rec in self.iteritems(): if rec['instantiation'] == 'plc-instantiated': accounts.get(name).ensure_created(rec) + bwmon.GetSlivers(self) + # request a database dump global dump_requested dump_requested = True diff --git a/nm.py b/nm.py index c4760fc..1df65be 100644 --- a/nm.py +++ b/nm.py @@ -55,7 +55,7 @@ def run(): print "Warning while writing PID file:", err # Load and start modules - for module in ['net', 'proper', 'conf_files', 'sm', 'bwmon']: + for module in ['net', 'proper', 'conf_files', 'sm']: try: m = __import__(module) m.start(options, config) @@ -75,7 +75,7 @@ def run(): while True: try: GetSlivers(plc) except: logger.log_exc() - time.sleep(options.period) + time.sleep(options.period + random.randrange(0,301)) except: logger.log_exc() diff --git a/sliver_vs.py b/sliver_vs.py index adbebd0..27b8663 100644 --- a/sliver_vs.py +++ b/sliver_vs.py @@ -84,7 +84,7 @@ class Sliver_VS(accounts.Account, vserver.VServer): accounts.Account.configure(self, rec) # install ssh keys def start(self, delay=0): - if self.rspec['enabled']: + if self.rspec['enabled'] > 0: logger.log('%s: starting in %d seconds' % (self.name, delay)) time.sleep(delay) child_pid = os.fork() diff --git a/sm.py b/sm.py index 680120c..cb90364 100644 --- a/sm.py +++ b/sm.py @@ -30,6 +30,10 @@ DEFAULT_ALLOCATION = { 'net_i2_min_rate': bwmin / 1000, # kbps 'net_i2_max_rate': bwmax / 1000, # kbps 'net_i2_share': 1, # proportional share + 'net_max_kbyte' : 5662310, #Kbyte + 'net_thresh_kbyte': 4529848, #Kbyte + 'net_i2_max_kbyte': 17196646, + 'net_i2_thresh_kbyte': 13757316, 'disk_max': 5000000 # bytes } @@ -68,6 +72,12 @@ def GetSlivers(data, fullupdate=True): } database.db.deliver_record(emulabdelegate) ### Emulab-specific hack ends here + + + initscripts_by_id = {} + for is_rec in data['initscripts']: + initscripts_by_id[str(is_rec['initscript_id'])] = is_rec['script'] + for sliver in data['slivers']: rec = sliver.copy() rec.setdefault('timestamp', 
data['timestamp']) @@ -82,7 +92,11 @@ def GetSlivers(data, fullupdate=True): rec.setdefault('type', attr_dict.get('type', 'sliver.VServer')) rec.setdefault('vref', attr_dict.get('vref', 'default')) - rec.setdefault('initscript', attr_dict.get('initscript', '')) + is_id = attr_dict.get('plc_initscript_id') + if is_id is not None and is_id in initscripts_by_id: + rec['initscript'] = initscripts_by_id[is_id] + else: + rec['initscript'] = '' rec.setdefault('delegations', []) # XXX - delegation not yet supported # extract the implied rspec diff --git a/tools.py b/tools.py index 994089e..6b3a84b 100644 --- a/tools.py +++ b/tools.py @@ -43,9 +43,10 @@ def fork_as(su, function, *args): try: os.chdir('/') close_nonstandard_fds() - pw_ent = pwd.getpwnam(su) - os.setegid(pw_ent[3]) - os.seteuid(pw_ent[2]) + if su: + pw_ent = pwd.getpwnam(su) + os.setegid(pw_ent[3]) + os.seteuid(pw_ent[2]) child_pid = os.fork() if child_pid == 0: function(*args) except: -- 2.43.0