X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=geni%2Faggregate.py;h=c74355f3b925265a7de67eae601fa129d99c470e;hb=8eb7da3916324c083e30b6dadcf6598e4d34b671;hp=ba539638e2e74674219c556cd503736d7670233f;hpb=f1d8666cb3127bc29027a1a93df3077e909991c7;p=sfa.git diff --git a/geni/aggregate.py b/geni/aggregate.py index ba539638..c74355f3 100644 --- a/geni/aggregate.py +++ b/geni/aggregate.py @@ -4,24 +4,24 @@ import datetime import time import xmlrpclib -from geni.util.geniserver import * +from geni.util.geniserver import GeniServer from geni.util.geniclient import * -from geni.util.cert import * -from geni.util.trustedroot import * +from geni.util.cert import Keypair, Certificate +from geni.util.trustedroot import TrustedRootList from geni.util.excep import * from geni.util.misc import * from geni.util.config import Config from geni.util.rspec import Rspec +from geni.util.specdict import * +from geni.util.storage import SimpleStorage class Aggregate(GeniServer): hrn = None - nodes_file = None nodes_ttl = None - nodes = [] - whitelist_file = None - blacklist_file = None - policy = {} + nodes = None + slices = None + policy = None timestamp = None threshold = None shell = None @@ -35,19 +35,31 @@ class Aggregate(GeniServer): # @param key_file private key filename of registry # @param cert_file certificate filename containing public key (could be a GID file) - def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/util/geni_config"): + def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"): GeniServer.__init__(self, ip, port, key_file, cert_file) - conf = Config(config) - basedir = conf.GENI_BASE_DIR + os.sep - server_basedir = basedir + os.sep + "plc" + os.sep - self.hrn = conf.GENI_INTERFACE_HRN - self.nodes_file = os.sep.join([server_basedir, 'components', self.hrn + '.comp']) - self.whitelist_file = os.sep.join([server_basedir, 'policy', 'whitelist']) - self.blacklist_file = os.sep.join([server_basedir, 'policy', 'blacklist']) - self.timestamp_file = os.sep.join([server_basedir, 'components', self.hrn + '.timestamp']) + self.key_file = key_file + self.cert_file = cert_file + self.conf = Config(config) + basedir = self.conf.GENI_BASE_DIR + os.sep + server_basedir = basedir + os.sep + "geni" + os.sep + self.hrn = self.conf.GENI_INTERFACE_HRN + + nodes_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.components']) + self.nodes = SimpleStorage(nodes_file) + self.nodes.load() + + slices_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.slices']) + self.slices = SimpleStorage(slices_file) + self.slices.load() + + policy_file = os.sep.join([server_basedir, 'agg.policy']) + self.policy = SimpleStorage(policy_file) + self.policy.load() + + timestamp_file = os.sep.join([server_basedir, 'agg.' + self.hrn + '.timestamp']) + self.timestamp = SimpleStorage(timestamp_file) + self.nodes_ttl = 1 - self.policy['whitelist'] = [] - self.policy['blacklist'] = [] self.connectPLC() self.connectRegistry() @@ -55,32 +67,36 @@ class Aggregate(GeniServer): """ Connect to the registry """ - pass + # connect to registry using GeniClient + address = self.config.GENI_REGISTRY_HOSTNAME + port = self.config.GENI_REGISTRY_PORT + url = 'https://%(address)s:%(port)s' % locals() + self.registry = GeniClient(url, self.key_file, self.cert_file) def connectPLC(self): """ Connect to the plc api interface. First attempt to impor thte shell, if that fails try to connect to the xmlrpc server. """ - self.auth = {'Username': conf.GENI_PLC_USER, + self.auth = {'Username': self.conf.GENI_PLC_USER, 'AuthMethod': 'password', - 'AuthString': conf.GENI_PLC_PASSWORD} + 'AuthString': self.conf.GENI_PLC_PASSWORD} try: # try to import PLC.Shell directly - sys.path.append(conf.GENI_PLC_SHELL_PATH) + sys.path.append(self.conf.GENI_PLC_SHELL_PATH) import PLC.Shell self.shell = PLC.Shell.Shell(globals()) self.shell.AuthCheck() except ImportError: # connect to plc api via xmlrpc - plc_host = conf.GENI_PLC_HOST - plc_port = conf.GENI_PLC_PORT - plc_api_path = conf.GENI_PLC_API_PATH + plc_host = self.conf.GENI_PLC_HOST + plc_port = self.conf.GENI_PLC_PORT + plc_api_path = self.conf.GENI_PLC_API_PATH url = "https://%(plc_host)s:%(plc_port)s/%(plc_api_path)s/" % locals() - self.auth = {'Username': conf.GENI_PLC_USER, + self.auth = {'Username': self.conf.GENI_PLC_USER, 'AuthMethod': 'password', - 'AuthString': conf.GENI_PLC_PASSWORD} + 'AuthString': self.conf.GENI_PLC_PASSWORD} self.shell = xmlrpclib.Server(url, verbose = 0, allow_none = True) self.shell.AuthCheck(self.auth) @@ -114,73 +130,45 @@ class Aggregate(GeniServer): site_dict[site['site_id']] = site['login_base'] # convert plc names to geni hrn - self.nodes = [self.hostname_to_hrn(site_dict[node['site_id']], node['hostname']) for node in nodes] - - # apply policy. Do not allow nodes found in blacklist, only allow nodes found in whitelist - whitelist_policy = lambda node: node in self.policy['whitelist'] - blacklist_policy = lambda node: node not in self.policy['blacklist'] + nodedict = {} + for node in nodes: + node_hrn = self.hostname_to_hrn(site_dict[node['site_id']], node['hostname']) + # apply policy. + # Do not allow nodes found in blacklist, only allow nodes found in whitelist + if self.polciy['whitelist'] and node_hrn not in self.polciy['whitelist']: + continue + if self.polciy['blacklist'] and node_hrn in self.policy['blacklist']: + continue + nodedict[node_hrn] = node['hostname'] + + self.nodes = SimpleStorage(self.nodes.db_filename, nodedict) + self.nodes.write() - if self.policy['blacklist']: - self.nodes = blacklist_policy(self.nodes) - if self.policy['whitelist']: - self.nodes = whitelist_policy(self.nodes) - # update timestamp and threshold - self.timestamp = datetime.datetime.now() + self.timestamp['timestamp'] = datetime.datetime.now() delta = datetime.timedelta(hours=self.nodes_ttl) - self.threshold = self.timestamp + delta - - f = open(self.nodes_file, 'w') - f.write(str(self.nodes)) - f.close() - f = open(self.timestamp_file, 'w') - f.write(str(self.threshold)) - f.close() + self.threshold = self.timestamp['timestamp'] + delta + self.timestamp.write() def load_components(self): """ Read cached list of nodes. """ # Read component list from cached file - if os.path.exists(self.nodes_file): - f = open(self.nodes_file, 'r') - self.nodes = eval(f.read()) - f.close() - + self.nodes.load() + self.timestamp.load() time_format = "%Y-%m-%d %H:%M:%S" - if os.path.exists(self.timestamp_file): - f = open(self.timestamp_file, 'r') - timestamp = str(f.read()).split(".")[0] - self.timestamp = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format))) - delta = datetime.timedelta(hours=self.nodes_ttl) - self.threshold = self.timestamp + delta - f.close() + timestamp = self.timestamp['timestamp'] + self.timestamp['timestamp'] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format))) + delta = datetime.timedelta(hours=self.nodes_ttl) + self.threshold = self.timestamp['timestamp'] + delta def load_policy(self): """ Read the list of blacklisted and whitelisted nodes. """ - whitelist = [] - blacklist = [] - if os.path.exists(self.whitelist_file): - f = open(self.whitelist_file, 'r') - lines = f.readlines() - f.close() - for line in lines: - line = line.strip().replace(" ", "").replace("\n", "") - whitelist.extend(line.split(",")) - - - if os.path.exists(self.blacklist_file): - f = open(self.blacklist_file, 'r') - lines = f.readlines() - f.close() - for line in lines: - line = line.strip().replace(" ", "").replace("\n", "") - blacklist.extend(line.split(",")) + self.policy.load() - self.policy['whitelist'] = whitelist - self.policy['blacklist'] = blacklist def get_components(self): """ @@ -189,29 +177,58 @@ class Aggregate(GeniServer): # Reload components list now = datetime.datetime.now() #self.load_components() - if not self.threshold or not self.timestamp or now > self.threshold: + if not self.threshold or not self.timestamp['timestamp'] or now > self.threshold: self.refresh_components() - elif now < self.threshold and not self.nodes: + elif now < self.threshold and not self.nodes.keys(): self.load_components() - return self.nodes + return self.nodes.keys() def get_rspec(self, hrn, type): - rspec = Rspec() - rspec['nodespec'] = {'name': self.conf.GENI_INTERFACE_HRN} - rsepc['nodespec']['nodes'] = [] - if type in ['node']: + """ + Get resource information from PLC + """ + + # Get the required nodes + if type in ['aggregate']: nodes = self.shell.GetNodes(self.auth) elif type in ['slice']: slicename = hrn_to_pl_slicename(hrn) slices = self.shell.GetSlices(self.auth, [slicename]) node_ids = slices[0]['node_ids'] nodes = self.shell.GetNodes(self.auth, node_ids) - for node in nodes: - nodespec = {'name': node['hostname'], 'type': 'std'} - elif type in ['aggregate']: - pass - - return rspec + + # Get all network interfaces + interface_ids = [] + for node in nodes: + interface_ids.extend(node['nodenetwork_ids']) + interfaces = self.shell.GetNodeNetworks(self.auth, interface_ids) + interface_dict = {} + for interface in interfaces: + interface_dict[interface['nodenetwork_id']] = interface + + # join nodes with thier interfaces + for node in nodes: + node['interfaces'] = [] + for nodenetwork_id in node['nodenetwork_ids']: + node['interfaces'].append(interface_dict[nodenetwork_id]) + + # convert and threshold to ints + timestamp = self.timestamp['timestamp'] + start_time = int(self.timestamp['timestamp'].strftime("%s")) + end_time = int(self.threshold.strftime("%s")) + duration = end_time - start_time + + # create the plc dict + networks = {'nodes': nodes, 'name': self.hrn, 'start_time': start_time, 'duration': duration} + resources = {'networks': networks, 'start_time': start_time, 'duration': duration} + + # convert the plc dict to an rspec dict + resouceDict = RspecDict(resources) + + # convert the rspec dict to xml + rspec = Rspec() + rspec.parseDict(resourceDict) + return rspec.toxml() def get_resources(self, slice_hrn): """ @@ -226,10 +243,20 @@ class Aggregate(GeniServer): """ Instantiate the specified slice according to whats defined in the rspec. """ + + # save slice state locally + # we can assume that spec object has been validated so its safer to + # save this instead of the unvalidated rspec the user gave us + self.slices[slice_hrn] = spec.toxml() + self.slices.write() + + # extract node list from rspec slicename = self.hrn_to_plcslicename(slice_hrn) spec = Rspec(rspec) nodespecs = spec.getDictsByTagName('NodeSpec') - nodes = [nodespec['name'] for nodespec in nodespecs] + nodes = [nodespec['name'] for nodespec in nodespecs] + + # add slice to nodes at plc self.shell.AddSliceToNodes(self.auth, slicename, nodes) for attribute in attributes: type, value, node, nodegroup = attribute['type'], attribute['value'], attribute['node'], attribute['nodegroup'] @@ -265,6 +292,13 @@ class Aggregate(GeniServer): spec = Rspec(rspec) nodespecs = spec.getDictsByTagName('NodeSpec') nodes = [nodespec['name'] for nodespec in nodespecs] + + # save slice state locally + # we can assume that spec object has been validated so its safer to + # save this instead of the unvalidated rspec the user gave us + self.slices[slice_hrn] = spec.toxml() + self.slices.write() + # remove nodes not in rspec delete_nodes = set(hostnames).difference(nodes) # add nodes from rspec @@ -282,16 +316,22 @@ class Aggregate(GeniServer): # persons = slice_record['users'] #for person in persons: - # shell.AddPersonToSlice(person['email'], slice_name) + # shell.AddPersonToSlice(person['email'], slice_name) + + def delete_slice_(self, slice_hrn): """ Remove this slice from all components it was previouly associated with and free up the resources it was using. """ + if self.slices.has_key(slice_hrn): + self.slices.pop(slice_hrn) + self.slices.write() + slicename = self.hrn_to_plcslicename(slice_hrn) slices = shell.GetSlices(self.auth, [slicename]) if not slice: - raise RecordNotFound(slice_hrn) + return 1 slice = slices[0] shell.DeleteSliceFromNodes(self.auth, slicename, slice['node_ids']) @@ -304,7 +344,8 @@ class Aggregate(GeniServer): slicename = hrn_to_plcslicename(slice_hrn) slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id']) if not slices: - raise RecordNotFound(slice_hrn) + #raise RecordNotFound(slice_hrn) + return 1 slice_id = slices[0] atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) attribute_id = attreibutes[0] @@ -318,7 +359,8 @@ class Aggregate(GeniServer): slicename = hrn_to_plcslicename(slice_hrn) slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id']) if not slices: - raise RecordNotFound(slice_hrn) + #raise RecordNotFound(slice_hrn) + return 1 slice_id = slices[0] atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) attribute_id = attreibutes[0] @@ -330,7 +372,7 @@ class Aggregate(GeniServer): """ Reset the slice """ - slicename = self.hrn_to_plcslicename(slice_hrn) + # XX not yet implemented return 1 def get_policy(self): @@ -358,30 +400,30 @@ class Aggregate(GeniServer): return self.get_resources(hrn) - def create(self, cred, hrn, rspec): + def createSlice(self, cred, hrn, rspec): self.decode_authentication(cred, 'embed') self.verify_object_belongs_to_me(hrn) - return self.create(hrn) + return self.create_slice(hrn) - def update(self, cred, hrn, rspec): + def updateSlice(self, cred, hrn, rspec): self.decode_authentication(cred, 'embed') self.verify_object_belongs_to_me(hrn) - return self.update(hrn) + return self.update_slice(hrn) - def delete(self, cred, hrn): + def deleteSlice(self, cred, hrn): self.decode_authentication(cred, 'embed') self.verify_object_belongs_to_me(hrn) return self.delete_slice(hrn) - def start(self, cred, hrn): + def startSlice(self, cred, hrn): self.decode_authentication(cred, 'control') - return self.start(hrn) + return self.start_slice(hrn) - def stop(self, cred, hrn): + def stopSlice(self, cred, hrn): self.decode_authentication(cred, 'control') return self.stop(hrn) - def reset(self, cred, hrn): + def resetSlice(self, cred, hrn): self.decode_authentication(cred, 'control') return self.reset(hrn) @@ -396,10 +438,10 @@ class Aggregate(GeniServer): self.server.register_function(self.components) #self.server.register_function(self.slices) self.server.register_function(self.resources) - self.server.register_function(self.create) - self.server.register_function(self.delete) - self.server.register_function(self.start) - self.server.register_function(self.stop) - self.server.register_function(self.reset) + self.server.register_function(self.createSlice) + self.server.register_function(self.deleteSlice) + self.server.register_function(self.startSlice) + self.server.register_function(self.stopSlice) + self.server.register_function(self.resetSlice) self.server.register_function(self.policy)