From 4e5ca52f5f61668751cf2b014b63cdfde3b6a318 Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Mon, 2 Feb 2009 19:03:58 +0000 Subject: [PATCH] complete connectRegistry. Also minor updates and fixes --- geni/aggregate.py | 57 +++++---- geni/slicemgr.py | 282 ++++++++++++++++++++++++--------------------- geni/util/rspec.py | 15 +++ 3 files changed, 200 insertions(+), 154 deletions(-) diff --git a/geni/aggregate.py b/geni/aggregate.py index df0f672d..c74355f3 100644 --- a/geni/aggregate.py +++ b/geni/aggregate.py @@ -19,9 +19,9 @@ class Aggregate(GeniServer): hrn = None nodes_ttl = None - nodes = {} - slices = {} - policy = {} + nodes = None + slices = None + policy = None timestamp = None threshold = None shell = None @@ -35,8 +35,10 @@ class Aggregate(GeniServer): # @param key_file private key filename of registry # @param cert_file certificate filename containing public key (could be a GID file) - def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/util/geni_config"): + def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"): GeniServer.__init__(self, ip, port, key_file, cert_file) + self.key_file = key_file + self.cert_file = cert_file self.conf = Config(config) basedir = self.conf.GENI_BASE_DIR + os.sep server_basedir = basedir + os.sep + "geni" + os.sep @@ -44,15 +46,17 @@ class Aggregate(GeniServer): nodes_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.components']) self.nodes = SimpleStorage(nodes_file) + self.nodes.load() - node_slices_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.slices']) - self.slices = SimpleStorage(node_slices_file) + slices_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.slices']) + self.slices = SimpleStorage(slices_file) self.slices.load() - policy_file = os.sep.join([server_basedir, 'policy']) - self.policy = SimpleStorage(policy_file, {'whitelist': [], 'blacklist': []}) + policy_file = os.sep.join([server_basedir, 'agg.policy']) + self.policy = SimpleStorage(policy_file) + self.policy.load() - timestamp_file = os.sep.join([server_basedir, 'components', self.hrn + '.timestamp']) + timestamp_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.timestamp']) self.timestamp = SimpleStorage(timestamp_file) self.nodes_ttl = 1 @@ -63,7 +67,11 @@ class Aggregate(GeniServer): """ Connect to the registry """ - pass + # connect to registry using GeniClient + address = self.config.GENI_REGISTRY_HOSTNAME + port = self.config.GENI_REGISTRY_PORT + url = 'https://%(address)s:%(port)s' % locals() + self.registry = GeniClient(url, self.key_file, self.cert_file) def connectPLC(self): """ @@ -176,6 +184,9 @@ class Aggregate(GeniServer): return self.nodes.keys() def get_rspec(self, hrn, type): + """ + Get resource information from PLC + """ # Get the required nodes if type in ['aggregate']: @@ -204,7 +215,7 @@ class Aggregate(GeniServer): # convert and threshold to ints timestamp = self.timestamp['timestamp'] start_time = int(self.timestamp['timestamp'].strftime("%s")) - end_time = int(self.duration.strftime("%s")) + end_time = int(self.threshold.strftime("%s")) duration = end_time - start_time # create the plc dict @@ -232,19 +243,19 @@ class Aggregate(GeniServer): """ Instantiate the specified slice according to whats defined in the rspec. """ - slicename = self.hrn_to_plcslicename(slice_hrn) - - # extract node list from rspec - spec = Rspec(rspec) - nodespecs = spec.getDictsByTagName('NodeSpec') - nodes = [nodespec['name'] for nodespec in nodespecs] # save slice state locally - # we can assume that spec object has been validated so its safer to + # we can assume that spec object has been validated so its safer to # save this instead of the unvalidated rspec the user gave us self.slices[slice_hrn] = spec.toxml() self.slices.write() + # extract node list from rspec + slicename = self.hrn_to_plcslicename(slice_hrn) + spec = Rspec(rspec) + nodespecs = spec.getDictsByTagName('NodeSpec') + nodes = [nodespec['name'] for nodespec in nodespecs] + # add slice to nodes at plc self.shell.AddSliceToNodes(self.auth, slicename, nodes) for attribute in attributes: @@ -320,7 +331,7 @@ class Aggregate(GeniServer): slicename = self.hrn_to_plcslicename(slice_hrn) slices = shell.GetSlices(self.auth, [slicename]) if not slice: - raise RecordNotFound(slice_hrn) + return 1 slice = slices[0] shell.DeleteSliceFromNodes(self.auth, slicename, slice['node_ids']) @@ -333,7 +344,8 @@ class Aggregate(GeniServer): slicename = hrn_to_plcslicename(slice_hrn) slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id']) if not slices: - raise RecordNotFound(slice_hrn) + #raise RecordNotFound(slice_hrn) + return 1 slice_id = slices[0] atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) attribute_id = attreibutes[0] @@ -347,7 +359,8 @@ class Aggregate(GeniServer): slicename = hrn_to_plcslicename(slice_hrn) slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id']) if not slices: - raise RecordNotFound(slice_hrn) + #raise RecordNotFound(slice_hrn) + return 1 slice_id = slices[0] atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) attribute_id = attreibutes[0] @@ -359,7 +372,7 @@ class Aggregate(GeniServer): """ Reset the slice """ - slicename = self.hrn_to_plcslicename(slice_hrn) + # XX not yet implemented return 1 def get_policy(self): diff --git a/geni/slicemgr.py b/geni/slicemgr.py index a64a8b01..9e6be186 100644 --- a/geni/slicemgr.py +++ b/geni/slicemgr.py @@ -19,10 +19,11 @@ class SliceMgr(GeniServer): hrn = None key_file = None cert_file = None - nodes = {} - slices = {} - policy = {} - aggregates = {} + nodes_ttl = None + nodes = None + slices = None + policy = None + aggregates = None timestamp = None threshold = None shell = None @@ -37,30 +38,41 @@ class SliceMgr(GeniServer): # @param key_file private key filename of registry # @param cert_file certificate filename containing public key (could be a GID file) - def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/util/geni_config"): + def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"): GeniServer.__init__(ip, port, key_file, cert_file) self.key_file = key_file self.cert_file = cert_file self.conf = Config(config) basedir = self.conf.GENI_BASE_DIR + os.sep server_basedir = basedir + os.sep + "geni" + os.sep - self.hrn = conf.GENI_INTERFACE_HRN - + self.hrn = conf.GENI_INTERFACE_HRN + # Get list of aggregates this sm talks to + # XX do we use simplestorage to maintain this file manually? aggregates_file = server_basedir + os.sep + 'aggregates' self.aggregates = SimpleStorage(aggregates_file) - self.load_aggregates(aggregates_file) + self.connect_aggregates(aggregates_file) - components_file = os.sep.join([server_basedir, 'components', 'slicemgr.' + hrn + '.comp']) + nodes_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.components']) + self.nodes = SimpleStorage(nodes_file) + self.nodes.load() - self.slices_file = os.sep.join([server_basedir, 'components', 'slicemgr' + hrn + '.slices']) - self.timestamp_file = os.sep.join([server_basedir, 'components', 'slicemgr' + hrn + '.timestamp']) - self.components_ttl = components_ttl - self.policy['whitelist'] = [] - self.policy['blacklist'] = [] - self.connect() + slices_file = os.sep.join([server_basedir, 'slicemgr' + self.hrn + '.slices']) + self.slices = SimpleStorage(slices_file) + self.slices.load() + + policy_file = os.sep.join([server_basedir, 'smgr.policy']) + self.policy = SimpleStorage(policy_file) + self.policy.load() - def load_aggregates(self, aggregates_file): + timestamp_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.timestamp']) + self.timestamp = SimpleStorage(timestamp_file) + + self.nodes_ttl = 1 + self.connectAggregates() + self.connectRegistry() + + def connect_aggregates(self, aggregates_file): """ Get info about the aggregates available to us from file and create an xmlrpc connection to each. If any info is invalid, skip it. @@ -128,27 +140,33 @@ class SliceMgr(GeniServer): aggregates = self.aggregates.keys() all_nodes = [] - all_slices = [] + nodedict = {} for aggregate in aggregates: try: # resolve components hostnames nodes = self.aggregates[aggregate].get_components() all_nodes.extend(nodes) - # update timestamp and threshold - self.timestamp = datetime.datetime.now() - delta = datetime.timedelta(hours=self.components_ttl) - self.threshold = self.timestamp + delta except: # XX print out to some error log pass - self.components = all_nodes - f = open(self.components_file, 'w') - f.write(str(self.components)) - f.close() - f = open(self.timestamp_file, 'w') - f.write(str(self.threshold)) - f.close() + for node in all_nodes: + if self.polciy['whitelist'] and node not in self.polciy['whitelist']: + continue + if self.polciy['blacklist'] and node in self.policy['blacklist']: + continue + + nodedict[node] = node + + self.nodes = SimpleStorate(self.nodes.db_filename, nodedict) + self.nodes.write() + + # update timestamp and threshold + self.timestamp['timestamp'] = datetime.datetime.now() + delta = datetime.timedelta(hours=self.nodes_tt1) + self.threshold = self.timestamp['timestamp'] + delta + self.timestamp.write() + def load_components(self): """ @@ -156,64 +174,26 @@ class SliceMgr(GeniServer): """ print "loading nodes" # Read component list from cached file - if os.path.exists(self.components_file): - f = open(self.components_file, 'r') - self.components = eval(f.read()) - f.close() - + self.nodes.load() + self.timestamp.load() time_format = "%Y-%m-%d %H:%M:%S" - if os.path.exists(self.timestamp_file): - f = open(self.timestamp_file, 'r') - timestamp = str(f.read()).split(".")[0] - self.timestamp = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format))) - delta = datetime.timedelta(hours=self.components_ttl) - self.threshold = self.timestamp + delta - f.close() + timestamp = self.timestamp['timestamp'] + self.timestamp['timestamp'] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format))) + delta = datetime.timedelta(hours=self.nodes_ttl) + self.threshold = self.timestamp['timestamp'] + delta def load_policy(self): """ Read the list of blacklisted and whitelisted nodes. """ - whitelist = [] - blacklist = [] - if os.path.exists(self.whitelist_file): - f = open(self.whitelist_file, 'r') - lines = f.readlines() - f.close() - for line in lines: - line = line.strip().replace(" ", "").replace("\n", "") - whitelist.extend(line.split(",")) - - - if os.path.exists(self.blacklist_file): - f = open(self.blacklist_file, 'r') - lines = f.readlines() - f.close() - for line in lines: - line = line.strip().replace(" ", "").replace("\n", "") - blacklist.extend(line.split(",")) - - self.policy['whitelist'] = whitelist - self.policy['blacklist'] = blacklist + self.policy.load() def load_slices(self): """ Read current slice instantiation states. """ print "loading slices" - if os.path.exists(self.slices_file): - f = open(self.components_file, 'r') - self.slices = eval(f.read()) - f.close() - - def write_slices(self): - """ - Write current slice instantiations to file. - """ - print "writing slices" - f = open(self.slices_file, 'w') - f.write(str(self.slices)) - f.close() + self.slices.load() def get_components(self): @@ -227,107 +207,145 @@ class SliceMgr(GeniServer): self.refresh_components() elif now < self.threshold and not self.components: self.load_components() - return self.components + return self.nodes.keys() def get_slices(self): """ Return a list of instnatiated managed by this slice manager. """ - now = datetime.datetime.now() - #self.load_components() - if not self.threshold or not self.timestamp or now > self.threshold: - self.refresh_components() - elif now < self.threshold and not self.slices: - self.load_components() - return self.slices - - def get_slivers(self, hrn): - """ - Return the list of slices instantiated at the specified component. - """ - - # hrn is assumed to be a component hrn - if hrn not in self.slices: - raise RecordNotFound(hrn) - - return self.slices[hrn] - - def get_rspec(self, hrn, type): - #rspec = Rspec() - if type in ['node']: - nodes = self.shell.GetNodes(self.auth) - elif type in ['slice']: - slices = self.shell.GetSlices(self.auth) - elif type in ['aggregate']: - pass + return dict(self.slices) def get_resources(self, slice_hrn): """ Return the current rspec for the specified slice. """ - slicename = hrn_to_plcslicename(slice_hrn) - rspec = self.get_rspec(slicenamem, 'slice' ) - + + cred = None + if slice_hrn in self.slices.keys(): + # check if we alreay have this slices state saved + rspec = self.slices[slice_hrn] + else: + # request this slices state from all known aggregates + rspecdicts = [] + for hrn in self.aggregates.keys(): + # XX need to use the right credentials for this call + # check if the slice has resources at this hrn + tempresources = self.aggregates[hrn].resources(cred, slice_hrn) + temprspec = Rspec() + temprspec.parseString(temprspec) + if temprspec.getDictsByTagName('NodeSpec'): + # append this rspec to the list of rspecs + rspecdicts.append(temprspec.toDict()) + + # merge all these rspecs into one + start_time = int(self.timestamp['timestamp'].strftime("%s")) + end_time = int(self.duration.strftime("%s")) + duration = end_time - start_time + + # create a plc dict + networks = [rspecdict['networks'][0] for rspecdict in rspecdicts] + resources = {'networks': networks, 'start_time': start_time, 'duration': duration} + # convert the plc dict to an rspec dict + resourceDict = RspecDict(resources) + resourceSpec = Rspec() + resourceSpec.parseDict(resourceDict) + rspec = resourceSpec.toxml() + # save this slices resources + self.slices[slice_hrn] = rspec + self.slices.write() + return rspec def create_slice(self, slice_hrn, rspec, attributes): """ Instantiate the specified slice according to whats defined in the rspec. """ + # XX need to gget the correct credentials + cred = None + + # save slice state locally + # we can assume that spec object has been validated so its safer to + # save this instead of the unvalidated rspec the user gave us + self.slices[slice_hrn] = spec.toxml() + self.slices.write() + + # extract network list from the rspec and create a separate + # rspec for each network slicename = self.hrn_to_plcslicename(slice_hrn) - #spec = Rspec(rspec) - node_hrns = [] - #for netspec in spec['networks]: - # networkname = netspec['name'] - # nodespec = spec['networks']['nodes'] - # nodes = [nspec['name'] for nspec in nodespec] - # node_hrns = [networkname + node for node in nodes] - # - self.db.AddSliceToNodes(slice_hrn, node_hrns) + spec = Rspec() + spec.parseString(rspec) + specDict = spec.toDict() + start_time = specDict['start_time'] + end_time = specDict['end_time'] + + rspecs = {} + # only attempt to extract information about the aggregates we know about + for hrn in self.aggregates.keys(): + netspec = spec.getDictByTagNameValue('NetSpec', 'hrn') + if netspec: + # creat a plc dict + tempdict = {'start_time': star_time, 'end_time': end_time, 'networks': netspec} + #convert the plc dict to rpsec dict + resourceDict = RspecDict(tempdict) + # parse rspec dict + tempspec = Rspec() + tempspec.parseDict(resourceDict) + rspecs[hrn] = tempspec.toxml() + + # notify the aggregates + for hrn in self.rspecs.keys(): + self.aggregates[hrn].createSlice(cred, rspecs[hrn]) + return 1 + + def update_slice(self, slice_hrn, rspec, attributes = []): + """ + Update the specifed slice + """ + self.create_slice(slice_hrn, rspec, attributes) def delete_slice_(self, slice_hrn): """ Remove this slice from all components it was previouly associated with and free up the resources it was using. """ - self.db.DeleteSliceFromNodes(self.auth, slicename, self.components) + # XX need to get the correct credential + cred = None + + if self.slices.has_key(slice_hrn): + self.slices.pop(slice_hrn) + self.slices.write() + + for hrn in self.aggregates.keys(): + self.aggregates[hrn].deleteSlice(cred, slice_hrn) + return 1 def start_slice(self, slice_hrn): """ Stop the slice at plc. """ - slicename = hrn_to_plcslicename(slice_hrn) - slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(slice_hrn) - slice_id = slices[0] - atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) - attribute_id = attreibutes[0] - self.shell.UpdateSliceAttribute(self.auth, attribute_id, "1" ) + # XX need to get the correct credential + cred = None + + for hrn in self.aggregates.keys(): + self.aggregates[hrn].startSlice(cred, slice_hrn) return 1 def stop_slice(self, slice_hrn): """ Stop the slice at plc """ - slicename = hrn_to_plcslicename(slice_hrn) - slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(slice_hrn) - slice_id = slices[0] - atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) - attribute_id = attreibutes[0] - self.shell.UpdateSliceAttribute(self.auth, attribute_id, "0") + for hrn in self.aggregates.keys(): + self.aggregates[hrn].startSlice(cred, slice_hrn) return 1 def reset_slice(self, slice_hrn): """ Reset the slice """ - slicename = self.hrn_to_plcslicename(slice_hrn) + # XX not yet implemented return 1 def get_policy(self): diff --git a/geni/util/rspec.py b/geni/util/rspec.py index 38378ade..6a0fac47 100644 --- a/geni/util/rspec.py +++ b/geni/util/rspec.py @@ -159,4 +159,19 @@ class Rspec(): dicts.append(value) return dicts + def getDictByTagNameValue(self, tagname, value, dom = None) + """ + Search the dom for the first element with the specified tagname + and value and return it as a dict. + """ + tempdict = {} + if not dom: + dom = self.rootNode + dicts = self.getDictsByTagName(tagname, dom) + + for rdict in dicts: + if rdict.has_key('name') and rdict['name'] in [value]: + return rdict + + return tempdict # vim:ts=4:expandtab -- 2.43.0