X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fmanagers%2Faggregate_manager_max.py;h=67b115d1d6eaa5259c64be0f2695e84219a0aef4;hb=2d2a85edf02c635b592dfdb52c92dfa97b845f68;hp=949e241ff9bf602a659cc9063b1de5951c1f104b;hpb=1063c68f7a401565891afcc3cbeee5c9e94942b9;p=sfa.git diff --git a/sfa/managers/aggregate_manager_max.py b/sfa/managers/aggregate_manager_max.py index 949e241f..67b115d1 100644 --- a/sfa/managers/aggregate_manager_max.py +++ b/sfa/managers/aggregate_manager_max.py @@ -1,338 +1,335 @@ -#!/usr/bin/python - -from sfa.util.rspec import RSpec -import sys -import pdb -from sfa.util.namespace import hrn_to_pl_slicename, hrn_to_urn -from sfa.util.xrn import urn_to_hrn, get_authority -from sfa.util.rspec import * -from sfa.util.specdict import * -from sfa.util.faults import * -from sfa.util.storage import * -from sfa.util.policy import Policy -from sfa.server.aggregate import Aggregates -from sfa.server.registry import Registries -from sfa.util.faults import * - -import xml.dom.minidom - -SFA_MAX_CONF_FILE = '/etc/sfa/max_allocations' -SFA_MAX_DEFAULT_RSPEC = '/etc/sfa/max_physical.xml' -SFA_MAX_CANNED_RSPEC = '/etc/sfa/max_physical_canned.xml' - -topology = {} - -class SfaOutOfResource(SfaFault): - def __init__(self, interface): - faultString = "Interface " + interface + " not available" - SfaFault.__init__(self, 100, faultString, '') - -class SfaNoPairRSpec(SfaFault): - def __init__(self, interface, interface2): - faultString = "Interface " + interface + " should be paired with " + interface2 - SfaFault.__init__(self, 100, faultString, '') - -# Returns a mapping from interfaces to the nodes they lie on and their peer interfaces -# i -> node,i_peer - -def get_interface_map(): - r = RSpec() - r.parseFile(SFA_MAX_DEFAULT_RSPEC) - rspec = r.toDict() - capacity = rspec['rspec']['capacity'] - netspec = capacity[0]['netspec'][0] - linkdefs = {} - for n in netspec['nodespec']: - ifspecs = n['ifspec'] - nodename = n['node'] - for i in ifspecs: - ifname = i['name'] - linkid = i['linkid'] - - if (linkdefs.has_key(linkid)): - linkdefs[linkid].extend([(nodename,ifname)]) +import os +import time +import re + +#from sfa.util.faults import * +from sfa.util.sfalogging import logger +from sfa.util.sfatime import SFATIME_FORMAT +from sfa.util.config import Config +from sfa.util.callids import Callids +from sfa.util.version import version_core +from sfa.util.xrn import urn_to_hrn, hrn_to_urn, Xrn + +# xxx the sfa.rspecs module is dead - this symbol is now undefined +#from sfa.rspecs.sfa_rspec import sfa_rspec_version + +from sfa.managers.aggregate_manager import AggregateManager + +from sfa.planetlab.plslices import PlSlices + + +class AggregateManagerMax (AggregateManager): + + def __init__(self, config): + pass + + RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec" + + # execute shell command and return both exit code and text output + def shell_execute(self, cmd, timeout): + pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r') + pipe = os.popen(cmd + ' 2>&1', 'r') + text = '' + while timeout: + line = pipe.read() + text += line + time.sleep(1) + timeout = timeout - 1 + code = pipe.close() + if code is None: + code = 0 + if text[-1:] == '\n': + text = text[:-1] + return code, text + + def call_am_apiclient(self, client_app, params, timeout): + """ + call AM API client with command like in the following example: + cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \ + net.geni.aggregate.client.examples.CreateSliceNetworkClient \ + ./repo https://geni:8443/axis2/services/AggregateGENI \ + ... 
params ... + """ + (client_path, am_url) = Config().get_max_aggrMgr_info() + sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + \ + client_app + " ./repo " + am_url + " " + ' '.join(params) + ret = self.shell_execute(sys_cmd, timeout) + logger.debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret)) + return ret + + # save request RSpec xml content to a tmp file + def save_rspec_to_file(self, rspec): + path = AggregateManagerMax.RSPEC_TMP_FILE_PREFIX + "_" + \ + time.strftime(SFATIME_FORMAT, time.gmtime(time.time())) + ".xml" + file = open(path, "w") + file.write(rspec) + file.close() + return path + + # get stripped down slice id/name plc.maxpl.xislice1 --> maxpl_xislice1 + def get_plc_slice_id(self, cred, xrn): + (hrn, type) = urn_to_hrn(xrn) + slice_id = hrn.find(':') + sep = '.' + if hrn.find(':') != -1: + sep = ':' + elif hrn.find('+') != -1: + sep = '+' + else: + sep = '.' + slice_id = hrn.split(sep)[-2] + '_' + hrn.split(sep)[-1] + return slice_id + + # extract xml + def get_xml_by_tag(self, text, tag): + indx1 = text.find('<' + tag) + indx2 = text.find('/' + tag + '>') + xml = None + if indx1 != -1 and indx2 > indx1: + xml = text[indx1:indx2 + len(tag) + 2] + return xml + + # formerly in aggregate_manager.py but got unused in there... + def _get_registry_objects(self, slice_xrn, creds, users): + """ + + """ + hrn, _ = urn_to_hrn(slice_xrn) + + #hrn_auth = get_authority(hrn) + + # Build up objects that an SFA registry would return if SFA + # could contact the slice's registry directly + reg_objects = None + + if users: + # dont allow special characters in the site login base + #only_alphanumeric = re.compile('[^a-zA-Z0-9]+') + #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower() + slicename = hrn_to_pl_slicename(hrn) + login_base = slicename.split('_')[0] + reg_objects = {} + site = {} + site['site_id'] = 0 + site['name'] = 'geni.%s' % login_base + site['enabled'] = True + site['max_slices'] = 100 + + # Note: + # Is it okay if this login base is the same as one already at this myplc site? + # Do we need uniqueness? Should use hrn_auth instead of just the + # leaf perhaps? 
+ site['login_base'] = login_base + site['abbreviated_name'] = login_base + site['max_slivers'] = 1000 + reg_objects['site'] = site + + slice = {} + + # get_expiration always returns a normalized datetime - no need to + # utcparse + extime = Credential(string=creds[0]).get_expiration() + # If the expiration time is > 60 days from now, set the expiration + # time to 60 days from now + if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60): + extime = datetime.datetime.utcnow() + datetime.timedelta(days=60) + slice['expires'] = int(time.mktime(extime.timetuple())) + slice['hrn'] = hrn + slice['name'] = hrn_to_pl_slicename(hrn) + slice['url'] = hrn + slice['description'] = hrn + slice['pointer'] = 0 + reg_objects['slice_record'] = slice + + reg_objects['users'] = {} + for user in users: + user['key_ids'] = [] + hrn, _ = urn_to_hrn(user['urn']) + user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net" + user['first_name'] = hrn + user['last_name'] = hrn + reg_objects['users'][user['email']] = user + + return reg_objects + + def prepare_slice(self, api, slice_xrn, creds, users): + reg_objects = self._get_registry_objects(slice_xrn, creds, users) + (hrn, type) = urn_to_hrn(slice_xrn) + slices = PlSlices(self.driver) + peer = slices.get_peer(hrn) + sfa_peer = slices.get_sfa_peer(hrn) + slice_record = None + if users: + slice_record = users[0].get('slice_record', {}) + registry = api.registries[api.hrn] + credential = api.getCredential() + # ensure site record exists + site = slices.verify_site(hrn, slice_record, peer, sfa_peer) + # ensure slice record exists + slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) + # ensure person records exists + persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) + + def parse_resources(self, text, slice_xrn): + resources = [] + urn = hrn_to_urn(slice_xrn, 'sliver') + plc_slice = re.search("Slice Status => ([^\n]+)", text) + if plc_slice.group(1) != 'NONE': + res = {} + res['geni_urn'] = urn + '_plc_slice' + res['geni_error'] = '' + res['geni_status'] = 'unknown' + if plc_slice.group(1) == 'CREATED': + res['geni_status'] = 'ready' + resources.append(res) + vlans = re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", text) + for vlan in vlans: + res = {} + res['geni_error'] = '' + res['geni_urn'] = urn + '_vlan_' + vlan[0] + if vlan[1] == 'ACTIVE': + res['geni_status'] = 'ready' + elif vlan[1] == 'FAILED': + res['geni_status'] = 'failed' else: - linkdefs[linkid]=[(nodename,ifname)] - - # topology maps interface x interface -> link,node1,node2 - topology={} - - for k in linkdefs.keys(): - (n1,i1) = linkdefs[k][0] - (n2,i2) = linkdefs[k][1] - - topology[i1] = (n1, i2) - topology[i2] = (n2, i1) - - - return topology - - -def allocations_to_rspec(allocations): - rspec = xml.dom.minidom.parse(SFA_MAX_DEFAULT_RSPEC) - req = rspec.firstChild.appendChild(rspec.createElement("request")) - for (iname,ip) in allocations: - ifspec = req.appendChild(rspec.createElement("ifspec")) - ifspec.setAttribute("name","tns:"+iname) - ifspec.setAttribute("ip",ip) - - return rspec.toxml() - - -def if_endpoints(ifs): - nodes=[] - for l in ifs: - nodes.extend(topology[l][0]) - return nodes - -def lock_state_file(): - # Noop for demo - return True - -def unlock_state_file(): - return True - # Noop for demo - -def read_alloc_dict(): - alloc_dict={} - rows = open(SFA_MAX_CONF_FILE).read().split('\n') - for r in rows: - columns = r.split(' ') - if (len(columns)==2): - hrn = columns[0] - allocs = columns[1].split(',') - ipallocs = map(lambda 
alloc:alloc.split('/'), allocs) - alloc_dict[hrn]=ipallocs - return alloc_dict - -def commit_alloc_dict(d): - f = open(SFA_MAX_CONF_FILE, 'w') - for hrn in d.keys(): - columns = d[hrn] - ipcolumns = map(lambda x:"/".join(x), columns) - row = hrn+' '+','.join(ipcolumns)+'\n' - f.write(row) - f.close() - -def collapse_alloc_dict(d): - ret = [] - for k in d.keys(): - ret.extend(d[k]) - return ret - - -def alloc_links(api, hrn, links_to_add, links_to_drop): - slicename=hrn_to_pl_slicename(hrn) - for (iface,ip) in links_to_add: - node = topology[iface][0][0] - try: - api.plshell.AddSliceTag(api.plauth, slicename, "ip_addresses", ip, node) - api.plshell.AddSliceTag(api.plauth, slicename, "vsys", "getvlan", node) - except Exception: - # Probably a duplicate tag. XXX July 21 - pass - return True - -def alloc_nodes(api,hrn, requested_ifs): - requested_nodes = if_endpoints(requested_ifs) - create_slice_max_aggregate(api, hrn, requested_nodes) - -# Taken from slices.py - -def create_slice_max_aggregate(api, hrn, nodes): - # Get the slice record - global topology - topology = get_interface_map() - slice = {} - registries = Registries(api) - registry = registries[api.hrn] - credential = api.getCredential() - urn = hrn_to_urn(hrn, 'slice') - records = registry.Resolve(urn, credential) - for record in records: - if record.get_type() in ['slice']: - slice = record.as_dict() - if not slice: - raise RecordNotFound(hrn) - - # Make sure slice exists at plc, if it doesnt add it - slicename = hrn_to_pl_slicename(hrn) - slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids']) - if not slices: - parts = slicename.split("_") - login_base = parts[0] - # if site doesnt exist add it - sites = api.plshell.GetSites(api.plauth, [login_base]) - if not sites: - authority = get_authority(hrn) - authority_urn = hrn_to_urn(authority, 'authority') - site_records = registry.Resolve(authority_urn, credential) - site_record = {} - if not site_records: - raise RecordNotFound(authority) - site_record = site_records[0] - site = site_record.as_dict() - - # add the site - site.pop('site_id') - site_id = api.plshell.AddSite(api.plauth, site) + res['geni_status'] = 'configuring' + resources.append(res) + return resources + + def slice_status(self, api, slice_xrn, creds): + urn = hrn_to_urn(slice_xrn, 'slice') + result = {} + top_level_status = 'unknown' + slice_id = self.get_plc_slice_id(creds, urn) + (ret, output) = self.call_am_apiclient( + "QuerySliceNetworkClient", [slice_id, ], 5) + # parse output into rspec XML + if output.find("Unkown Rspec:") > 0: + top_level_staus = 'failed' + result['geni_resources'] = '' else: - site = sites[0] - - slice_fields = {} - slice_keys = ['name', 'url', 'description'] - for key in slice_keys: - if key in slice and slice[key]: - slice_fields[key] = slice[key] - api.plshell.AddSlice(api.plauth, slice_fields) - slice = slice_fields - slice['node_ids'] = 0 - else: - slice = slices[0] - - # get the list of valid slice users from the registry and make - # they are added to the slice - researchers = record.get('researcher', []) - for researcher in researchers: - person_record = {} - researcher_urn = hrn_to_urn(researcher, 'user') - person_records = registry.Resolve(researcher_urn, credential) - for record in person_records: - if record.get_type() in ['user']: - person_record = record - if not person_record: - pass - person_dict = person_record.as_dict() - persons = api.plshell.GetPersons(api.plauth, [person_dict['email']], - ['person_id', 'key_ids']) - - # Create the person record - if not 
persons: - person_id=api.plshell.AddPerson(api.plauth, person_dict) - - # The line below enables the user account on the remote aggregate - # soon after it is created. - # without this the user key is not transfered to the slice - # (as GetSlivers returns key of only enabled users), - # which prevents the user from login to the slice. - # We may do additional checks before enabling the user. - - api.plshell.UpdatePerson(api.plauth, person_id, {'enabled' : True}) - key_ids = [] + has_failure = 0 + all_active = 0 + if output.find("Status => FAILED") > 0: + top_level_staus = 'failed' + elif (output.find("Status => ACCEPTED") > 0 or output.find("Status => PENDING") > 0 + or output.find("Status => INSETUP") > 0 or output.find("Status => INCREATE") > 0 + ): + top_level_status = 'configuring' + else: + top_level_status = 'ready' + result['geni_resources'] = self.parse_resources(output, slice_xrn) + result['geni_urn'] = urn + result['geni_status'] = top_level_status + return result + + def create_slice(self, api, xrn, cred, rspec, users): + indx1 = rspec.find("") + if indx1 > -1 and indx2 > indx1: + rspec = rspec[indx1 + len(""):indx2 - 1] + rspec_path = self.save_rspec_to_file(rspec) + self.prepare_slice(api, xrn, cred, users) + slice_id = self.get_plc_slice_id(cred, xrn) + sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + slice_id + "/g\" " + \ + rspec_path + \ + ";sed -i \"s/:rspec=[^:'<\\\" ]*/:rspec=" + \ + slice_id + "/g\" " + rspec_path + ret = self.shell_execute(sys_cmd, 1) + sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + \ + rspec_path + "/g\"" + ret = self.shell_execute(sys_cmd, 1) + (ret, output) = self.call_am_apiclient( + "CreateSliceNetworkClient", [rspec_path, ], 3) + # parse output ? + rspec = " Done! " + return True + + def delete_slice(self, api, xrn, cred): + slice_id = self.get_plc_slice_id(cred, xrn) + (ret, output) = self.call_am_apiclient( + "DeleteSliceNetworkClient", [slice_id, ], 3) + # parse output ? + return 1 + + def get_rspec(self, api, cred, slice_urn): + logger.debug("#### called max-get_rspec") + # geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1 + if slice_urn == None: + (ret, output) = self.call_am_apiclient( + "GetResourceTopology", ['all', '\"\"'], 5) else: - key_ids = persons[0]['key_ids'] - - api.plshell.AddPersonToSlice(api.plauth, person_dict['email'], - slicename) - - # Get this users local keys - keylist = api.plshell.GetKeys(api.plauth, key_ids, ['key']) - keys = [key['key'] for key in keylist] - - # add keys that arent already there - for personkey in person_dict['keys']: - if personkey not in keys: - key = {'key_type': 'ssh', 'key': personkey} - api.plshell.AddPersonKey(api.plauth, person_dict['email'], key) - - # find out where this slice is currently running - nodelist = api.plshell.GetNodes(api.plauth, slice['node_ids'], - ['hostname']) - hostnames = [node['hostname'] for node in nodelist] - - # remove nodes not in rspec - deleted_nodes = list(set(hostnames).difference(nodes)) - # add nodes from rspec - added_nodes = list(set(nodes).difference(hostnames)) - - api.plshell.AddSliceToNodes(api.plauth, slicename, added_nodes) - api.plshell.DeleteSliceFromNodes(api.plauth, slicename, deleted_nodes) - - return 1 - - -def get_rspec(api, creds, options): - # get slice's hrn from options - xrn = options.get('geni_slice_urn', None) - hrn, type = urn_to_hrn(xrn) - # Eg. 
config line: - # plc.princeton.sapan vlan23,vlan45 - - allocations = read_alloc_dict() - if (hrn and allocations.has_key(hrn)): - ret_rspec = allocations_to_rspec(allocations[hrn]) - else: - ret_rspec = open(SFA_MAX_CANNED_RSPEC).read() - - return (ret_rspec) - - -def create_slice(api, xrn, creds, rspec_xml, users): - global topology - hrn = urn_to_hrn(xrn)[0] - topology = get_interface_map() - - # Check if everything in rspec is either allocated by hrn - # or not allocated at all. - r = RSpec() - r.parseString(rspec_xml) - rspec = r.toDict() - - lock_state_file() - - allocations = read_alloc_dict() - requested_allocations = rspec_to_allocations (rspec) - current_allocations = collapse_alloc_dict(allocations) - try: - current_hrn_allocations=allocations[hrn] - except KeyError: - current_hrn_allocations=[] - - # Check request against current allocations - requested_interfaces = map(lambda(elt):elt[0], requested_allocations) - current_interfaces = map(lambda(elt):elt[0], current_allocations) - current_hrn_interfaces = map(lambda(elt):elt[0], current_hrn_allocations) - - for a in requested_interfaces: - if (a not in current_hrn_interfaces and a in current_interfaces): - raise SfaOutOfResource(a) - if (topology[a][1] not in requested_interfaces): - raise SfaNoPairRSpec(a,topology[a][1]) - # Request OK - - # Allocations to delete - allocations_to_delete = [] - for a in current_hrn_allocations: - if (a not in requested_allocations): - allocations_to_delete.extend([a]) - - # Ok, let's do our thing - alloc_nodes(api, hrn, requested_interfaces) - alloc_links(api, hrn, requested_allocations, allocations_to_delete) - allocations[hrn] = requested_allocations - commit_alloc_dict(allocations) - - unlock_state_file() - - return True - -def rspec_to_allocations(rspec): - ifs = [] - try: - ifspecs = rspec['rspec']['request'][0]['ifspec'] - for l in ifspecs: - ifs.extend([(l['name'].replace('tns:',''),l['ip'])]) - except KeyError: - # Bad RSpec - pass - return ifs - -def main(): - t = get_interface_map() - r = RSpec() - rspec_xml = open(sys.argv[1]).read() - #get_rspec(None,'foo') - create_slice(None, "plc.princeton.sap0", rspec_xml) - -if __name__ == "__main__": - main() + slice_id = self.get_plc_slice_id(cred, slice_urn) + (ret, output) = self.call_am_apiclient( + "GetResourceTopology", ['all', slice_id, ], 5) + # parse output into rspec XML + if output.find("No resouce found") > 0: + rspec = " No resource found " + else: + comp_rspec = self.get_xml_by_tag(output, 'computeResource') + logger.debug("#### computeResource %s" % comp_rspec) + topo_rspec = self.get_xml_by_tag(output, 'topology') + logger.debug("#### topology %s" % topo_rspec) + rspec = " " + if comp_rspec != None: + rspec = rspec + self.get_xml_by_tag(output, 'computeResource') + if topo_rspec != None: + rspec = rspec + self.get_xml_by_tag(output, 'topology') + rspec = rspec + " " + return (rspec) + + def start_slice(self, api, xrn, cred): + # service not supported + return None + + def stop_slice(self, api, xrn, cred): + # service not supported + return None + + def reset_slices(self, api, xrn): + # service not supported + return None + + # GENI AM API Methods + + def SliverStatus(self, api, slice_xrn, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): + return {} + return self.slice_status(api, slice_xrn, creds) + + def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): + return "" + # TODO: create real 
CreateSliver response rspec + ret = self.create_slice(api, slice_xrn, creds, rspec_string, users) + if ret: + return self.get_rspec(api, creds, slice_xrn) + else: + return " Error! " + + def DeleteSliver(self, api, xrn, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): + return "" + return self.delete_slice(api, xrn, creds) + + # no caching + def ListResources(self, api, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): + return "" + # version_string = "rspec_%s" % (rspec_version.get_version_name()) + slice_urn = options.get('geni_slice_urn') + return self.get_rspec(api, creds, slice_urn) + + def fetch_context(self, slice_hrn, user_hrn, contexts): + """ + Returns the request context required by sfatables. At some point, this mechanism should be changed + to refer to "contexts", which is the information that sfatables is requesting. But for now, we just + return the basic information needed in a dict. + """ + base_context = {'sfa': {'user': {'hrn': user_hrn}}} + return base_context
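
The new AggregateManagerMax drives the external Java aggregate client by shelling out: shell_execute() runs a command line, folds stderr into stdout, and polls the pipe once per second until the timeout runs out, returning (exit_code, output). Below is a minimal sketch of the same contract using subprocess instead of os.popen; it is an illustration of the idea, not the code the commit adds.

    import subprocess

    def shell_execute(cmd, timeout):
        # run cmd through a shell, fold stderr into the returned text,
        # and bound the wait to `timeout` seconds
        try:
            proc = subprocess.run(cmd, shell=True, capture_output=True,
                                  text=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            return -1, ''
        text = proc.stdout + proc.stderr
        if text.endswith('\n'):
            text = text[:-1]
        return proc.returncode, text

    # e.g. shell_execute("cd aggregate_client && java -classpath AggregateWS-client-api.jar:lib/* "
    #                    "net.geni.aggregate.client.examples.QuerySliceNetworkClient ...", 5)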
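
get_plc_slice_id() strips a slice XRN down to the "authority_slice" form the MAX client expects, e.g. plc.maxpl.xislice1 -> maxpl_xislice1. A standalone sketch of that mapping, assuming the HRN has already been obtained via sfa.util.xrn.urn_to_hrn():

    def plc_slice_id(hrn):
        # same separator precedence as get_plc_slice_id(): ':' first, then '+',
        # otherwise '.'
        for sep in (':', '+', '.'):
            if sep in hrn:
                parts = hrn.split(sep)
                return parts[-2] + '_' + parts[-1]
        return hrn

    print(plc_slice_id('plc.maxpl.xislice1'))   # -> maxpl_xislice1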
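
get_xml_by_tag() cuts the first <tag> ... </tag> fragment out of the client's mixed log/XML output using plain string searches. Exercising the same logic on a made-up output string shows what get_rspec() later concatenates into its reply:

    def get_xml_by_tag(text, tag):
        # same string-search logic as AggregateManagerMax.get_xml_by_tag()
        indx1 = text.find('<' + tag)
        indx2 = text.find('/' + tag + '>')
        xml = None
        if indx1 != -1 and indx2 > indx1:
            xml = text[indx1:indx2 + len(tag) + 2]
        return xml

    output = 'INFO queried topology\n<topology id="t1"><link/></topology>\nDone'
    print(get_xml_by_tag(output, 'topology'))
    # -> <topology id="t1"><link/></topology>
    print(get_xml_by_tag(output, 'computeResource'))
    # -> None (tag not present in this output)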
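
parse_resources() scrapes the QuerySliceNetworkClient output with two regular expressions: one for the overall "Slice Status" line and one for the per-VLAN "GRI"/"Status" pairs, mapping each onto a geni_status of ready, failed or configuring. Running the same expressions over an invented output fragment (the real client's output format and the URN are assumptions here) gives a feel for the structure slice_status() returns under geni_resources:

    import re

    sample = (
        "Slice Status => CREATED\n"
        "GRI => vlan-101\n\t Status => ACTIVE\n"
        "GRI => vlan-102\n\t Status => INSETUP\n"
    )
    urn = "urn:publicid:IDN+plc:maxpl+sliver+xislice1"   # invented for the example

    resources = []
    plc_slice = re.search("Slice Status => ([^\n]+)", sample)
    if plc_slice and plc_slice.group(1) != 'NONE':
        status = 'ready' if plc_slice.group(1) == 'CREATED' else 'unknown'
        resources.append({'geni_urn': urn + '_plc_slice',
                          'geni_error': '', 'geni_status': status})
    for gri, state in re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", sample):
        status = {'ACTIVE': 'ready', 'FAILED': 'failed'}.get(state, 'configuring')
        resources.append({'geni_urn': urn + '_vlan_' + gri,
                          'geni_error': '', 'geni_status': status})

    print(resources)
    # three entries: the plc slice ('ready'), vlan-101 ('ready'), vlan-102 ('configuring')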
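
Before calling CreateSliceNetworkClient, create_slice() rewrites the rspec id attribute in the saved request file to the PLC slice id, using sed -i over the temp file. The first substitution, done in-process as a sketch (the second sed expression on ':rspec=' would be handled analogously; the committed code keeps the on-disk sed calls):

    import re

    def rewrite_rspec_id(rspec_text, slice_id):
        # in-process equivalent of: sed -i "s/rspec id=\"[^\"]*/rspec id=\"SLICE_ID/g" FILE
        return re.sub(r'rspec id="[^"]*', 'rspec id="' + slice_id, rspec_text)

    print(rewrite_rspec_id('<rspec id="request-1"> ... </rspec>', 'maxpl_xislice1'))
    # -> <rspec id="maxpl_xislice1"> ... </rspec>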
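
Each GENI AM API wrapper (SliverStatus, CreateSliver, DeleteSliver, ListResources) first checks Callids().already_handled(call_id) so that a retried XML-RPC call carrying the same call_id gets an empty answer instead of being executed twice. An in-memory sketch of that idempotency guard; the real sfa.util.callids.Callids is assumed to behave equivalently for this purpose:

    class Callids:
        """In-memory stand-in for sfa.util.callids.Callids (sketch only)."""
        _seen = set()

        def already_handled(self, call_id):
            if not call_id:
                return False            # requests without a call_id are not deduplicated here
            if call_id in Callids._seen:
                return True
            Callids._seen.add(call_id)
            return False

    ids = Callids()
    print(ids.already_handled('42'))    # False -> do the real work
    print(ids.already_handled('42'))    # True  -> answer with the empty result, as the AM methods do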