From 44c9e5344b613012de9280bb621e10dc8563c889 Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Tue, 14 Apr 2009 21:06:02 +0000 Subject: [PATCH] Nodes, Slices classes --- geni/util/nodes.py | 211 ++++++++++++++++++++++++++++++++ geni/util/slices.py | 288 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 499 insertions(+) create mode 100644 geni/util/nodes.py create mode 100644 geni/util/slices.py diff --git a/geni/util/nodes.py b/geni/util/nodes.py new file mode 100644 index 00000000..2578abf1 --- /dev/null +++ b/geni/util/nodes.py @@ -0,0 +1,211 @@ +import os +import time +import datetime +import sys + +from geni.util.misc import * +from geni.util.rspec import * +from geni.util.specdict import * +from geni.util.excep import * +from geni.util.storage import * +from geni.util.debug import log +from geni.util.rspec import * +from geni.util.specdict import * +from geni.aggregate import Aggregates +from geni.util.policy import Policy + +class Nodes(SimpleStorage): + + def __init__(self, api, ttl = 1): + self.api = api + self.ttl = ttl + self.threshold = None + self.nodes_file = os.sep.join([self.api.server_basedir, self.api.interface +'.'+ self.api.hrn + '.nodes']) + SimpleStorage.__init__(self, self.nodes_file) + self.policy = Policy(api) + self.load() + + + def refresh(self): + """ + Update the cached list of nodes + """ + + # Reload components list + now = datetime.datetime.now() + if not self.has_key('threshold') or not self.has_key('timestamp') or \ + now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))): + if self.api.interface in ['aggregate']: + self.refresh_nodes_aggregate() + elif self.api.interface in ['slicemgr']: + self.refresh_nodes_smgr() + + + def refresh_nodes_aggregate(self): + rspec = Rspec() + rspec.parseString(self.get_rspec()) + + # filter nodes according to policy + blist = self.policy['node_blacklist'] + wlist = self.policy['node_whitelist'] + rspec.filter('NodeSpec', 'name', blacklist=blist, whitelist=wlist) + + # extract ifspecs from rspec to get ips' + ips = [] + ifspecs = rspec.getDictsByTagName('IfSpec') + for ifspec in ifspecs: + if ifspec.has_key('addr') and ifspec['addr']: + ips.append(ifspec['addr']) + + # extract nodespecs from rspec to get dns names + hostnames = [] + nodespecs = rspec.getDictsByTagName('NodeSpec') + for nodespec in nodespecs: + if nodespec.has_key('name') and nodespec['name']: + hostnames.append(nodespec['name']) + + # update timestamp and threshold + timestamp = datetime.datetime.now() + hr_timestamp = timestamp.strftime(self.api.time_format) + delta = datetime.timedelta(hours=self.ttl) + threshold = timestamp + delta + hr_threshold = threshold.strftime(self.api.time_format) + + node_details = {} + node_details['rspec'] = rspec.toxml() + node_details['ip'] = ips + node_details['dns'] = hostnames + node_details['timestamp'] = hr_timestamp + node_details['threshold'] = hr_threshold + # save state + self.update(node_details) + self.write() + + def refresh_nodes_smgr(self): + # convert and threshold to ints + if self.has_key('timestamp') and self['timestamp']: + hr_timestamp = self['timestamp'] + timestamp = datetime.datetime.fromtimestamp(time.mktime(time.strptime(hr_timestamp, self.api.time_format))) + hr_threshold = self['threshold'] + threshold = datetime.datetime.fromtimestamp(time.mktime(time.strptime(hr_threshold, self.api.time_format))) + else: + timestamp = datetime.datetime.now() + hr_timestamp = timestamp.strftime(self.api.time_format) + delta = datetime.timedelta(hours=self.ttl) + threshold = timestamp + delta + hr_threshold = threshold.strftime(self.api.time_format) + + start_time = int(timestamp.strftime("%s")) + end_time = int(threshold.strftime("%s")) + duration = end_time - start_time + + aggregates = Aggregates(self.api) + rspecs = {} + networks = [] + rspec = Rspec() + credential = self.api.getCredential() + for aggregate in aggregates: + try: + # get the rspec from the aggregate + agg_rspec = aggregates[aggregate].resources(credential) + # extract the netspec from each aggregates rspec + rspec.parseString(agg_rspec) + networks.extend([{'NetSpec': rspec.getDictsByTagName('NetSpec')}]) + except: + raise + # XX print out to some error log + print >> log, "Error calling list nodes at aggregate %s" % aggregate + # create the rspec dict + resources = {'networks': networks, 'start_time': start_time, 'duration': duration} + resourceDict = {'Rspec': resources} + # convert rspec dict to xml + rspec.parseDict(resourceDict) + + # filter according to policy + blist = self.policy['node_blacklist'] + wlist = self.policy['node_whitelist'] + rspec.filter('NodeSpec', 'name', blacklist=blist, whitelist=wlist) + + # update timestamp and threshold + timestamp = datetime.datetime.now() + hr_timestamp = timestamp.strftime(self.api.time_format) + delta = datetime.timedelta(hours=self.ttl) + threshold = timestamp + delta + hr_threshold = threshold.strftime(self.api.time_format) + + nodedict = {'rspec': rspec.toxml(), + 'timestamp': hr_timestamp, + 'threshold': hr_threshold} + + self.update(nodedict) + self.write() + + + def get_rspec(self, hrn = None): + """ + Get resource information from PLC + """ + + # Get the required nodes + if not hrn: + nodes = self.api.plshell.GetNodes(self.api.plauth) + try: linkspecs = self.api.plshell.GetLinkSpecs() # if call is supported + except: linkspecs = [] + else: + slicename = hrn_to_pl_slicename(hrn) + slices = self.api.plshell.GetSlices(self.api.plauth, [slicename]) + if not slices: + nodes = [] + else: + slice = slices[0] + node_ids = slice['node_ids'] + nodes = self.api.plshell.GetNodes(self.api.plauth, node_ids) + + # Filter out whitelisted nodes + public_nodes = lambda n: n.has_key('slice_ids_whitelist') and not n['slice_ids_whitelist'] + nodes = filter(public_nodes, nodes) + + # Get all network interfaces + interface_ids = [] + for node in nodes: + interface_ids.extend(node['nodenetwork_ids']) + interfaces = self.api.plshell.GetNodeNetworks(self.api.plauth, interface_ids) + interface_dict = {} + for interface in interfaces: + interface_dict[interface['nodenetwork_id']] = interface + + # join nodes with thier interfaces + for node in nodes: + node['interfaces'] = [] + for nodenetwork_id in node['nodenetwork_ids']: + node['interfaces'].append(interface_dict[nodenetwork_id]) + + # convert and threshold to ints + if self.has_key('timestamp') and self['timestamp']: + timestamp = datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['timestamp'], self.api.time_format))) + threshold = datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))) + else: + timestamp = datetime.datetime.now() + delta = datetime.timedelta(hours=self.ttl) + threshold = timestamp + delta + + start_time = int(timestamp.strftime("%s")) + end_time = int(threshold.strftime("%s")) + duration = end_time - start_time + + # create the plc dict + networks = [{'nodes': nodes, + 'name': self.api.hrn, + 'start_time': start_time, + 'duration': duration}] + if not hrn: + networks[0]['links'] = linkspecs + resources = {'networks': networks, 'start_time': start_time, 'duration': duration} + + # convert the plc dict to an rspec dict + resourceDict = RspecDict(resources) + # convert the rspec dict to xml + rspec = Rspec() + rspec.parseDict(resourceDict) + return rspec.toxml() + diff --git a/geni/util/slices.py b/geni/util/slices.py new file mode 100644 index 00000000..f3655633 --- /dev/null +++ b/geni/util/slices.py @@ -0,0 +1,288 @@ +import datetime +import time +from geni.util.misc import * +from geni.util.rspec import * +from geni.util.specdict import * +from geni.util.excep import * +from geni.util.storage import * +from geni.util.debug import log +from geni.aggregate import Aggregates +from geni.registry import Registries + +class Slices(SimpleStorage): + + def __init__(self, api, ttl = .5): + self.api = api + self.ttl = ttl + self.threshold = None + self.slices_file = os.sep.join([self.api.server_basedir, self.api.interface +'.'+ self.api.hrn + '.slices']) + SimpleStorage.__init__(self, self.slices_file) + self.load() + + + def refresh(self): + """ + Update the cached list of slices + """ + # Reload components list + now = datetime.datetime.now() + if not self.has_key('threshold') or not self.has_key('timestamp') or \ + now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))): + if self.api.interface in ['aggregate']: + self.refresh_slices_aggregate() + elif self.api.interface in ['slicemgr']: + self.refresh_slices_smgr() + + def refresh_slices_aggregate(self): + slices = self.api.plshell.GetSlices(self.api.plauth, {}, ['name']) + slice_hrns = [slicename_to_hrn(self.api.hrn, slice['name']) for slice in slices] + + # update timestamp and threshold + timestamp = datetime.datetime.now() + hr_timestamp = timestamp.strftime(self.api.time_format) + delta = datetime.timedelta(hours=self.ttl) + threshold = timestamp + delta + hr_threshold = threshold.strftime(self.api.time_format) + + slice_details = {'hrn': slice_hrns, + 'timestamp': hr_timestamp, + 'threshold': hr_threshold + } + self.update(slice_details) + self.write() + + + def refresh_slices_smgr(self): + slice_hrns = [] + aggregates = Aggregates(self.api) + credential = self.api.getCredential() + for aggregate in aggregates: + try: + slices = aggregates[aggregate].slices(credential) + slice_hrns.extend(slices) + except: + print >> log, "Error calling slices at aggregate %(aggregate)s" % locals() + # update timestamp and threshold + timestamp = datetime.datetime.now() + hr_timestamp = timestamp.strftime(self.api.time_format) + delta = datetime.timedelta(hours=self.ttl) + threshold = timestamp + delta + hr_threshold = threshold.strftime(self.api.time_format) + + slice_details = {'hrn': slice_hrns, + 'timestamp': hr_timestamp, + 'threshold': hr_threshold + } + self.update(slice_details) + self.write() + + + def delete_slice(self, hrn): + if self.api.interface in ['aggregate']: + self.delete_slice_aggregate(hrn) + elif self.api.interface in ['slicemgr']: + self.delete_slice_smgr(hrn) + + def delete_slice_aggregate(self, hrn): + slicename = hrn_to_pl_slicename(hrn) + slices = self.api.plshell.GetSlices(self.api.plauth, [slicename]) + if not slices: + return 1 + slice = slices[0] + + self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, slice['node_ids']) + return 1 + + def delete_slice_smgr(self, hrn): + credential = self.api.getCredential() + aggregates = Aggregates(self.api) + for aggregate in aggregates: + aggregates[aggregate].delete_slice(credential, hrn) + + def create_slice(self, hrn, rspec): + if self.api.interface in ['aggregate']: + self.create_slice_aggregate(hrn, rspec) + elif self.api.interface in ['slicemgr']: + self.create_slice_smgr(hrn, rspec) + + def create_slice_aggregate(self, hrn, rspec): + spec = Rspec(rspec) + # Get the slice record from geni + slice = {} + registries = Registries(self.api) + registry = registries[self.api.hrn] + credential = self.api.getCredential() + records = registry.resolve(credential, hrn) + for record in records: + if record.get_type() in ['slice']: + slice_info = record.as_dict() + slice = slice_info['pl_info'] + if not slice: + raise RecordNotFound(slice_hrn) + + # Make sure slice exists at plc, if it doesnt add it + slicename = hrn_to_pl_slicename(hrn) + slices = self.api.plshell.GetSlices(self.api.plauth, [slicename], ['node_ids']) + if not slices: + parts = slicename.split("_") + login_base = parts[0] + # if site doesnt exist add it + sites = self.api.plshell.GetSites(self.api.plauth, [login_base]) + if not sites: + authority = get_authority(hrn) + site_records = registry.resolve(credential, authority) + site_record = {} + if not site_records: + raise RecordNotFound(authority) + site_record = site_records[0] + site_info = site_record.as_dict() + site = site_info['pl_info'] + + # add the site + site.pop('site_id') + site_id = self.api.plshell.AddSite(self.api.plauth, site) + else: + site = sites[0] + + self.api.plshell.AddSlice(self.api.plauth, slice) + + # get the list of valid slice users from the registry and make + # they are added to the slice + geni_info = slice_info['geni_info'] + researchers = geni_info['researcher'] + for researcher in researchers: + person_record = {} + person_records = registry.resolve(credential, researcher) + for record in person_records: + if record.get_type() in ['user']: + person_record = record + if not person_record: + pass + person_dict = person_record.as_dict()['pl_info'] + persons = self.api.plshell.GetPersons(self.api.plauth, [person_dict['email']], ['person_id', 'key_ids']) + + # Create the person record + if not persons: + self.api.plshell.AddPerson(self.api.plauth, person_dict) + key_ids = [] + else: + key_ids = persons[0]['key_ids'] + + self.api.plshell.AddPersonToSlice(self.api.plauth, person_dict['email'], slicename) + + # Get this users local keys + keylist = self.api.plshell.GetKeys(self.api.plauth, key_ids, ['key']) + keys = [key['key'] for key in keylist] + + # add keys that arent already there + for personkey in person_dict['keys']: + if personkey not in keys: + key = {'key_type': 'ssh', 'key': personkey} + self.api.plshell.AddPersonKey(self.api.plauth, person_dict['email'], key) + + # find out where this slice is currently running + nodelist = self.api.plshell.GetNodes(self.api.plauth, slice['node_ids'], ['hostname']) + hostnames = [node['hostname'] for node in nodelist] + + # get netspec details + nodespecs = spec.getDictsByTagName('NodeSpec') + nodes = [] + for nodespec in nodespecs: + if isinstance(nodespec['name'], list): + nodes.extend(nodespec['name']) + elif isinstance(nodespec['name'], StringTypes): + nodes.append(nodespec['name']) + + # remove nodes not in rspec + deleted_nodes = list(set(hostnames).difference(nodes)) + # add nodes from rspec + added_nodes = list(set(nodes).difference(hostnames)) + + self.api.plshell.AddSliceToNodes(self.api.plauth, slicename, added_nodes) + self.api.plshell.DeleteSliceFromNodes(self.api.plauth, slicename, deleted_nodes) + + return 1 + + def create_slice_smgr(self, hrn, rspec): + spec = Rspec() + tempspec = Rspec() + spec.parseString(rspec) + slicename = hrn_to_pl_slicename(hrn) + specDict = spec.toDict() + if specDict.has_key('Rspec'): specDict = specDict['Rspec'] + if specDict.has_key('start_time'): start_time = specDict['start_time'] + else: start_time = 0 + if specDict.has_key('end_time'): end_time = specDict['end_time'] + else: end_time = 0 + + rspecs = {} + aggregates = Aggregates(self.api) + credential = self.api.getCredential() + # only attempt to extract information about the aggregates we know about + for aggregate in aggregates: + netspec = spec.getDictByTagNameValue('NetSpec', aggregate) + if netspec: + # creat a plc dict + resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec} + resourceDict = {'Rspec': resources} + tempspec.parseDict(resourceDict) + rspecs[aggregate] = tempspec.toxml() + + # notify the aggregates + for aggregate in rspecs.keys(): + try: + aggregates[aggregate].create_slice(credential, hrn, rspecs[aggregate]) + except: + print >> log, "Error creating slice %(hrn)% at aggregate %(aggregate)%" % locals() + + return 1 + + + def start_slice(self, hrn): + if self.api.interface in ['aggregate']: + self.start_slice_aggregate() + elif self.api.interface in ['slicemgr']: + self.start_slice_smgr() + + def start_slice_aggregate(self, hrn): + slicename = hrn_to_pl_slicename(hrn) + slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice_id = slices[0] + attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) + attribute_id = attreibutes[0]['slice_attribute_id'] + self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "1" ) + return 1 + + def start_slice_smgr(self, hrn): + credential = self.api.getCredential() + aggregates = Aggregates() + for aggregate in aggregates: + aggreegates[aggregate].start_slice(credential, hrn) + return 1 + + + def stop_slice(self, hrn): + if self.api.interface in ['aggregate']: + self.stop_slice_aggregate() + elif self.api.interface in ['slicemgr']: + self.stop_slice_smgr() + + def stop_slice_aggregate(self, hrn): + slicename = hrn_to_pl_slicename(hrn) + slices = self.api.plshell.GetSlices(self.api.plauth, {'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice_id = slices[0] + attributes = self.api.plshell.GetSliceAttributes(self.api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) + attribute_id = attributes[0]['slice_attribute_id'] + self.api.plshell.UpdateSliceAttribute(self.api.plauth, attribute_id, "0") + return 1 + + def stop_slice_smgr(self, hrn): + credential = self.api.getCredential() + aggregates = Aggregates() + for aggregate in aggregates: + aggregate[aggregate].stop_slice(credential, hrn) + -- 2.43.0