From: Thierry Parmentelat Date: Tue, 8 Nov 2011 00:12:20 +0000 (+0100) Subject: Merge branch 'upstreammaster' X-Git-Tag: sfa-2.1-24~36 X-Git-Url: http://git.onelab.eu/?p=sfa.git;a=commitdiff_plain;h=68f0f5bf538b2633fceee633be2748aaa3eaa709;hp=e98b602a586412b828057bcebf2b6073697720a5 Merge branch 'upstreammaster' --- diff --git a/sfa.spec b/sfa.spec index d2979e44..55b6ec39 100644 --- a/sfa.spec +++ b/sfa.spec @@ -1,6 +1,6 @@ %define name sfa %define version 1.1 -%define taglevel 1 +%define taglevel 2 %define release %{taglevel}%{?pldistro:.%{pldistro}}%{?date:.%{date}} %global python_sitearch %( python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(1)" ) @@ -197,6 +197,14 @@ fi [ "$1" -ge "1" ] && service sfa-cm restart || : %changelog +* Mon Nov 07 2011 Thierry Parmentelat - sfa-1.1-2 +- checkpoint tag: use SFA_GENERIC_FLAVOUR instead of SFA_*_TYPE +- improvements in the pgv2 rspecs +- driver separated from api +- code starts moving around where it belongs +- sfascan caches getversion across invokations +- vini topology extracted as a config file + * Fri Oct 28 2011 Thierry Parmentelat - sfa-1.1-1 - first support for protogeni rspecs is working - vini no longer needs a specific manager diff --git a/sfa/generic/pl.py b/sfa/generic/pl.py index 167b58af..098a27a3 100644 --- a/sfa/generic/pl.py +++ b/sfa/generic/pl.py @@ -16,9 +16,9 @@ class pl (Generic): def registry_manager_class (self) : return sfa.managers.registry_manager def slicemgr_manager_class (self) : - return sfa.managers.slice_manager + return sfa.managers.slice_manager.SliceManager def aggregate_manager_class (self) : - return sfa.managers.aggregate_manager + return sfa.managers.aggregate_manager.AggregateManager # driver class for server-side services, talk to the whole testbed def driver_class (self): diff --git a/sfa/managers/aggregate_manager.py b/sfa/managers/aggregate_manager.py index df97dfc8..fb49cb68 100644 --- a/sfa/managers/aggregate_manager.py +++ b/sfa/managers/aggregate_manager.py @@ -20,394 +20,372 @@ import sfa.plc.peers as peers from sfa.plc.aggregate import Aggregate from sfa.plc.slices import Slices -def GetVersion(api): +class AggregateManager: - version_manager = VersionManager() - ad_rspec_versions = [] - request_rspec_versions = [] - for rspec_version in version_manager.versions: - if rspec_version.content_type in ['*', 'ad']: - ad_rspec_versions.append(rspec_version.to_dict()) - if rspec_version.content_type in ['*', 'request']: - request_rspec_versions.append(rspec_version.to_dict()) - default_rspec_version = version_manager.get_version("sfa 1").to_dict() - xrn=Xrn(api.hrn) - version_more = {'interface':'aggregate', - 'testbed':'myplc', - 'hrn':xrn.get_hrn(), - 'request_rspec_versions': request_rspec_versions, - 'ad_rspec_versions': ad_rspec_versions, - 'default_ad_rspec': default_rspec_version - } - return version_core(version_more) - -def __get_registry_objects(slice_xrn, creds, users): - """ - - """ - hrn, _ = urn_to_hrn(slice_xrn) - - hrn_auth = get_authority(hrn) - - # Build up objects that an SFA registry would return if SFA - # could contact the slice's registry directly - reg_objects = None - - if users: - # dont allow special characters in the site login base - #only_alphanumeric = re.compile('[^a-zA-Z0-9]+') - #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower() + def __init__ (self): + # xxx Thierry : caching at the aggregate level sounds wrong... + #self.caching=True + self.caching=False + + def GetVersion(self, api): + + version_manager = VersionManager() + ad_rspec_versions = [] + request_rspec_versions = [] + for rspec_version in version_manager.versions: + if rspec_version.content_type in ['*', 'ad']: + ad_rspec_versions.append(rspec_version.to_dict()) + if rspec_version.content_type in ['*', 'request']: + request_rspec_versions.append(rspec_version.to_dict()) + default_rspec_version = version_manager.get_version("sfa 1").to_dict() + xrn=Xrn(api.hrn) + version_more = {'interface':'aggregate', + 'testbed':'myplc', + 'hrn':xrn.get_hrn(), + 'request_rspec_versions': request_rspec_versions, + 'ad_rspec_versions': ad_rspec_versions, + 'default_ad_rspec': default_rspec_version + } + return version_core(version_more) + + def _get_registry_objects(self, slice_xrn, creds, users): + """ + + """ + hrn, _ = urn_to_hrn(slice_xrn) + + hrn_auth = get_authority(hrn) + + # Build up objects that an SFA registry would return if SFA + # could contact the slice's registry directly + reg_objects = None + + if users: + # dont allow special characters in the site login base + #only_alphanumeric = re.compile('[^a-zA-Z0-9]+') + #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower() + slicename = hrn_to_pl_slicename(hrn) + login_base = slicename.split('_')[0] + reg_objects = {} + site = {} + site['site_id'] = 0 + site['name'] = 'geni.%s' % login_base + site['enabled'] = True + site['max_slices'] = 100 + + # Note: + # Is it okay if this login base is the same as one already at this myplc site? + # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps? + site['login_base'] = login_base + site['abbreviated_name'] = login_base + site['max_slivers'] = 1000 + reg_objects['site'] = site + + slice = {} + + # get_expiration always returns a normalized datetime - no need to utcparse + extime = Credential(string=creds[0]).get_expiration() + # If the expiration time is > 60 days from now, set the expiration time to 60 days from now + if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60): + extime = datetime.datetime.utcnow() + datetime.timedelta(days=60) + slice['expires'] = int(time.mktime(extime.timetuple())) + slice['hrn'] = hrn + slice['name'] = hrn_to_pl_slicename(hrn) + slice['url'] = hrn + slice['description'] = hrn + slice['pointer'] = 0 + reg_objects['slice_record'] = slice + + reg_objects['users'] = {} + for user in users: + user['key_ids'] = [] + hrn, _ = urn_to_hrn(user['urn']) + user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net" + user['first_name'] = hrn + user['last_name'] = hrn + reg_objects['users'][user['email']] = user + + return reg_objects + + def SliverStatus(self, api, slice_xrn, creds, call_id): + if Callids().already_handled(call_id): return {} + + (hrn, _) = urn_to_hrn(slice_xrn) + # find out where this slice is currently running slicename = hrn_to_pl_slicename(hrn) - login_base = slicename.split('_')[0] - reg_objects = {} - site = {} - site['site_id'] = 0 - site['name'] = 'geni.%s' % login_base - site['enabled'] = True - site['max_slices'] = 100 - - # Note: - # Is it okay if this login base is the same as one already at this myplc site? - # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps? - site['login_base'] = login_base - site['abbreviated_name'] = login_base - site['max_slivers'] = 1000 - reg_objects['site'] = site - - slice = {} - # get_expiration always returns a normalized datetime - no need to utcparse - extime = Credential(string=creds[0]).get_expiration() - # If the expiration time is > 60 days from now, set the expiration time to 60 days from now - if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60): - extime = datetime.datetime.utcnow() + datetime.timedelta(days=60) - slice['expires'] = int(time.mktime(extime.timetuple())) - slice['hrn'] = hrn - slice['name'] = hrn_to_pl_slicename(hrn) - slice['url'] = hrn - slice['description'] = hrn - slice['pointer'] = 0 - reg_objects['slice_record'] = slice - - reg_objects['users'] = {} - for user in users: - user['key_ids'] = [] - hrn, _ = urn_to_hrn(user['urn']) - user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net" - user['first_name'] = hrn - user['last_name'] = hrn - reg_objects['users'][user['email']] = user - - return reg_objects - -def __get_hostnames(nodes): - hostnames = [] - for node in nodes: - hostnames.append(node.hostname) - return hostnames - -def SliverStatus(api, slice_xrn, creds, call_id): - if Callids().already_handled(call_id): return {} - - (hrn, _) = urn_to_hrn(slice_xrn) - # find out where this slice is currently running - slicename = hrn_to_pl_slicename(hrn) - - slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires']) - if len(slices) == 0: - raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename)) - slice = slices[0] - - # report about the local nodes only - nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None}, - ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact']) - site_ids = [node['site_id'] for node in nodes] - - result = {} - top_level_status = 'unknown' - if nodes: - top_level_status = 'ready' - slice_urn = Xrn(slice_xrn, 'slice').get_urn() - result['geni_urn'] = slice_urn - result['pl_login'] = slice['name'] - result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime() - - resources = [] - for node in nodes: - res = {} - res['pl_hostname'] = node['hostname'] - res['pl_boot_state'] = node['boot_state'] - res['pl_last_contact'] = node['last_contact'] - if node['last_contact'] is not None: - res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime() - sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) - res['geni_urn'] = sliver_id - if node['boot_state'] == 'boot': - res['geni_status'] = 'ready' - else: - res['geni_status'] = 'failed' - top_level_status = 'failed' + slices = api.driver.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires']) + if len(slices) == 0: + raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename)) + slice = slices[0] + + # report about the local nodes only + nodes = api.driver.GetNodes({'node_id':slice['node_ids'],'peer_id':None}, + ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact']) + site_ids = [node['site_id'] for node in nodes] + + result = {} + top_level_status = 'unknown' + if nodes: + top_level_status = 'ready' + slice_urn = Xrn(slice_xrn, 'slice').get_urn() + result['geni_urn'] = slice_urn + result['pl_login'] = slice['name'] + result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime() + + resources = [] + for node in nodes: + res = {} + res['pl_hostname'] = node['hostname'] + res['pl_boot_state'] = node['boot_state'] + res['pl_last_contact'] = node['last_contact'] + if node['last_contact'] is not None: + res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime() + sliver_id = urn_to_sliver_id(slice_urn, slice['slice_id'], node['node_id']) + res['geni_urn'] = sliver_id + if node['boot_state'] == 'boot': + res['geni_status'] = 'ready' + else: + res['geni_status'] = 'failed' + top_level_status = 'failed' + + res['geni_error'] = '' + + resources.append(res) - res['geni_error'] = '' - - resources.append(res) + result['geni_status'] = top_level_status + result['geni_resources'] = resources + return result + + def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id): + """ + Create the sliver[s] (slice) at this aggregate. + Verify HRN and initialize the slice record in PLC if necessary. + """ + if Callids().already_handled(call_id): return "" + + aggregate = Aggregate(api) + slices = Slices(api) + (hrn, _) = urn_to_hrn(slice_xrn) + peer = slices.get_peer(hrn) + sfa_peer = slices.get_sfa_peer(hrn) + slice_record=None + if users: + slice_record = users[0].get('slice_record', {}) + + # parse rspec + rspec = RSpec(rspec_string) + requested_attributes = rspec.version.get_slice_attributes() - result['geni_status'] = top_level_status - result['geni_resources'] = resources - return result - -def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id): - """ - Create the sliver[s] (slice) at this aggregate. - Verify HRN and initialize the slice record in PLC if necessary. - """ - if Callids().already_handled(call_id): return "" - - aggregate = Aggregate(api) - slices = Slices(api) - (hrn, _) = urn_to_hrn(slice_xrn) - peer = slices.get_peer(hrn) - sfa_peer = slices.get_sfa_peer(hrn) - slice_record=None - if users: - slice_record = users[0].get('slice_record', {}) - - # parse rspec - rspec = RSpec(rspec_string) - requested_attributes = rspec.version.get_slice_attributes() - - # ensure site record exists - site = slices.verify_site(hrn, slice_record, peer, sfa_peer) - # ensure slice record exists - slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) - # ensure person records exists - persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) - # ensure slice attributes exists - slices.verify_slice_attributes(slice, requested_attributes) - - # add/remove slice from nodes - requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()] - slices.verify_slice_nodes(slice, requested_slivers, peer) - - aggregate.prepare_nodes({'hostname': requested_slivers}) - aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()}) - slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate) - - # handle MyPLC peer association. - # only used by plc and ple. - slices.handle_peer(site, slice, persons, peer) + # ensure site record exists + site = slices.verify_site(hrn, slice_record, peer, sfa_peer) + # ensure slice record exists + slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) + # ensure person records exists + persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) + # ensure slice attributes exists + slices.verify_slice_attributes(slice, requested_attributes) + + # add/remove slice from nodes + requested_slivers = [str(host) for host in rspec.version.get_nodes_with_slivers()] + slices.verify_slice_nodes(slice, requested_slivers, peer) + + aggregate.prepare_nodes({'hostname': requested_slivers}) + aggregate.prepare_interfaces({'node_id': aggregate.nodes.keys()}) + slices.verify_slice_links(slice, rspec.version.get_link_requests(), aggregate) + + # handle MyPLC peer association. + # only used by plc and ple. + slices.handle_peer(site, slice, persons, peer) + + return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version) + + + def RenewSliver(self, api, xrn, creds, expiration_time, call_id): + if Callids().already_handled(call_id): return True + (hrn, _) = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice = slices[0] + requested_time = utcparse(expiration_time) + record = {'expires': int(time.mktime(requested_time.timetuple()))} + try: + api.driver.UpdateSlice(slice['slice_id'], record) + return True + except: + return False + + def start_slice(self, api, xrn, creds): + (hrn, _) = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice_id = slices[0]['slice_id'] + slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id']) + # just remove the tag if it exists + if slice_tags: + api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id']) - return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version) - - -def RenewSliver(api, xrn, creds, expiration_time, call_id): - if Callids().already_handled(call_id): return True - (hrn, _) = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(hrn) - slice = slices[0] - requested_time = utcparse(expiration_time) - record = {'expires': int(time.mktime(requested_time.timetuple()))} - try: - api.driver.UpdateSlice(slice['slice_id'], record) - return True - except: - return False - -def start_slice(api, xrn, creds): - (hrn, _) = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(hrn) - slice_id = slices[0]['slice_id'] - slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id']) - # just remove the tag if it exists - if slice_tags: - api.driver.DeleteSliceTag(slice_tags[0]['slice_tag_id']) - - return 1 - -def stop_slice(api, xrn, creds): - hrn, _ = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(hrn) - slice_id = slices[0]['slice_id'] - slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}) - if not slice_tags: - api.driver.AddSliceTag(slice_id, 'enabled', '0') - elif slice_tags[0]['value'] != "0": - tag_id = slice_tags[0]['slice_tag_id'] - api.driver.UpdateSliceTag(tag_id, '0') - return 1 - -def reset_slice(api, xrn): - # XX not implemented at this interface - return 1 - -def DeleteSliver(api, xrn, creds, call_id): - if Callids().already_handled(call_id): return "" - (hrn, _) = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.driver.GetSlices({'name': slicename}) - if not slices: return 1 - slice = slices[0] - - # determine if this is a peer slice - peer = peers.get_peer(api, hrn) - try: - if peer: - api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer) - api.driver.DeleteSliceFromNodes(slicename, slice['node_ids']) - finally: - if peer: - api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id']) - return 1 - -# xxx Thierry : caching at the aggregate level sounds wrong... -#caching=True -caching=False -def ListSlices(api, creds, call_id): - if Callids().already_handled(call_id): return [] - # look in cache first - if caching and api.cache: - slices = api.cache.get('slices') - if slices: - return slices - - # get data from db - slices = api.driver.GetSlices({'peer_id': None}, ['name']) - slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices] - slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns] - - # cache the result - if caching and api.cache: - api.cache.add('slices', slice_urns) - - return slice_urns + + def stop_slice(self, api, xrn, creds): + hrn, _ = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.driver.GetSlices({'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice_id = slices[0]['slice_id'] + slice_tags = api.driver.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}) + if not slice_tags: + api.driver.AddSliceTag(slice_id, 'enabled', '0') + elif slice_tags[0]['value'] != "0": + tag_id = slice_tags[0]['slice_tag_id'] + api.driver.UpdateSliceTag(tag_id, '0') + return 1 -def ListResources(api, creds, options, call_id): - if Callids().already_handled(call_id): return "" - # get slice's hrn from options - xrn = options.get('geni_slice_urn', None) - (hrn, _) = urn_to_hrn(xrn) - - version_manager = VersionManager() - # get the rspec's return format from options - rspec_version = version_manager.get_version(options.get('rspec_version')) - version_string = "rspec_%s" % (rspec_version.to_string()) - - #panos adding the info option to the caching key (can be improved) - if options.get('info'): - version_string = version_string + "_"+options.get('info', 'default') - - # look in cache first - if caching and api.cache and not xrn: - rspec = api.cache.get(version_string) - if rspec: - api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn) - return rspec - - #panos: passing user-defined options - #print "manager options = ",options - aggregate = Aggregate(api, options) - rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version) - - # cache the result - if caching and api.cache and not xrn: - api.cache.add(version_string, rspec) - - return rspec - - -def get_ticket(api, xrn, creds, rspec, users): - - (slice_hrn, _) = urn_to_hrn(xrn) - slices = Slices(api) - peer = slices.get_peer(slice_hrn) - sfa_peer = slices.get_sfa_peer(slice_hrn) - - # get the slice record - credential = api.getCredential() - interface = api.registries[api.hrn] - registry = api.server_proxy(interface, credential) - records = registry.Resolve(xrn, credential) - - # make sure we get a local slice record - record = None - for tmp_record in records: - if tmp_record['type'] == 'slice' and \ - not tmp_record['peer_authority']: -#Error (E0602, get_ticket): Undefined variable 'SliceRecord' - record = SliceRecord(dict=tmp_record) - if not record: - raise RecordNotFound(slice_hrn) - - # similar to CreateSliver, we must verify that the required records exist - # at this aggregate before we can issue a ticket - # parse rspec - rspec = RSpec(rspec_string) - requested_attributes = rspec.version.get_slice_attributes() - - # ensure site record exists - site = slices.verify_site(hrn, slice_record, peer, sfa_peer) - # ensure slice record exists - slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) - # ensure person records exists - persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) - # ensure slice attributes exists - slices.verify_slice_attributes(slice, requested_attributes) - - # get sliver info - slivers = slices.get_slivers(slice_hrn) - - if not slivers: - raise SliverDoesNotExist(slice_hrn) - - # get initscripts - initscripts = [] - data = { - 'timestamp': int(time.time()), - 'initscripts': initscripts, - 'slivers': slivers - } - - # create the ticket - object_gid = record.get_gid_object() - new_ticket = SfaTicket(subject = object_gid.get_subject()) - new_ticket.set_gid_caller(api.auth.client_gid) - new_ticket.set_gid_object(object_gid) - new_ticket.set_issuer(key=api.key, subject=api.hrn) - new_ticket.set_pubkey(object_gid.get_pubkey()) - new_ticket.set_attributes(data) - new_ticket.set_rspec(rspec) - #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) - new_ticket.encode() - new_ticket.sign() - - return new_ticket.save_to_string(save_parents=True) - - - -#def main(): -# """ -# rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan') -# #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz') -# #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius') -# print rspec -# """ -# api = PlcSfaApi() -# f = open(sys.argv[1]) -# xml = f.read() -# f.close() -##Error (E1120, main): No value passed for parameter 'users' in function call -##Error (E1120, main): No value passed for parameter 'call_id' in function call -# CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan') -# -#if __name__ == "__main__": -# main() + def reset_slice(self, api, xrn): + # XX not implemented at this interface + return 1 + + def DeleteSliver(self, api, xrn, creds, call_id): + if Callids().already_handled(call_id): return "" + (hrn, _) = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.driver.GetSlices({'name': slicename}) + if not slices: + return 1 + slice = slices[0] + + # determine if this is a peer slice + peer = peers.get_peer(api, hrn) + try: + if peer: + api.driver.UnBindObjectFromPeer('slice', slice['slice_id'], peer) + api.driver.DeleteSliceFromNodes(slicename, slice['node_ids']) + finally: + if peer: + api.driver.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id']) + return 1 + + def ListSlices(self, api, creds, call_id): + if Callids().already_handled(call_id): return [] + # look in cache first + if self.caching and api.cache: + slices = api.cache.get('slices') + if slices: + return slices + + # get data from db + slices = api.driver.GetSlices({'peer_id': None}, ['name']) + slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices] + slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns] + + # cache the result + if self.caching and api.cache: + api.cache.add('slices', slice_urns) + + return slice_urns + + def ListResources(self, api, creds, options, call_id): + if Callids().already_handled(call_id): return "" + # get slice's hrn from options + xrn = options.get('geni_slice_urn', None) + (hrn, _) = urn_to_hrn(xrn) + + version_manager = VersionManager() + # get the rspec's return format from options + rspec_version = version_manager.get_version(options.get('rspec_version')) + version_string = "rspec_%s" % (rspec_version.to_string()) + + #panos adding the info option to the caching key (can be improved) + if options.get('info'): + version_string = version_string + "_"+options.get('info', 'default') + + # look in cache first + if self.caching and api.cache and not xrn: + rspec = api.cache.get(version_string) + if rspec: + api.logger.info("aggregate.ListResources: returning cached value for hrn %s"%hrn) + return rspec + + #panos: passing user-defined options + #print "manager options = ",options + aggregate = Aggregate(api, options) + rspec = aggregate.get_rspec(slice_xrn=xrn, version=rspec_version) + + # cache the result + if self.caching and api.cache and not xrn: + api.cache.add(version_string, rspec) + + return rspec + + + def get_ticket(self, api, xrn, creds, rspec, users): + + (slice_hrn, _) = urn_to_hrn(xrn) + slices = Slices(api) + peer = slices.get_peer(slice_hrn) + sfa_peer = slices.get_sfa_peer(slice_hrn) + + # get the slice record + credential = api.getCredential() + interface = api.registries[api.hrn] + registry = api.server_proxy(interface, credential) + records = registry.Resolve(xrn, credential) + + # make sure we get a local slice record + record = None + for tmp_record in records: + if tmp_record['type'] == 'slice' and \ + not tmp_record['peer_authority']: + #Error (E0602, get_ticket): Undefined variable 'SliceRecord' + record = SliceRecord(dict=tmp_record) + if not record: + raise RecordNotFound(slice_hrn) + + # similar to CreateSliver, we must verify that the required records exist + # at this aggregate before we can issue a ticket + # parse rspec + rspec = RSpec(rspec_string) + requested_attributes = rspec.version.get_slice_attributes() + + # ensure site record exists + site = slices.verify_site(hrn, slice_record, peer, sfa_peer) + # ensure slice record exists + slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) + # ensure person records exists + persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) + # ensure slice attributes exists + slices.verify_slice_attributes(slice, requested_attributes) + + # get sliver info + slivers = slices.get_slivers(slice_hrn) + + if not slivers: + raise SliverDoesNotExist(slice_hrn) + + # get initscripts + initscripts = [] + data = { + 'timestamp': int(time.time()), + 'initscripts': initscripts, + 'slivers': slivers + } + + # create the ticket + object_gid = record.get_gid_object() + new_ticket = SfaTicket(subject = object_gid.get_subject()) + new_ticket.set_gid_caller(api.auth.client_gid) + new_ticket.set_gid_object(object_gid) + new_ticket.set_issuer(key=api.key, subject=api.hrn) + new_ticket.set_pubkey(object_gid.get_pubkey()) + new_ticket.set_attributes(data) + new_ticket.set_rspec(rspec) + #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) + new_ticket.encode() + new_ticket.sign() + + return new_ticket.save_to_string(save_parents=True) diff --git a/sfa/managers/aggregate_manager_eucalyptus.py b/sfa/managers/aggregate_manager_eucalyptus.py index 3f04ce94..4d73ab17 100644 --- a/sfa/managers/aggregate_manager_eucalyptus.py +++ b/sfa/managers/aggregate_manager_eucalyptus.py @@ -15,11 +15,12 @@ from xmlbuilder import XMLBuilder from lxml import etree as ET from sqlobject import * -from sfa.util.faults import * +from sfa.util.faults import InvalidRSpec, from sfa.util.xrn import urn_to_hrn, Xrn from sfa.util.plxrn import hrn_to_pl_slicename, slicename_to_hrn from sfa.util.callids import Callids -from sfa.util.sfalogging import logger +#comes with its own logging +#from sfa.util.sfalogging import logger from sfa.util.version import version_core from sfa.trust.credential import Credential @@ -27,21 +28,9 @@ from sfa.trust.credential import Credential from sfa.server.sfaapi import SfaApi from sfa.plc.aggregate import Aggregate -from sfa.plc.slices import * -from sfa.rspecs.sfa_rspec import sfa_rspec_version - - -## -# The data structure used to represent a cloud. -# It contains the cloud name, its ip address, image information, -# key pairs, and clusters information. -# -cloud = {} - -## -# The location of the RelaxNG schema. -# -EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng' +from sfa.plc.slices import Slice, Slices +# not sure what this used to be nor where it is now defined +#from sfa.rspecs.sfa_rspec import sfa_rspec_version ## # Meta data of an instance. @@ -80,9 +69,6 @@ class EucaInstance(SQLObject): (self.image_id, self.kernel_id, self.ramdisk_id, self.inst_type, self.key_pair)) - # XXX The return statement is for testing. REMOVE in production - #return - try: reservation = botoConn.run_instances(self.image_id, kernel_id = self.kernel_id, @@ -109,143 +95,6 @@ class Slice(SQLObject): #slice_index = DatabaseIndex('slice_hrn') instances = MultipleJoin('EucaInstance') -## -# Initialize the aggregate manager by reading a configuration file. -# -def init_server(): - logger = logging.getLogger('EucaAggregate') - fileHandler = logging.FileHandler('/var/log/euca.log') - fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) - logger.addHandler(fileHandler) - fileHandler.setLevel(logging.DEBUG) - logger.setLevel(logging.DEBUG) - - configParser = ConfigParser() - configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf']) - if len(configParser.sections()) < 1: - logger.error('No cloud defined in the config file') - raise Exception('Cannot find cloud definition in configuration file.') - - # Only read the first section. - cloudSec = configParser.sections()[0] - cloud['name'] = cloudSec - cloud['access_key'] = configParser.get(cloudSec, 'access_key') - cloud['secret_key'] = configParser.get(cloudSec, 'secret_key') - cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url') - cloudURL = cloud['cloud_url'] - if cloudURL.find('https://') >= 0: - cloudURL = cloudURL.replace('https://', '') - elif cloudURL.find('http://') >= 0: - cloudURL = cloudURL.replace('http://', '') - (cloud['ip'], parts) = cloudURL.split(':') - - # Create image bundles - images = getEucaConnection().get_all_images() - cloud['images'] = images - cloud['imageBundles'] = {} - for i in images: - if i.type != 'machine' or i.kernel_id is None: continue - name = os.path.dirname(i.location) - detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} - cloud['imageBundles'][name] = detail - - # Initialize sqlite3 database and tables. - dbPath = '/etc/sfa/db' - dbName = 'euca_aggregate.db' - - if not os.path.isdir(dbPath): - logger.info('%s not found. Creating directory ...' % dbPath) - os.mkdir(dbPath) - - conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName)) - sqlhub.processConnection = conn - Slice.createTable(ifNotExists=True) - EucaInstance.createTable(ifNotExists=True) - Meta.createTable(ifNotExists=True) - - # Start the update process to keep track of the meta data - # about Eucalyptus instance. - Process(target=updateMeta).start() - - # Make sure the schema exists. - if not os.path.exists(EUCALYPTUS_RSPEC_SCHEMA): - err = 'Cannot location schema at %s' % EUCALYPTUS_RSPEC_SCHEMA - logger.error(err) - raise Exception(err) - -## -# Creates a connection to Eucalytpus. This function is inspired by -# the make_connection() in Euca2ools. -# -# @return A connection object or None -# -def getEucaConnection(): - global cloud - accessKey = cloud['access_key'] - secretKey = cloud['secret_key'] - eucaURL = cloud['cloud_url'] - useSSL = False - srvPath = '/' - eucaPort = 8773 - logger = logging.getLogger('EucaAggregate') - - if not accessKey or not secretKey or not eucaURL: - logger.error('Please set ALL of the required environment ' \ - 'variables by sourcing the eucarc file.') - return None - - # Split the url into parts - if eucaURL.find('https://') >= 0: - useSSL = True - eucaURL = eucaURL.replace('https://', '') - elif eucaURL.find('http://') >= 0: - useSSL = False - eucaURL = eucaURL.replace('http://', '') - (eucaHost, parts) = eucaURL.split(':') - if len(parts) > 1: - parts = parts.split('/') - eucaPort = int(parts[0]) - parts = parts[1:] - srvPath = '/'.join(parts) - - return boto.connect_ec2(aws_access_key_id=accessKey, - aws_secret_access_key=secretKey, - is_secure=useSSL, - region=RegionInfo(None, 'eucalyptus', eucaHost), - port=eucaPort, - path=srvPath) - -## -# Returns a string of keys that belong to the users of the given slice. -# @param sliceHRN The hunman readable name of the slice. -# @return sting() -# -# This method is no longer needed because the user keys are passed into -# CreateSliver -# -def getKeysForSlice(api, sliceHRN): - logger = logging.getLogger('EucaAggregate') - cred = api.getCredential() - registry = api.registries[api.hrn] - keys = [] - - # Get the slice record - records = registry.Resolve(sliceHRN, cred) - if not records: - logging.warn('Cannot find any record for slice %s' % sliceHRN) - return [] - - # Find who can log into this slice - persons = records[0]['persons'] - - # Extract the keys from persons records - for p in persons: - sliceUser = registry.Resolve(p, cred) - userKeys = sliceUser[0]['keys'] - keys += userKeys - - return '\n'.join(keys) - ## # A class that builds the RSpec for Eucalyptus. # @@ -422,322 +271,423 @@ class ZoneResultParser(object): return clusterList -def ListResources(api, creds, options, call_id): - if Callids().already_handled(call_id): return "" - global cloud - # get slice's hrn from options - xrn = options.get('geni_slice_urn', '') - hrn, type = urn_to_hrn(xrn) - logger = logging.getLogger('EucaAggregate') - - # get hrn of the original caller - origin_hrn = options.get('origin_hrn', None) - if not origin_hrn: - origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() - - conn = getEucaConnection() - - if not conn: - logger.error('Cannot create a connection to Eucalyptus') - return 'Cannot create a connection to Eucalyptus' - - try: - # Zones - zones = conn.get_all_zones(['verbose']) - p = ZoneResultParser(zones) - clusters = p.parse() - cloud['clusters'] = clusters - - # Images - images = conn.get_all_images() - cloud['images'] = images - cloud['imageBundles'] = {} +class AggregateManagerEucalyptus: + + # The data structure used to represent a cloud. + # It contains the cloud name, its ip address, image information, + # key pairs, and clusters information. + cloud = {} + + # The location of the RelaxNG schema. + EUCALYPTUS_RSPEC_SCHEMA='/etc/sfa/eucalyptus.rng' + + _inited=False + + # the init_server mechanism has vanished + def __init__ (self): + if AggregateManagerEucalyptus._inited: return + AggregateManagerEucalyptus.init_server() + + # Initialize the aggregate manager by reading a configuration file. + @staticmethod + def init_server(): + logger = logging.getLogger('EucaAggregate') + fileHandler = logging.FileHandler('/var/log/euca.log') + fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(fileHandler) + fileHandler.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + + configParser = ConfigParser() + configParser.read(['/etc/sfa/eucalyptus_aggregate.conf', 'eucalyptus_aggregate.conf']) + if len(configParser.sections()) < 1: + logger.error('No cloud defined in the config file') + raise Exception('Cannot find cloud definition in configuration file.') + + # Only read the first section. + cloudSec = configParser.sections()[0] + AggregateManagerEucalyptus.cloud['name'] = cloudSec + AggregateManagerEucalyptus.cloud['access_key'] = configParser.get(cloudSec, 'access_key') + AggregateManagerEucalyptus.cloud['secret_key'] = configParser.get(cloudSec, 'secret_key') + AggregateManagerEucalyptus.cloud['cloud_url'] = configParser.get(cloudSec, 'cloud_url') + cloudURL = AggregateManagerEucalyptus.cloud['cloud_url'] + if cloudURL.find('https://') >= 0: + cloudURL = cloudURL.replace('https://', '') + elif cloudURL.find('http://') >= 0: + cloudURL = cloudURL.replace('http://', '') + (AggregateManagerEucalyptus.cloud['ip'], parts) = cloudURL.split(':') + + # Create image bundles + images = self.getEucaConnection().get_all_images() + AggregateManagerEucalyptus.cloud['images'] = images + AggregateManagerEucalyptus.cloud['imageBundles'] = {} for i in images: if i.type != 'machine' or i.kernel_id is None: continue name = os.path.dirname(i.location) detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} - cloud['imageBundles'][name] = detail - - # Key Pairs - keyPairs = conn.get_all_key_pairs() - cloud['keypairs'] = keyPairs - - if hrn: - instanceId = [] - instances = [] - - # Get the instances that belong to the given slice from sqlite3 - # XXX use getOne() in production because the slice's hrn is supposed - # to be unique. For testing, uniqueness is turned off in the db. - # If the slice isn't found in the database, create a record for the - # slice. - matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn)) - if matchedSlices: - theSlice = matchedSlices[-1] - else: - theSlice = Slice(slice_hrn = hrn) - for instance in theSlice.instances: - instanceId.append(instance.instance_id) - - # Get the information about those instances using their ids. - if len(instanceId) > 0: - reservations = conn.get_all_instances(instanceId) - else: - reservations = [] + AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail + + # Initialize sqlite3 database and tables. + dbPath = '/etc/sfa/db' + dbName = 'euca_aggregate.db' + + if not os.path.isdir(dbPath): + logger.info('%s not found. Creating directory ...' % dbPath) + os.mkdir(dbPath) + + conn = connectionForURI('sqlite://%s/%s' % (dbPath, dbName)) + sqlhub.processConnection = conn + Slice.createTable(ifNotExists=True) + EucaInstance.createTable(ifNotExists=True) + Meta.createTable(ifNotExists=True) + + # Start the update process to keep track of the meta data + # about Eucalyptus instance. + Process(target=AggregateManagerEucalyptus.updateMeta).start() + + # Make sure the schema exists. + if not os.path.exists(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA): + err = 'Cannot location schema at %s' % AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA + logger.error(err) + raise Exception(err) + + # + # A separate process that will update the meta data. + # + @staticmethod + def updateMeta(): + logger = logging.getLogger('EucaMeta') + fileHandler = logging.FileHandler('/var/log/euca_meta.log') + fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) + logger.addHandler(fileHandler) + fileHandler.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + + while True: + sleep(30) + + # Get IDs of the instances that don't have IPs yet. + dbResults = Meta.select( + AND(Meta.q.pri_addr == None, + Meta.q.state != 'deleted') + ) + dbResults = list(dbResults) + logger.debug('[update process] dbResults: %s' % dbResults) + instids = [] + for r in dbResults: + if not r.instance: + continue + instids.append(r.instance.instance_id) + logger.debug('[update process] Instance Id: %s' % ', '.join(instids)) + + # Get instance information from Eucalyptus + conn = self.getEucaConnection() + vmInstances = [] + reservations = conn.get_all_instances(instids) for reservation in reservations: - for instance in reservation.instances: - instances.append(instance) - - # Construct a dictionary for the EucaRSpecBuilder - instancesDict = {} - for instance in instances: - instList = instancesDict.setdefault(instance.instance_type, []) - instInfoDict = {} - - instInfoDict['id'] = instance.id - instInfoDict['public_dns'] = instance.public_dns_name - instInfoDict['state'] = instance.state - instInfoDict['key'] = instance.key_name - - instList.append(instInfoDict) - cloud['instances'] = instancesDict - - except EC2ResponseError, ec2RespErr: - errTree = ET.fromstring(ec2RespErr.body) - errMsgE = errTree.find('.//Message') - logger.error(errMsgE.text) - - rspec = EucaRSpecBuilder(cloud).toXML() - - # Remove the instances records so next time they won't - # show up. - if 'instances' in cloud: - del cloud['instances'] - - return rspec - -""" -Hook called via 'sfi.py create' -""" -def CreateSliver(api, slice_xrn, creds, xml, users, call_id): - if Callids().already_handled(call_id): return "" - - global cloud - logger = logging.getLogger('EucaAggregate') - logger.debug("In CreateSliver") - - aggregate = Aggregate(api) - slices = Slices(api) - (hrn, type) = urn_to_hrn(slice_xrn) - peer = slices.get_peer(hrn) - sfa_peer = slices.get_sfa_peer(hrn) - slice_record=None - if users: - slice_record = users[0].get('slice_record', {}) - - conn = getEucaConnection() - if not conn: - logger.error('Cannot create a connection to Eucalyptus') - return "" - - # Validate RSpec - schemaXML = ET.parse(EUCALYPTUS_RSPEC_SCHEMA) - rspecValidator = ET.RelaxNG(schemaXML) - rspecXML = ET.XML(xml) - for network in rspecXML.iterfind("./network"): - if network.get('name') != cloud['name']: - # Throw away everything except my own RSpec - # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id')) - network.getparent().remove(network) - if not rspecValidator(rspecXML): - error = rspecValidator.error_log.last_error - message = '%s (line %s)' % (error.message, error.line) - raise InvalidRSpec(message) - + vmInstances += reservation.instances + + # Check the IPs + instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name} + for i in vmInstances if i.private_dns_name != '0.0.0.0' ] + logger.debug('[update process] IP dict: %s' % str(instIPs)) + + # Update the local DB + for ipData in instIPs: + dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None) + if not dbInst: + logger.info('[update process] Could not find %s in DB' % ipData['id']) + continue + dbInst.meta.pri_addr = ipData['pri_addr'] + dbInst.meta.pub_addr = ipData['pub_addr'] + dbInst.meta.state = 'running' + + self.dumpinstanceInfo() + + ## + # Creates a connection to Eucalytpus. This function is inspired by + # the make_connection() in Euca2ools. + # + # @return A connection object or None + # + def getEucaConnection(): + accessKey = AggregateManagerEucalyptus.cloud['access_key'] + secretKey = AggregateManagerEucalyptus.cloud['secret_key'] + eucaURL = AggregateManagerEucalyptus.cloud['cloud_url'] + useSSL = False + srvPath = '/' + eucaPort = 8773 + logger = logging.getLogger('EucaAggregate') + + if not accessKey or not secretKey or not eucaURL: + logger.error('Please set ALL of the required environment ' \ + 'variables by sourcing the eucarc file.') + return None + + # Split the url into parts + if eucaURL.find('https://') >= 0: + useSSL = True + eucaURL = eucaURL.replace('https://', '') + elif eucaURL.find('http://') >= 0: + useSSL = False + eucaURL = eucaURL.replace('http://', '') + (eucaHost, parts) = eucaURL.split(':') + if len(parts) > 1: + parts = parts.split('/') + eucaPort = int(parts[0]) + parts = parts[1:] + srvPath = '/'.join(parts) + + return boto.connect_ec2(aws_access_key_id=accessKey, + aws_secret_access_key=secretKey, + is_secure=useSSL, + region=RegionInfo(None, 'eucalyptus', eucaHost), + port=eucaPort, + path=srvPath) + + def ListResources(api, creds, options, call_id): + if Callids().already_handled(call_id): return "" + # get slice's hrn from options + xrn = options.get('geni_slice_urn', '') + hrn, type = urn_to_hrn(xrn) + logger = logging.getLogger('EucaAggregate') + + # get hrn of the original caller + origin_hrn = options.get('origin_hrn', None) + if not origin_hrn: + origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() + + conn = self.getEucaConnection() + + if not conn: + logger.error('Cannot create a connection to Eucalyptus') + return 'Cannot create a connection to Eucalyptus' + + try: + # Zones + zones = conn.get_all_zones(['verbose']) + p = ZoneResultParser(zones) + clusters = p.parse() + AggregateManagerEucalyptus.cloud['clusters'] = clusters + + # Images + images = conn.get_all_images() + AggregateManagerEucalyptus.cloud['images'] = images + AggregateManagerEucalyptus.cloud['imageBundles'] = {} + for i in images: + if i.type != 'machine' or i.kernel_id is None: continue + name = os.path.dirname(i.location) + detail = {'imageID' : i.id, 'kernelID' : i.kernel_id, 'ramdiskID' : i.ramdisk_id} + AggregateManagerEucalyptus.cloud['imageBundles'][name] = detail + + # Key Pairs + keyPairs = conn.get_all_key_pairs() + AggregateManagerEucalyptus.cloud['keypairs'] = keyPairs + + if hrn: + instanceId = [] + instances = [] + + # Get the instances that belong to the given slice from sqlite3 + # XXX use getOne() in production because the slice's hrn is supposed + # to be unique. For testing, uniqueness is turned off in the db. + # If the slice isn't found in the database, create a record for the + # slice. + matchedSlices = list(Slice.select(Slice.q.slice_hrn == hrn)) + if matchedSlices: + theSlice = matchedSlices[-1] + else: + theSlice = Slice(slice_hrn = hrn) + for instance in theSlice.instances: + instanceId.append(instance.instance_id) + + # Get the information about those instances using their ids. + if len(instanceId) > 0: + reservations = conn.get_all_instances(instanceId) + else: + reservations = [] + for reservation in reservations: + for instance in reservation.instances: + instances.append(instance) + + # Construct a dictionary for the EucaRSpecBuilder + instancesDict = {} + for instance in instances: + instList = instancesDict.setdefault(instance.instance_type, []) + instInfoDict = {} + + instInfoDict['id'] = instance.id + instInfoDict['public_dns'] = instance.public_dns_name + instInfoDict['state'] = instance.state + instInfoDict['key'] = instance.key_name + + instList.append(instInfoDict) + AggregateManagerEucalyptus.cloud['instances'] = instancesDict + + except EC2ResponseError, ec2RespErr: + errTree = ET.fromstring(ec2RespErr.body) + errMsgE = errTree.find('.//Message') + logger.error(errMsgE.text) + + rspec = EucaRSpecBuilder(AggregateManagerEucalyptus.cloud).toXML() + + # Remove the instances records so next time they won't + # show up. + if 'instances' in AggregateManagerEucalyptus.cloud: + del AggregateManagerEucalyptus.cloud['instances'] + + return rspec + """ - Create the sliver[s] (slice) at this aggregate. - Verify HRN and initialize the slice record in PLC if necessary. + Hook called via 'sfi.py create' """ - - # ensure site record exists - site = slices.verify_site(hrn, slice_record, peer, sfa_peer) - # ensure slice record exists - slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) - # ensure person records exists - persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) - - # Get the slice from db or create one. - s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None) - if s is None: - s = Slice(slice_hrn = hrn) - - # Process any changes in existing instance allocation - pendingRmInst = [] - for sliceInst in s.instances: - pendingRmInst.append(sliceInst.instance_id) - existingInstGroup = rspecXML.findall(".//euca_instances") - for instGroup in existingInstGroup: - for existingInst in instGroup: - if existingInst.get('id') in pendingRmInst: - pendingRmInst.remove(existingInst.get('id')) - for inst in pendingRmInst: - dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None) - if dbInst.meta.state != 'deleted': - logger.debug('Instance %s will be terminated' % inst) - # Terminate instances one at a time for robustness - conn.terminate_instances([inst]) - # Only change the state but do not remove the entry from the DB. - dbInst.meta.state = 'deleted' - #dbInst.destroySelf() - - # Process new instance requests - requests = rspecXML.findall(".//request") - if requests: - # Get all the public keys associate with slice. - keys = [] - for user in users: - keys += user['keys'] - logger.debug("Keys: %s" % user['keys']) - pubKeys = '\n'.join(keys) - logger.debug('Passing the following keys to the instance:\n%s' % pubKeys) - for req in requests: - vmTypeElement = req.getparent() - instType = vmTypeElement.get('name') - numInst = int(req.find('instances').text) - - bundleName = req.find('bundle').text - if not cloud['imageBundles'][bundleName]: - logger.error('Cannot find bundle %s' % bundleName) - bundleInfo = cloud['imageBundles'][bundleName] - instKernel = bundleInfo['kernelID'] - instDiskImg = bundleInfo['imageID'] - instRamDisk = bundleInfo['ramdiskID'] - instKey = None - - # Create the instances - for i in range(0, numInst): - eucaInst = EucaInstance(slice = s, - kernel_id = instKernel, - image_id = instDiskImg, - ramdisk_id = instRamDisk, - key_pair = instKey, - inst_type = instType, - meta = Meta(start_time=datetime.datetime.now())) - eucaInst.reserveInstance(conn, pubKeys) - - # xxx - should return altered rspec - # with enough data for the client to understand what's happened - return xml - -## -# Return information on the IP addresses bound to each slice's instances -# -def dumpInstanceInfo(): - logger = logging.getLogger('EucaMeta') - outdir = "/var/www/html/euca/" - outfile = outdir + "instances.txt" - - try: - os.makedirs(outdir) - except OSError, e: - if e.errno != errno.EEXIST: - raise - - dbResults = Meta.select( - AND(Meta.q.pri_addr != None, - Meta.q.state == 'running') - ) - dbResults = list(dbResults) - f = open(outfile, "w") - for r in dbResults: - instId = r.instance.instance_id - ipaddr = r.pri_addr - hrn = r.instance.slice.slice_hrn - logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn)) - f.write("%s %s %s\n" % (instId, ipaddr, hrn)) - f.close() - -## -# A separate process that will update the meta data. -# -def updateMeta(): - logger = logging.getLogger('EucaMeta') - fileHandler = logging.FileHandler('/var/log/euca_meta.log') - fileHandler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) - logger.addHandler(fileHandler) - fileHandler.setLevel(logging.DEBUG) - logger.setLevel(logging.DEBUG) - - while True: - sleep(30) - - # Get IDs of the instances that don't have IPs yet. + def CreateSliver(api, slice_xrn, creds, xml, users, call_id): + if Callids().already_handled(call_id): return "" + + logger = logging.getLogger('EucaAggregate') + logger.debug("In CreateSliver") + + aggregate = Aggregate(api) + slices = Slices(api) + (hrn, type) = urn_to_hrn(slice_xrn) + peer = slices.get_peer(hrn) + sfa_peer = slices.get_sfa_peer(hrn) + slice_record=None + if users: + slice_record = users[0].get('slice_record', {}) + + conn = self.getEucaConnection() + if not conn: + logger.error('Cannot create a connection to Eucalyptus') + return "" + + # Validate RSpec + schemaXML = ET.parse(AggregateManagerEucalyptus.EUCALYPTUS_RSPEC_SCHEMA) + rspecValidator = ET.RelaxNG(schemaXML) + rspecXML = ET.XML(xml) + for network in rspecXML.iterfind("./network"): + if network.get('name') != AggregateManagerEucalyptus.cloud['name']: + # Throw away everything except my own RSpec + # sfa_logger().error("CreateSliver: deleting %s from rspec"%network.get('id')) + network.getparent().remove(network) + if not rspecValidator(rspecXML): + error = rspecValidator.error_log.last_error + message = '%s (line %s)' % (error.message, error.line) + raise InvalidRSpec(message) + + """ + Create the sliver[s] (slice) at this aggregate. + Verify HRN and initialize the slice record in PLC if necessary. + """ + + # ensure site record exists + site = slices.verify_site(hrn, slice_record, peer, sfa_peer) + # ensure slice record exists + slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) + # ensure person records exists + persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) + + # Get the slice from db or create one. + s = Slice.select(Slice.q.slice_hrn == hrn).getOne(None) + if s is None: + s = Slice(slice_hrn = hrn) + + # Process any changes in existing instance allocation + pendingRmInst = [] + for sliceInst in s.instances: + pendingRmInst.append(sliceInst.instance_id) + existingInstGroup = rspecXML.findall(".//euca_instances") + for instGroup in existingInstGroup: + for existingInst in instGroup: + if existingInst.get('id') in pendingRmInst: + pendingRmInst.remove(existingInst.get('id')) + for inst in pendingRmInst: + dbInst = EucaInstance.select(EucaInstance.q.instance_id == inst).getOne(None) + if dbInst.meta.state != 'deleted': + logger.debug('Instance %s will be terminated' % inst) + # Terminate instances one at a time for robustness + conn.terminate_instances([inst]) + # Only change the state but do not remove the entry from the DB. + dbInst.meta.state = 'deleted' + #dbInst.destroySelf() + + # Process new instance requests + requests = rspecXML.findall(".//request") + if requests: + # Get all the public keys associate with slice. + keys = [] + for user in users: + keys += user['keys'] + logger.debug("Keys: %s" % user['keys']) + pubKeys = '\n'.join(keys) + logger.debug('Passing the following keys to the instance:\n%s' % pubKeys) + for req in requests: + vmTypeElement = req.getparent() + instType = vmTypeElement.get('name') + numInst = int(req.find('instances').text) + + bundleName = req.find('bundle').text + if not AggregateManagerEucalyptus.cloud['imageBundles'][bundleName]: + logger.error('Cannot find bundle %s' % bundleName) + bundleInfo = AggregateManagerEucalyptus.cloud['imageBundles'][bundleName] + instKernel = bundleInfo['kernelID'] + instDiskImg = bundleInfo['imageID'] + instRamDisk = bundleInfo['ramdiskID'] + instKey = None + + # Create the instances + for i in range(0, numInst): + eucaInst = EucaInstance(slice = s, + kernel_id = instKernel, + image_id = instDiskImg, + ramdisk_id = instRamDisk, + key_pair = instKey, + inst_type = instType, + meta = Meta(start_time=datetime.datetime.now())) + eucaInst.reserveInstance(conn, pubKeys) + + # xxx - should return altered rspec + # with enough data for the client to understand what's happened + return xml + + ## + # Return information on the IP addresses bound to each slice's instances + # + def dumpInstanceInfo(): + logger = logging.getLogger('EucaMeta') + outdir = "/var/www/html/euca/" + outfile = outdir + "instances.txt" + + try: + os.makedirs(outdir) + except OSError, e: + if e.errno != errno.EEXIST: + raise + dbResults = Meta.select( - AND(Meta.q.pri_addr == None, - Meta.q.state != 'deleted') - ) + AND(Meta.q.pri_addr != None, + Meta.q.state == 'running') + ) dbResults = list(dbResults) - logger.debug('[update process] dbResults: %s' % dbResults) - instids = [] + f = open(outfile, "w") for r in dbResults: - if not r.instance: - continue - instids.append(r.instance.instance_id) - logger.debug('[update process] Instance Id: %s' % ', '.join(instids)) - - # Get instance information from Eucalyptus - conn = getEucaConnection() - vmInstances = [] - reservations = conn.get_all_instances(instids) - for reservation in reservations: - vmInstances += reservation.instances - - # Check the IPs - instIPs = [ {'id':i.id, 'pri_addr':i.private_dns_name, 'pub_addr':i.public_dns_name} - for i in vmInstances if i.private_dns_name != '0.0.0.0' ] - logger.debug('[update process] IP dict: %s' % str(instIPs)) - - # Update the local DB - for ipData in instIPs: - dbInst = EucaInstance.select(EucaInstance.q.instance_id == ipData['id']).getOne(None) - if not dbInst: - logger.info('[update process] Could not find %s in DB' % ipData['id']) - continue - dbInst.meta.pri_addr = ipData['pri_addr'] - dbInst.meta.pub_addr = ipData['pub_addr'] - dbInst.meta.state = 'running' - - dumpInstanceInfo() - -def GetVersion(api): - xrn=Xrn(api.hrn) - request_rspec_versions = [dict(sfa_rspec_version)] - ad_rspec_versions = [dict(sfa_rspec_version)] - version_more = {'interface':'aggregate', - 'testbed':'myplc', - 'hrn':xrn.get_hrn(), - 'request_rspec_versions': request_rspec_versions, - 'ad_rspec_versions': ad_rspec_versions, - 'default_ad_rspec': dict(sfa_rspec_version) - } - return version_core(version_more) - -#def main(): -# init_server() -# -# #theRSpec = None -# #with open(sys.argv[1]) as xml: -# # theRSpec = xml.read() -# #CreateSliver(None, 'planetcloud.pc.test', theRSpec, 'call-id-cloudtest') -# -# #rspec = ListResources('euca', 'planetcloud.pc.test', 'planetcloud.pc.marcoy', 'test_euca') -# #print rspec -# -# server_key_file = '/var/lib/sfa/authorities/server.key' -# server_cert_file = '/var/lib/sfa/authorities/server.cert' -# api = PlcSfaApi(key_file = server_key_file, cert_file = server_cert_file, interface='aggregate') -# print getKeysForSlice(api, 'gc.gc.test1') -# -#if __name__ == "__main__": -# main() + instId = r.instance.instance_id + ipaddr = r.pri_addr + hrn = r.instance.slice.slice_hrn + logger.debug('[dumpInstanceInfo] %s %s %s' % (instId, ipaddr, hrn)) + f.write("%s %s %s\n" % (instId, ipaddr, hrn)) + f.close() + + def GetVersion(api): + xrn=Xrn(api.hrn) + request_rspec_versions = [dict(sfa_rspec_version)] + ad_rspec_versions = [dict(sfa_rspec_version)] + version_more = {'interface':'aggregate', + 'testbed':'myplc', + 'hrn':xrn.get_hrn(), + 'request_rspec_versions': request_rspec_versions, + 'ad_rspec_versions': ad_rspec_versions, + 'default_ad_rspec': dict(sfa_rspec_version) + } + return version_core(version_more) diff --git a/sfa/managers/aggregate_manager_max.py b/sfa/managers/aggregate_manager_max.py index 315f5438..ac3a8faf 100644 --- a/sfa/managers/aggregate_manager_max.py +++ b/sfa/managers/aggregate_manager_max.py @@ -1,270 +1,263 @@ -import os -import time -import re - -from sfa.util.faults import * -from sfa.util.sfalogging import logger -from sfa.util.config import Config -from sfa.util.sfatime import utcparse -from sfa.util.callids import Callids -from sfa.util.version import version_core -from sfa.util.xrn import urn_to_hrn, hrn_to_urn, get_authority, Xrn -from sfa.util.plxrn import hrn_to_pl_slicename - -from sfa.server.sfaapi import SfaApi -from sfa.server.registry import Registries -from sfa.rspecs.rspec_version import RSpecVersion -from sfa.rspecs.sfa_rspec import sfa_rspec_version -from sfa.rspecs.rspec_parser import parse_rspec - -from sfa.managers.aggregate_manager import __get_registry_objects, ListSlices - -from sfa.plc.slices import Slices - - -RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec" - -# execute shell command and return both exit code and text output -def shell_execute(cmd, timeout): - pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r') - pipe = os.popen(cmd + ' 2>&1', 'r') - text = '' - while timeout: - line = pipe.read() - text += line - time.sleep(1) - timeout = timeout-1 - code = pipe.close() - if code is None: code = 0 - if text[-1:] == '\n': text = text[:-1] - return code, text - -""" - call AM API client with command like in the following example: - cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \ - net.geni.aggregate.client.examples.CreateSliceNetworkClient \ - ./repo https://geni:8443/axis2/services/AggregateGENI \ - ... params ... -""" - -def call_am_apiclient(client_app, params, timeout): - (client_path, am_url) = Config().get_max_aggrMgr_info() - sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params) - ret = shell_execute(sys_cmd, timeout) - logger.debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret)) - return ret - -# save request RSpec xml content to a tmp file -def save_rspec_to_file(rspec): - path = RSPEC_TMP_FILE_PREFIX + "_" + time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml" - file = open(path, "w") - file.write(rspec) - file.close() - return path - -# get stripped down slice id/name plc.maxpl.xislice1 --> maxpl_xislice1 -def get_plc_slice_id(cred, xrn): - (hrn, type) = urn_to_hrn(xrn) - slice_id = hrn.find(':') - sep = '.' - if hrn.find(':') != -1: - sep=':' - elif hrn.find('+') != -1: - sep='+' - else: - sep='.' - slice_id = hrn.split(sep)[-2] + '_' + hrn.split(sep)[-1] - return slice_id - -# extract xml -def get_xml_by_tag(text, tag): - indx1 = text.find('<'+tag) - indx2 = text.find('/'+tag+'>') - xml = None - if indx1!=-1 and indx2>indx1: - xml = text[indx1:indx2+len(tag)+2] - return xml - -def prepare_slice(api, slice_xrn, creds, users): - reg_objects = __get_registry_objects(slice_xrn, creds, users) - (hrn, type) = urn_to_hrn(slice_xrn) - slices = Slices(api) - peer = slices.get_peer(hrn) - sfa_peer = slices.get_sfa_peer(hrn) - slice_record=None - if users: - slice_record = users[0].get('slice_record', {}) - registry = api.registries[api.hrn] - credential = api.getCredential() - # ensure site record exists - site = slices.verify_site(hrn, slice_record, peer, sfa_peer) - # ensure slice record exists - slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) - # ensure person records exists - persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) - -def parse_resources(text, slice_xrn): - resources = [] - urn = hrn_to_urn(slice_xrn, 'sliver') - plc_slice = re.search("Slice Status => ([^\n]+)", text) - if plc_slice.group(1) != 'NONE': - res = {} - res['geni_urn'] = urn + '_plc_slice' - res['geni_error'] = '' - res['geni_status'] = 'unknown' - if plc_slice.group(1) == 'CREATED': - res['geni_status'] = 'ready' - resources.append(res) - vlans = re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", text) - for vlan in vlans: - res = {} - res['geni_error'] = '' - res['geni_urn'] = urn + '_vlan_' + vlan[0] - if vlan[1] == 'ACTIVE': - res['geni_status'] = 'ready' - elif vlan[1] == 'FAILED': - res['geni_status'] = 'failed' - else: - res['geni_status'] = 'configuring' - resources.append(res) - return resources - -def slice_status(api, slice_xrn, creds): - urn = hrn_to_urn(slice_xrn, 'slice') - result = {} - top_level_status = 'unknown' - slice_id = get_plc_slice_id(creds, urn) - (ret, output) = call_am_apiclient("QuerySliceNetworkClient", [slice_id,], 5) - # parse output into rspec XML - if output.find("Unkown Rspec:") > 0: - top_level_staus = 'failed' - result['geni_resources'] = '' - else: - has_failure = 0 - all_active = 0 - if output.find("Status => FAILED") > 0: - top_level_staus = 'failed' - elif ( output.find("Status => ACCEPTED") > 0 or output.find("Status => PENDING") > 0 - or output.find("Status => INSETUP") > 0 or output.find("Status => INCREATE") > 0 - ): - top_level_status = 'configuring' - else: - top_level_status = 'ready' - result['geni_resources'] = parse_resources(output, slice_xrn) - result['geni_urn'] = urn - result['geni_status'] = top_level_status - return result - -def create_slice(api, xrn, cred, rspec, users): - indx1 = rspec.find("") - if indx1 > -1 and indx2 > indx1: - rspec = rspec[indx1+len(""):indx2-1] - rspec_path = save_rspec_to_file(rspec) - prepare_slice(api, xrn, cred, users) - slice_id = get_plc_slice_id(cred, xrn) - sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" +slice_id+ "/g\" " + rspec_path + ";sed -i \"s/:rspec=[^:'<\\\" ]*/:rspec=" +slice_id+ "/g\" " + rspec_path - ret = shell_execute(sys_cmd, 1) - sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + rspec_path + "/g\"" - ret = shell_execute(sys_cmd, 1) - (ret, output) = call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3) - # parse output ? - rspec = " Done! " - return True - -def delete_slice(api, xrn, cred): - slice_id = get_plc_slice_id(cred, xrn) - (ret, output) = call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3) - # parse output ? - return 1 - - -def get_rspec(api, cred, slice_urn): - logger.debug("#### called max-get_rspec") - #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1 - if slice_urn == None: - (ret, output) = call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5) - else: - slice_id = get_plc_slice_id(cred, slice_urn) - (ret, output) = call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5) - # parse output into rspec XML - if output.find("No resouce found") > 0: - rspec = " No resource found " - else: - comp_rspec = get_xml_by_tag(output, 'computeResource') - logger.debug("#### computeResource %s" % comp_rspec) - topo_rspec = get_xml_by_tag(output, 'topology') - logger.debug("#### topology %s" % topo_rspec) - rspec = " "; - if comp_rspec != None: - rspec = rspec + get_xml_by_tag(output, 'computeResource') - if topo_rspec != None: - rspec = rspec + get_xml_by_tag(output, 'topology') - rspec = rspec + " " - return (rspec) - -def start_slice(api, xrn, cred): - # service not supported - return None - -def stop_slice(api, xrn, cred): - # service not supported - return None - -def reset_slices(api, xrn): - # service not supported - return None - -""" - GENI AM API Methods -""" - -def GetVersion(api): - xrn=Xrn(api.hrn) - request_rspec_versions = [dict(sfa_rspec_version)] - ad_rspec_versions = [dict(sfa_rspec_version)] - #TODO: MAX-AM specific - version_more = {'interface':'aggregate', - 'testbed':'myplc', - 'hrn':xrn.get_hrn(), - 'request_rspec_versions': request_rspec_versions, - 'ad_rspec_versions': ad_rspec_versions, - 'default_ad_rspec': dict(sfa_rspec_version) - } - return version_core(version_more) - -def SliverStatus(api, slice_xrn, creds, call_id): - if Callids().already_handled(call_id): return {} - return slice_status(api, slice_xrn, creds) - -def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id): - if Callids().already_handled(call_id): return "" - #TODO: create real CreateSliver response rspec - ret = create_slice(api, slice_xrn, creds, rspec_string, users) - if ret: - return get_rspec(api, creds, slice_xrn) - else: - return " Error! " - -def DeleteSliver(api, xrn, creds, call_id): - if Callids().already_handled(call_id): return "" - return delete_slice(api, xrn, creds) - -# no caching -def ListResources(api, creds, options,call_id): - if Callids().already_handled(call_id): return "" - # version_string = "rspec_%s" % (rspec_version.get_version_name()) - slice_urn = options.get('geni_slice_urn') - return get_rspec(api, creds, slice_urn) - -def fetch_context(slice_hrn, user_hrn, contexts): - """ - Returns the request context required by sfatables. At some point, this mechanism should be changed - to refer to "contexts", which is the information that sfatables is requesting. But for now, we just - return the basic information needed in a dict. - """ - base_context = {'sfa':{'user':{'hrn':user_hrn}}} - return base_context - api = SfaApi() - create_slice(api, "plc.maxpl.test000", None, rspec_xml, None) - +import os +import time +import re + +#from sfa.util.faults import * +from sfa.util.sfalogging import logger +from sfa.util.config import Config +from sfa.util.callids import Callids +from sfa.util.version import version_core +from sfa.util.xrn import urn_to_hrn, hrn_to_urn, Xrn + +# xxx the sfa.rspecs module is dead - this symbol is now undefined +#from sfa.rspecs.sfa_rspec import sfa_rspec_version + +from sfa.managers.aggregate_manager import AggregateManager + +from sfa.plc.slices import Slices + +class AggregateManagerMax (AggregateManager): + + RSPEC_TMP_FILE_PREFIX = "/tmp/max_rspec" + + # execute shell command and return both exit code and text output + def shell_execute(self, cmd, timeout): + pipe = os.popen('{ ' + cmd + '; } 2>&1', 'r') + pipe = os.popen(cmd + ' 2>&1', 'r') + text = '' + while timeout: + line = pipe.read() + text += line + time.sleep(1) + timeout = timeout-1 + code = pipe.close() + if code is None: code = 0 + if text[-1:] == '\n': text = text[:-1] + return code, text + + + def call_am_apiclient(self, client_app, params, timeout): + """ + call AM API client with command like in the following example: + cd aggregate_client; java -classpath AggregateWS-client-api.jar:lib/* \ + net.geni.aggregate.client.examples.CreateSliceNetworkClient \ + ./repo https://geni:8443/axis2/services/AggregateGENI \ + ... params ... + """ + (client_path, am_url) = Config().get_max_aggrMgr_info() + sys_cmd = "cd " + client_path + "; java -classpath AggregateWS-client-api.jar:lib/* net.geni.aggregate.client.examples." + client_app + " ./repo " + am_url + " " + ' '.join(params) + ret = self.shell_execute(sys_cmd, timeout) + logger.debug("shell_execute cmd: %s returns %s" % (sys_cmd, ret)) + return ret + + # save request RSpec xml content to a tmp file + def save_rspec_to_file(self, rspec): + path = AggregateManagerMax.RSPEC_TMP_FILE_PREFIX + "_" + \ + time.strftime('%Y%m%dT%H:%M:%S', time.gmtime(time.time())) +".xml" + file = open(path, "w") + file.write(rspec) + file.close() + return path + + # get stripped down slice id/name plc.maxpl.xislice1 --> maxpl_xislice1 + def get_plc_slice_id(self, cred, xrn): + (hrn, type) = urn_to_hrn(xrn) + slice_id = hrn.find(':') + sep = '.' + if hrn.find(':') != -1: + sep=':' + elif hrn.find('+') != -1: + sep='+' + else: + sep='.' + slice_id = hrn.split(sep)[-2] + '_' + hrn.split(sep)[-1] + return slice_id + + # extract xml + def get_xml_by_tag(self, text, tag): + indx1 = text.find('<'+tag) + indx2 = text.find('/'+tag+'>') + xml = None + if indx1!=-1 and indx2>indx1: + xml = text[indx1:indx2+len(tag)+2] + return xml + + def prepare_slice(self, api, slice_xrn, creds, users): + reg_objects = self._get_registry_objects(slice_xrn, creds, users) + (hrn, type) = urn_to_hrn(slice_xrn) + slices = Slices(api) + peer = slices.get_peer(hrn) + sfa_peer = slices.get_sfa_peer(hrn) + slice_record=None + if users: + slice_record = users[0].get('slice_record', {}) + registry = api.registries[api.hrn] + credential = api.getCredential() + # ensure site record exists + site = slices.verify_site(hrn, slice_record, peer, sfa_peer) + # ensure slice record exists + slice = slices.verify_slice(hrn, slice_record, peer, sfa_peer) + # ensure person records exists + persons = slices.verify_persons(hrn, slice, users, peer, sfa_peer) + + def parse_resources(self, text, slice_xrn): + resources = [] + urn = hrn_to_urn(slice_xrn, 'sliver') + plc_slice = re.search("Slice Status => ([^\n]+)", text) + if plc_slice.group(1) != 'NONE': + res = {} + res['geni_urn'] = urn + '_plc_slice' + res['geni_error'] = '' + res['geni_status'] = 'unknown' + if plc_slice.group(1) == 'CREATED': + res['geni_status'] = 'ready' + resources.append(res) + vlans = re.findall("GRI => ([^\n]+)\n\t Status => ([^\n]+)", text) + for vlan in vlans: + res = {} + res['geni_error'] = '' + res['geni_urn'] = urn + '_vlan_' + vlan[0] + if vlan[1] == 'ACTIVE': + res['geni_status'] = 'ready' + elif vlan[1] == 'FAILED': + res['geni_status'] = 'failed' + else: + res['geni_status'] = 'configuring' + resources.append(res) + return resources + + def slice_status(self, api, slice_xrn, creds): + urn = hrn_to_urn(slice_xrn, 'slice') + result = {} + top_level_status = 'unknown' + slice_id = self.get_plc_slice_id(creds, urn) + (ret, output) = self.call_am_apiclient("QuerySliceNetworkClient", [slice_id,], 5) + # parse output into rspec XML + if output.find("Unkown Rspec:") > 0: + top_level_staus = 'failed' + result['geni_resources'] = '' + else: + has_failure = 0 + all_active = 0 + if output.find("Status => FAILED") > 0: + top_level_staus = 'failed' + elif ( output.find("Status => ACCEPTED") > 0 or output.find("Status => PENDING") > 0 + or output.find("Status => INSETUP") > 0 or output.find("Status => INCREATE") > 0 + ): + top_level_status = 'configuring' + else: + top_level_status = 'ready' + result['geni_resources'] = self.parse_resources(output, slice_xrn) + result['geni_urn'] = urn + result['geni_status'] = top_level_status + return result + + def create_slice(self, api, xrn, cred, rspec, users): + indx1 = rspec.find("") + if indx1 > -1 and indx2 > indx1: + rspec = rspec[indx1+len(""):indx2-1] + rspec_path = self.save_rspec_to_file(rspec) + self.prepare_slice(api, xrn, cred, users) + slice_id = self.get_plc_slice_id(cred, xrn) + sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" +slice_id+ "/g\" " + rspec_path + ";sed -i \"s/:rspec=[^:'<\\\" ]*/:rspec=" +slice_id+ "/g\" " + rspec_path + ret = self.shell_execute(sys_cmd, 1) + sys_cmd = "sed -i \"s/rspec id=\\\"[^\\\"]*/rspec id=\\\"" + rspec_path + "/g\"" + ret = self.shell_execute(sys_cmd, 1) + (ret, output) = self.call_am_apiclient("CreateSliceNetworkClient", [rspec_path,], 3) + # parse output ? + rspec = " Done! " + return True + + def delete_slice(self, api, xrn, cred): + slice_id = self.get_plc_slice_id(cred, xrn) + (ret, output) = self.call_am_apiclient("DeleteSliceNetworkClient", [slice_id,], 3) + # parse output ? + return 1 + + + def get_rspec(self, api, cred, slice_urn): + logger.debug("#### called max-get_rspec") + #geni_slice_urn: urn:publicid:IDN+plc:maxpl+slice+xi_rspec_test1 + if slice_urn == None: + (ret, output) = self.call_am_apiclient("GetResourceTopology", ['all', '\"\"'], 5) + else: + slice_id = self.get_plc_slice_id(cred, slice_urn) + (ret, output) = self.call_am_apiclient("GetResourceTopology", ['all', slice_id,], 5) + # parse output into rspec XML + if output.find("No resouce found") > 0: + rspec = " No resource found " + else: + comp_rspec = self.get_xml_by_tag(output, 'computeResource') + logger.debug("#### computeResource %s" % comp_rspec) + topo_rspec = self.get_xml_by_tag(output, 'topology') + logger.debug("#### topology %s" % topo_rspec) + rspec = " " + if comp_rspec != None: + rspec = rspec + self.get_xml_by_tag(output, 'computeResource') + if topo_rspec != None: + rspec = rspec + self.get_xml_by_tag(output, 'topology') + rspec = rspec + " " + return (rspec) + + def start_slice(self, api, xrn, cred): + # service not supported + return None + + def stop_slice(self, api, xrn, cred): + # service not supported + return None + + def reset_slices(self, api, xrn): + # service not supported + return None + + ### GENI AM API Methods + + def GetVersion(self, api): + xrn=Xrn(api.hrn) + request_rspec_versions = [dict(sfa_rspec_version)] + ad_rspec_versions = [dict(sfa_rspec_version)] + #TODO: MAX-AM specific + version_more = {'interface':'aggregate', + 'testbed':'myplc', + 'hrn':xrn.get_hrn(), + 'request_rspec_versions': request_rspec_versions, + 'ad_rspec_versions': ad_rspec_versions, + 'default_ad_rspec': dict(sfa_rspec_version) + } + return version_core(version_more) + + def SliverStatus(self, api, slice_xrn, creds, call_id): + if Callids().already_handled(call_id): return {} + return self.slice_status(api, slice_xrn, creds) + + def CreateSliver(self, api, slice_xrn, creds, rspec_string, users, call_id): + if Callids().already_handled(call_id): return "" + #TODO: create real CreateSliver response rspec + ret = self.create_slice(api, slice_xrn, creds, rspec_string, users) + if ret: + return self.get_rspec(api, creds, slice_xrn) + else: + return " Error! " + + def DeleteSliver(self, api, xrn, creds, call_id): + if Callids().already_handled(call_id): return "" + return self.delete_slice(api, xrn, creds) + + # no caching + def ListResources(self, api, creds, options,call_id): + if Callids().already_handled(call_id): return "" + # version_string = "rspec_%s" % (rspec_version.get_version_name()) + slice_urn = options.get('geni_slice_urn') + return self.get_rspec(api, creds, slice_urn) + + def fetch_context(self, slice_hrn, user_hrn, contexts): + """ + Returns the request context required by sfatables. At some point, this mechanism should be changed + to refer to "contexts", which is the information that sfatables is requesting. But for now, we just + return the basic information needed in a dict. + """ + base_context = {'sfa':{'user':{'hrn':user_hrn}}} + return base_context + diff --git a/sfa/managers/aggregate_manager_openflow.py b/sfa/managers/aggregate_manager_openflow.py deleted file mode 100755 index a804a651..00000000 --- a/sfa/managers/aggregate_manager_openflow.py +++ /dev/null @@ -1,178 +0,0 @@ -import sys - -import socket -import struct - -#The following is not essential -#from soaplib.wsgi_soap import SimpleWSGISoapApp -#from soaplib.serializers.primitive import * -#from soaplib.serializers.clazz import * - -from sfa.util.faults import * -from sfa.util.xrn import urn_to_hrn -from sfa.server.registry import Registries -from sfa.util.config import Config -from sfa.plc.nodes import * -from sfa.util.callids import Callids - -# Message IDs for all the SFA light calls -# This will be used by the aggrMgr controller -SFA_GET_RESOURCES = 101 -SFA_CREATE_SLICE = 102 -SFA_START_SLICE = 103 -SFA_STOP_SLICE = 104 -SFA_DELETE_SLICE = 105 -SFA_GET_SLICES = 106 -SFA_RESET_SLICES = 107 - -DEBUG = 1 - -def print_buffer(buf): - for i in range(0,len(buf)): - print('%x' % buf[i]) - -def extract(sock): - # Shud we first obtain the message length? - # msg_len = socket.ntohs(sock.recv(2)) - msg = "" - - while (1): - try: - chunk = sock.recv(1) - except socket.error, message: - if 'timed out' in message: - break - else: - sys.exit("Socket error: " + message) - - if len(chunk) == 0: - break - msg += chunk - - print 'Done extracting %d bytes of response from aggrMgr' % len(msg) - return msg - -def connect(server, port): - '''Connect to the Aggregate Manager module''' - sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM ) - sock.connect ( ( server, port) ) - sock.settimeout(1) - if DEBUG: print 'Connected!' - return sock - -def connect_aggrMgr(): - (aggr_mgr_ip, aggr_mgr_port) = Config().get_openflow_aggrMgr_info() - if DEBUG: print """Connecting to port %d of %s""" % (aggr_mgr_port, aggr_mgr_ip) - return connect(aggr_mgr_ip, aggr_mgr_port) - -def generate_slide_id(cred, hrn): - if cred == None: - cred = "" - if hrn == None: - hrn = "" - #return cred + '_' + hrn - return str(hrn) - -def msg_aggrMgr(cred, hrn, msg_id): - slice_id = generate_slide_id(cred, hrn) - - msg = struct.pack('> B%ds' % len(slice_id), msg_id, slice_id) - buf = struct.pack('> H', len(msg)+2) + msg - - try: - aggrMgr_sock = connect_aggrMgr() - aggrMgr_sock.send(buf) - aggrMgr_sock.close() - return 1 - except socket.error, message: - print "Socket error" - except IOerror, message: - print "IO error" - return 0 - -def start_slice(cred, xrn): - hrn = urn_to_hrn(xrn)[0] - if DEBUG: print "Received start_slice call" - return msg_aggrMgr(SFA_START_SLICE) - -def stop_slice(cred, xrn): - hrn = urn_to_hrn(xrn)[0] - if DEBUG: print "Received stop_slice call" - return msg_aggrMgr(SFA_STOP_SLICE) - -def DeleteSliver(cred, xrn, call_id): - if Callids().already_handled(call_id): return "" - hrn = urn_to_hrn(xrn)[0] - if DEBUG: print "Received DeleteSliver call" - return msg_aggrMgr(SFA_DELETE_SLICE) - -def reset_slices(cred, xrn): - hrn = urn_to_hrn(xrn)[0] - if DEBUG: print "Received reset_slices call" - return msg_aggrMgr(SFA_RESET_SLICES) - -### Thierry: xxx this should ahve api as a first arg - probably outdated -def CreateSliver(cred, xrn, rspec, call_id): - if Callids().already_handled(call_id): return "" - - hrn = urn_to_hrn(xrn)[0] - if DEBUG: print "Received CreateSliver call" - slice_id = generate_slide_id(cred, hrn) - - msg = struct.pack('> B%ds%ds' % (len(slice_id)+1, len(rspec)), SFA_CREATE_SLICE, slice_id, rspec) - buf = struct.pack('> H', len(msg)+2) + msg - - try: - aggrMgr_sock = connect_aggrMgr() - aggrMgr_sock.send(buf) - if DEBUG: print "Sent %d bytes and closing connection" % len(buf) - aggrMgr_sock.close() - - if DEBUG: print "----------------" - return rspec - except socket.error, message: - print "Socket error" - except IOerror, message: - print "IO error" - return "" - -# Thierry : xxx this would need to handle call_id like the other AMs but is outdated... -def ListResources(cred, xrn=None): - hrn = urn_to_hrn(xrn)[0] - if DEBUG: print "Received ListResources call" - slice_id = generate_slide_id(cred, hrn) - - msg = struct.pack('> B%ds' % len(slice_id), SFA_GET_RESOURCES, slice_id) - buf = struct.pack('> H', len(msg)+2) + msg - - try: - aggrMgr_sock = connect_aggrMgr() - aggrMgr_sock.send(buf) - resource_list = extract(aggrMgr_sock); - aggrMgr_sock.close() - - if DEBUG: print "----------------" - return resource_list - except socket.error, message: - print "Socket error" - except IOerror, message: - print "IO error" - return None - -""" -Returns the request context required by sfatables. At some point, this mechanism should be changed -to refer to "contexts", which is the information that sfatables is requesting. But for now, we just -return the basic information needed in a dict. -""" -def fetch_context(slice_hrn, user_hrn, contexts): - base_context = {'sfa':{'user':{'hrn':user_hrn}}} - return base_context - -def main(): - r = RSpec() - r.parseFile(sys.argv[1]) - rspec = r.toDict() - CreateSliver(None,'plc',rspec,'call-id-plc') - -if __name__ == "__main__": - main() diff --git a/sfa/managers/managerwrapper.py b/sfa/managers/managerwrapper.py index 5231c2aa..b0326d67 100644 --- a/sfa/managers/managerwrapper.py +++ b/sfa/managers/managerwrapper.py @@ -1,4 +1,6 @@ -from sfa.util.faults import SfaNotImplemented +from types import ModuleType, ClassType + +from sfa.util.faults import SfaNotImplemented, SfaAPIError from sfa.util.sfalogging import logger #################### @@ -14,7 +16,15 @@ class ManagerWrapper: the standard AttributeError """ def __init__(self, manager, interface): - self.manager = manager + if isinstance (manager, ModuleType): + # old-fashioned module implementation + self.manager = manager + elif isinstance (manager, ClassType): + # create an instance; we don't pass the api in argument as it is passed + # to the actual method calls anyway + self.manager = manager() + else: + raise SfaAPIError,"Argument to ManagerWrapper must be a module or class" self.interface = interface def __getattr__(self, method): diff --git a/sfa/managers/slice_manager.py b/sfa/managers/slice_manager.py index 685e67a7..72a9c698 100644 --- a/sfa/managers/slice_manager.py +++ b/sfa/managers/slice_manager.py @@ -19,525 +19,519 @@ from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec from sfa.client.client_helper import sfa_to_pg_users_arg -def _call_id_supported(api, server): - """ - Returns true if server support the optional call_id arg, false otherwise. - """ - server_version = api.get_cached_server_version(server) - - if 'sfa' in server_version: - code_tag = server_version['code_tag'] - code_tag_parts = code_tag.split("-") - - version_parts = code_tag_parts[0].split(".") - major, minor = version_parts[0:2] - rev = code_tag_parts[1] - if int(major) > 1: - if int(minor) > 0 or int(rev) > 20: - return True - return False - -# we have specialized xmlrpclib.ServerProxy to remember the input url -# OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances -def get_serverproxy_url (server): - try: - return server.get_url() - except: - logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") - return server._ServerProxy__host + server._ServerProxy__handler - -def GetVersion(api): - # peers explicitly in aggregates.xml - peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() - if peername != api.hrn]) - version_manager = VersionManager() - ad_rspec_versions = [] - request_rspec_versions = [] - for rspec_version in version_manager.versions: - if rspec_version.content_type in ['*', 'ad']: - ad_rspec_versions.append(rspec_version.to_dict()) - if rspec_version.content_type in ['*', 'request']: - request_rspec_versions.append(rspec_version.to_dict()) - default_rspec_version = version_manager.get_version("sfa 1").to_dict() - xrn=Xrn(api.hrn, 'authority+sa') - version_more = {'interface':'slicemgr', - 'hrn' : xrn.get_hrn(), - 'urn' : xrn.get_urn(), - 'peers': peers, - 'request_rspec_versions': request_rspec_versions, - 'ad_rspec_versions': ad_rspec_versions, - 'default_ad_rspec': default_rspec_version +class SliceManager: + def __init__ (self): + # self.caching=False + self.caching=True + + + def _call_id_supported(self, api, server): + """ + Returns true if server support the optional call_id arg, false otherwise. + """ + server_version = api.get_cached_server_version(server) + + if 'sfa' in server_version: + code_tag = server_version['code_tag'] + code_tag_parts = code_tag.split("-") + + version_parts = code_tag_parts[0].split(".") + major, minor = version_parts[0:2] + rev = code_tag_parts[1] + if int(major) > 1: + if int(minor) > 0 or int(rev) > 20: + return True + return False + + # we have specialized xmlrpclib.ServerProxy to remember the input url + # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances + def get_serverproxy_url (self, server): + try: + return server.get_url() + except: + logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") + return server._ServerProxy__host + server._ServerProxy__handler + + def GetVersion(self, api): + # peers explicitly in aggregates.xml + peers =dict ([ (peername,self.get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + if peername != api.hrn]) + version_manager = VersionManager() + ad_rspec_versions = [] + request_rspec_versions = [] + for rspec_version in version_manager.versions: + if rspec_version.content_type in ['*', 'ad']: + ad_rspec_versions.append(rspec_version.to_dict()) + if rspec_version.content_type in ['*', 'request']: + request_rspec_versions.append(rspec_version.to_dict()) + default_rspec_version = version_manager.get_version("sfa 1").to_dict() + xrn=Xrn(api.hrn, 'authority+sa') + version_more = {'interface':'slicemgr', + 'hrn' : xrn.get_hrn(), + 'urn' : xrn.get_urn(), + 'peers': peers, + 'request_rspec_versions': request_rspec_versions, + 'ad_rspec_versions': ad_rspec_versions, + 'default_ad_rspec': default_rspec_version } - sm_version=version_core(version_more) - # local aggregate if present needs to have localhost resolved - if api.hrn in api.aggregates: - local_am_url=get_serverproxy_url(api.aggregates[api.hrn]) - sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) - return sm_version - -def drop_slicemgr_stats(rspec): - try: - stats_elements = rspec.xml.xpath('//statistics') - for node in stats_elements: - node.getparent().remove(node) - except Exception, e: - logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) - -def add_slicemgr_stat(rspec, callname, aggname, elapsed, status): - try: - stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) - if stats_tags: - stats_tag = stats_tags[0] - else: - stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname) - - etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) - except Exception, e: - logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) - -def ListResources(api, creds, options, call_id): - version_manager = VersionManager() - def _ListResources(aggregate, server, credential, opts, call_id): - - my_opts = copy(opts) - args = [credential, my_opts] - tStart = time.time() + sm_version=version_core(version_more) + # local aggregate if present needs to have localhost resolved + if api.hrn in api.aggregates: + local_am_url=self.get_serverproxy_url(api.aggregates[api.hrn]) + sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) + return sm_version + + def drop_slicemgr_stats(self, rspec): try: - if _call_id_supported(api, server): - args.append(call_id) - version = api.get_cached_server_version(server) - # force ProtoGENI aggregates to give us a v2 RSpec - if 'sfa' not in version.keys(): - my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict() - rspec = server.ListResources(*args) - return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + stats_elements = rspec.xml.xpath('//statistics') + for node in stats_elements: + node.getparent().remove(node) except Exception, e: - api.logger.log_exc("ListResources failed at %s" %(server.url)) - return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} - - if Callids().already_handled(call_id): return "" - - # get slice's hrn from options - xrn = options.get('geni_slice_urn', '') - (hrn, type) = urn_to_hrn(xrn) - if 'geni_compressed' in options: - del(options['geni_compressed']) - - # get the rspec's return format from options - rspec_version = version_manager.get_version(options.get('rspec_version')) - version_string = "rspec_%s" % (rspec_version.to_string()) - - # look in cache first - if caching and api.cache and not xrn: - rspec = api.cache.get(version_string) - if rspec: - return rspec - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - - # get the rspec from the aggregate - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run(_ListResources, aggregate, server, [cred], options, call_id) - - - results = threads.get_results() - rspec_version = version_manager.get_version(options.get('rspec_version')) - if xrn: - result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest') - else: - result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad') - rspec = RSpec(version=result_version) - for result in results: - add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"]) - if result["status"]=="success": - try: - rspec.version.merge(result["rspec"]) - except: - api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") - - # cache the result - if caching and api.cache and not xrn: - api.cache.add(version_string, rspec.toxml()) - - return rspec.toxml() - - -def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - - version_manager = VersionManager() - def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id): - tStart = time.time() + logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) + + def add_slicemgr_stat(self, rspec, callname, aggname, elapsed, status): try: - # Need to call GetVersion at an aggregate to determine the supported - # rspec type/format beofre calling CreateSliver at an Aggregate. + stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) + if stats_tags: + stats_tag = stats_tags[0] + else: + stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname) + + etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) + except Exception, e: + logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) + + def ListResources(self, api, creds, options, call_id): + version_manager = VersionManager() + def _ListResources(aggregate, server, credential, opts, call_id): + + my_opts = copy(opts) + args = [credential, my_opts] + tStart = time.time() + try: + if self._call_id_supported(api, server): + args.append(call_id) + version = api.get_cached_server_version(server) + # force ProtoGENI aggregates to give us a v2 RSpec + if 'sfa' not in version.keys(): + my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict() + rspec = server.ListResources(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + except Exception, e: + api.logger.log_exc("ListResources failed at %s" %(server.url)) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} + + if Callids().already_handled(call_id): return "" + + # get slice's hrn from options + xrn = options.get('geni_slice_urn', '') + (hrn, type) = urn_to_hrn(xrn) + if 'geni_compressed' in options: + del(options['geni_compressed']) + + # get the rspec's return format from options + rspec_version = version_manager.get_version(options.get('rspec_version')) + version_string = "rspec_%s" % (rspec_version.to_string()) + + # look in cache first + if self.caching and api.cache and not xrn: + rspec = api.cache.get(version_string) + if rspec: + return rspec + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + + # get the rspec from the aggregate + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run(_ListResources, aggregate, server, [cred], options, call_id) + + + results = threads.get_results() + rspec_version = version_manager.get_version(options.get('rspec_version')) + if xrn: + result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest') + else: + result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad') + rspec = RSpec(version=result_version) + for result in results: + self.add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.version.merge(result["rspec"]) + except: + api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") + + # cache the result + if self.caching and api.cache and not xrn: + api.cache.add(version_string, rspec.toxml()) + + return rspec.toxml() + + + def CreateSliver(self, api, xrn, creds, rspec_str, users, call_id): + + version_manager = VersionManager() + def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id): + tStart = time.time() + try: + # Need to call GetVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + server_version = api.get_cached_server_version(server) + requested_users = users + if 'sfa' not in server_version and 'geni_api' in server_version: + # sfa aggregtes support both sfa and pg rspecs, no need to convert + # if aggregate supports sfa rspecs. otherwise convert to pg rspec + rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request')) + filter = {'component_manager_id': server_version['urn']} + rspec.filter(filter) + rspec = rspec.toxml() + requested_users = sfa_to_pg_users_arg(users) + args = [xrn, credential, rspec, requested_users] + if self._call_id_supported(api, server): + args.append(call_id) + rspec = server.CreateSliver(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + except: + logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} + + if Callids().already_handled(call_id): return "" + # Validate the RSpec against PlanetLab's schema --disabled for now + # The schema used here needs to aggregate the PL and VINI schemas + # schema = "/var/www/html/schemas/pl.rng" + rspec = RSpec(rspec_str) + # schema = None + # if schema: + # rspec.validate(schema) + + # if there is a section, the aggregates don't care about it, + # so delete it. + self.drop_slicemgr_stats(rspec) + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + + # get the callers hrn + hrn, type = urn_to_hrn(xrn) + valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + # Just send entire RSpec to each aggregate + threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id) + + results = threads.get_results() + manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') + result_rspec = RSpec(version=manifest_version) + for result in results: + self.add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + result_rspec.version.merge(result["rspec"]) + except: + api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") + return result_rspec.toxml() + + def RenewSliver(self, api, xrn, creds, expiration_time, call_id): + def _RenewSliver(server, xrn, creds, expiration_time, call_id): server_version = api.get_cached_server_version(server) - requested_users = users - if 'sfa' not in server_version and 'geni_api' in server_version: - # sfa aggregtes support both sfa and pg rspecs, no need to convert - # if aggregate supports sfa rspecs. otherwise convert to pg rspec - rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request')) - filter = {'component_manager_id': server_version['urn']} - rspec.filter(filter) - rspec = rspec.toxml() - requested_users = sfa_to_pg_users_arg(users) - args = [xrn, credential, rspec, requested_users] - if _call_id_supported(api, server): + args = [xrn, creds, expiration_time, call_id] + if self._call_id_supported(api, server): args.append(call_id) - rspec = server.CreateSliver(*args) - return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} - except: - logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) - return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} - - if Callids().already_handled(call_id): return "" - # Validate the RSpec against PlanetLab's schema --disabled for now - # The schema used here needs to aggregate the PL and VINI schemas - # schema = "/var/www/html/schemas/pl.rng" - rspec = RSpec(rspec_str) -# schema = None -# if schema: -# rspec.validate(schema) - - # if there is a section, the aggregates don't care about it, - # so delete it. - drop_slicemgr_stats(rspec) - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - - # get the callers hrn - hrn, type = urn_to_hrn(xrn) - valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id) + return server.RenewSliver(*args) + + if Callids().already_handled(call_id): return True + + (hrn, type) = urn_to_hrn(xrn) + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id) + # 'and' the results + return reduce (lambda x,y: x and y, threads.get_results() , True) + + def DeleteSliver(self, api, xrn, creds, call_id): + def _DeleteSliver(server, xrn, creds, call_id): + server_version = api.get_cached_server_version(server) + args = [xrn, creds] + if self._call_id_supported(api, server): + args.append(call_id) + return server.DeleteSliver(*args) + + if Callids().already_handled(call_id): return "" + (hrn, type) = urn_to_hrn(xrn) + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run(_DeleteSliver, server, xrn, [cred], call_id) + threads.get_results() + return 1 + + + # first draft at a merging SliverStatus + def SliverStatus(self, api, slice_xrn, creds, call_id): + def _SliverStatus(server, xrn, creds, call_id): + server_version = api.get_cached_server_version(server) + args = [xrn, creds] + if self._call_id_supported(api, server): + args.append(call_id) + return server.SliverStatus(*args) + + if Callids().already_handled(call_id): return {} + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run (_SliverStatus, server, slice_xrn, [cred], call_id) + results = threads.get_results() + + # get rid of any void result - e.g. when call_id was hit where by convention we return {} + results = [ result for result in results if result and result['geni_resources']] + + # do not try to combine if there's no result + if not results : return {} + + # otherwise let's merge stuff + overall = {} + + # mmh, it is expected that all results carry the same urn + overall['geni_urn'] = results[0]['geni_urn'] + overall['pl_login'] = results[0]['pl_login'] + # append all geni_resources + overall['geni_resources'] = \ + reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) + overall['status'] = 'unknown' + if overall['geni_resources']: + overall['status'] = 'ready' + + return overall + + def ListSlices(self, api, creds, call_id): + def _ListSlices(server, creds, call_id): + server_version = api.get_cached_server_version(server) + args = [creds] + if self._call_id_supported(api, server): + args.append(call_id) + return server.ListSlices(*args) + + if Callids().already_handled(call_id): return [] + + # look in cache first + if self.caching and api.cache: + slices = api.cache.get('slices') + if slices: + return slices + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + cred= api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + # fetch from aggregates + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run(_ListSlices, server, [cred], call_id) + + # combime results + results = threads.get_results() + slices = [] + for result in results: + slices.extend(result) + + # cache the result + if self.caching and api.cache: + api.cache.add('slices', slices) + + return slices + + + def get_ticket(self, api, xrn, creds, rspec, users): + slice_hrn, type = urn_to_hrn(xrn) + # get the netspecs contained within the clients rspec + aggregate_rspecs = {} + tree= etree.parse(StringIO(rspec)) + elements = tree.findall('./network') + for element in elements: + aggregate_hrn = element.values()[0] + aggregate_rspecs[aggregate_hrn] = rspec + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems(): + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue - results = threads.get_results() - manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') - result_rspec = RSpec(version=manifest_version) - for result in results: - add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"]) - if result["status"]=="success": - try: - result_rspec.version.merge(result["rspec"]) - except: - api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") - return result_rspec.toxml() - -def RenewSliver(api, xrn, creds, expiration_time, call_id): - def _RenewSliver(server, xrn, creds, expiration_time, call_id): - server_version = api.get_cached_server_version(server) - args = [xrn, creds, expiration_time, call_id] - if _call_id_supported(api, server): - args.append(call_id) - return server.RenewSliver(*args) - - if Callids().already_handled(call_id): return True - - (hrn, type) = urn_to_hrn(xrn) - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id) - # 'and' the results - return reduce (lambda x,y: x and y, threads.get_results() , True) - -def DeleteSliver(api, xrn, creds, call_id): - def _DeleteSliver(server, xrn, creds, call_id): - server_version = api.get_cached_server_version(server) - args = [xrn, creds] - if _call_id_supported(api, server): - args.append(call_id) - return server.DeleteSliver(*args) - - if Callids().already_handled(call_id): return "" - (hrn, type) = urn_to_hrn(xrn) - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run(_DeleteSliver, server, xrn, [cred], call_id) - threads.get_results() - return 1 - - -# first draft at a merging SliverStatus -def SliverStatus(api, slice_xrn, creds, call_id): - def _SliverStatus(server, xrn, creds, call_id): - server_version = api.get_cached_server_version(server) - args = [xrn, creds] - if _call_id_supported(api, server): - args.append(call_id) - return server.SliverStatus(*args) - - if Callids().already_handled(call_id): return {} - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run (_SliverStatus, server, slice_xrn, [cred], call_id) - results = threads.get_results() - - # get rid of any void result - e.g. when call_id was hit where by convention we return {} - results = [ result for result in results if result and result['geni_resources']] - - # do not try to combine if there's no result - if not results : return {} - - # otherwise let's merge stuff - overall = {} - - # mmh, it is expected that all results carry the same urn - overall['geni_urn'] = results[0]['geni_urn'] - overall['pl_login'] = results[0]['pl_login'] - # append all geni_resources - overall['geni_resources'] = \ - reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) - overall['status'] = 'unknown' - if overall['geni_resources']: - overall['status'] = 'ready' - - return overall - -caching=True -#caching=False -def ListSlices(api, creds, call_id): - def _ListSlices(server, creds, call_id): - server_version = api.get_cached_server_version(server) - args = [creds] - if _call_id_supported(api, server): - args.append(call_id) - return server.ListSlices(*args) - - if Callids().already_handled(call_id): return [] - - # look in cache first - if caching and api.cache: - slices = api.cache.get('slices') - if slices: - return slices - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred= api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - # fetch from aggregates - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run(_ListSlices, server, [cred], call_id) - - # combime results - results = threads.get_results() - slices = [] - for result in results: - slices.extend(result) - - # cache the result - if caching and api.cache: - api.cache.add('slices', slices) - - return slices - - -def get_ticket(api, xrn, creds, rspec, users): - slice_hrn, type = urn_to_hrn(xrn) - # get the netspecs contained within the clients rspec - aggregate_rspecs = {} - tree= etree.parse(StringIO(rspec)) - elements = tree.findall('./network') - for element in elements: - aggregate_hrn = element.values()[0] - aggregate_rspecs[aggregate_hrn] = rspec - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems(): - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users) + + results = threads.get_results() - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users) - - results = threads.get_results() - - # gather information from each ticket - rspec = None - initscripts = [] - slivers = [] - object_gid = None - for result in results: - agg_ticket = SfaTicket(string=result) - attrs = agg_ticket.get_attributes() - if not object_gid: - object_gid = agg_ticket.get_gid_object() - if not rspec: - rspec = RSpec(agg_ticket.get_rspec()) - else: - rspec.version.merge(agg_ticket.get_rspec()) - initscripts.extend(attrs.get('initscripts', [])) - slivers.extend(attrs.get('slivers', [])) - - # merge info - attributes = {'initscripts': initscripts, - 'slivers': slivers} - - # create a new ticket - ticket = SfaTicket(subject = slice_hrn) - ticket.set_gid_caller(api.auth.client_gid) - ticket.set_issuer(key=api.key, subject=api.hrn) - ticket.set_gid_object(object_gid) - ticket.set_pubkey(object_gid.get_pubkey()) - #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) - ticket.set_attributes(attributes) - ticket.set_rspec(rspec.toxml()) - ticket.encode() - ticket.sign() - return ticket.save_to_string(save_parents=True) - -def start_slice(api, xrn, creds): - hrn, type = urn_to_hrn(xrn) - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run(server.Start, xrn, cred) - threads.get_results() - return 1 - -def stop_slice(api, xrn, creds): - hrn, type = urn_to_hrn(xrn) - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.server_proxy(interface, cred) - threads.run(server.Stop, xrn, cred) - threads.get_results() - return 1 - -def reset_slice(api, xrn): - """ - Not implemented - """ - return 1 - -def shutdown(api, xrn, creds): - """ - Not implemented - """ - return 1 - -def status(api, xrn, creds): - """ - Not implemented - """ - return 1 - -# this is plain broken -#def main(): -# r = RSpec() -# r.parseFile(sys.argv[1]) -# rspec = r.toDict() -# CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice') - -if __name__ == "__main__": - main() + # gather information from each ticket + rspec = None + initscripts = [] + slivers = [] + object_gid = None + for result in results: + agg_ticket = SfaTicket(string=result) + attrs = agg_ticket.get_attributes() + if not object_gid: + object_gid = agg_ticket.get_gid_object() + if not rspec: + rspec = RSpec(agg_ticket.get_rspec()) + else: + rspec.version.merge(agg_ticket.get_rspec()) + initscripts.extend(attrs.get('initscripts', [])) + slivers.extend(attrs.get('slivers', [])) + + # merge info + attributes = {'initscripts': initscripts, + 'slivers': slivers} + + # create a new ticket + ticket = SfaTicket(subject = slice_hrn) + ticket.set_gid_caller(api.auth.client_gid) + ticket.set_issuer(key=api.key, subject=api.hrn) + ticket.set_gid_object(object_gid) + ticket.set_pubkey(object_gid.get_pubkey()) + #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) + ticket.set_attributes(attributes) + ticket.set_rspec(rspec.toxml()) + ticket.encode() + ticket.sign() + return ticket.save_to_string(save_parents=True) + + def start_slice(self, api, xrn, creds): + hrn, type = urn_to_hrn(xrn) + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run(server.Start, xrn, cred) + threads.get_results() + return 1 + + def stop_slice(self, api, xrn, creds): + hrn, type = urn_to_hrn(xrn) + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + threads.run(server.Stop, xrn, cred) + threads.get_results() + return 1 + + def reset_slice(self, api, xrn): + """ + Not implemented + """ + return 1 + + def shutdown(self, api, xrn, creds): + """ + Not implemented + """ + return 1 + + def status(self, api, xrn, creds): + """ + Not implemented + """ + return 1 diff --git a/sfa/plc/aggregate.py b/sfa/plc/aggregate.py index 654a84f0..abd118f8 100644 --- a/sfa/plc/aggregate.py +++ b/sfa/plc/aggregate.py @@ -1,11 +1,14 @@ #!/usr/bin/python from sfa.util.xrn import hrn_to_urn, urn_to_hrn -from sfa.util.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename +from sfa.util.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, urn_to_sliver_id from sfa.rspecs.rspec import RSpec +from sfa.rspecs.elements.hardware_type import HardwareType from sfa.rspecs.elements.link import Link +from sfa.rspecs.elements.login import Login from sfa.rspecs.elements.interface import Interface - +from sfa.rspecs.elements.services import Services +from sfa.rspecs.elements.pltag import PLTag from sfa.util.topology import Topology from sfa.rspecs.version_manager import VersionManager from sfa.plc.vlink import get_tc_rate @@ -13,13 +16,6 @@ from sfa.plc.vlink import get_tc_rate class Aggregate: api = None - sites = {} - nodes = {} - interfaces = {} - links = {} - node_tags = {} - pl_initscripts = {} - prepared=False #panos new user options variable user_options = {} @@ -27,198 +23,204 @@ class Aggregate: self.api = api self.user_options = user_options - def prepare_sites(self, filter={}, force=False): - if not self.sites or force: - for site in self.api.driver.GetSites(filter): - self.sites[site['site_id']] = site - - def prepare_nodes(self, filter={}, force=False): - if not self.nodes or force: - filter.update({'peer_id': None}) - nodes = self.api.driver.GetNodes(filter) - site_ids = [] - interface_ids = [] - tag_ids = [] - for node in nodes: - site_ids.append(node['site_id']) - interface_ids.extend(node['interface_ids']) - tag_ids.extend(node['node_tag_ids']) - self.prepare_sites({'site_id': site_ids}) - self.prepare_interfaces({'interface_id': interface_ids}) - self.prepare_node_tags({'node_tag_id': tag_ids}) - for node in nodes: - # add site/interface info to nodes. - # assumes that sites, interfaces and tags have already been prepared. - site = self.sites[node['site_id']] - interfaces = [self.interfaces[interface_id] for interface_id in node['interface_ids']] - tags = [self.node_tags[tag_id] for tag_id in node['node_tag_ids']] - node['network'] = self.api.hrn - node['network_urn'] = hrn_to_urn(self.api.hrn, 'authority+am') - node['urn'] = hostname_to_urn(self.api.hrn, site['login_base'], node['hostname']) - node['site_urn'] = hrn_to_urn(PlXrn.site_hrn(self.api.hrn, site['login_base']), 'authority+sa') - node['site'] = site - node['interfaces'] = interfaces - node['tags'] = tags - self.nodes[node['node_id']] = node - - def prepare_interfaces(self, filter={}, force=False): - if not self.interfaces or force: - for interface in self.api.driver.GetInterfaces(filter): - self.interfaces[interface['interface_id']] = interface - - def prepare_links(self, filter={}, force=False): - # we're aobut to deprecate sfa_aggregate_type, need to get this right - # with the generic framework - if not self.links or force: - if not self.api.config.SFA_AGGREGATE_TYPE.lower() == 'vini': - return - - topology = Topology() - for (site_id1, site_id2) in topology: - link = Link() - if not site_id1 in self.sites or site_id2 not in self.sites: + def get_sites(self, filter={}): + sites = {} + for site in self.api.driver.GetSites(filter): + sites[site['site_id']] = site + return sites + + def get_interfaces(self, filter={}): + interfaces = {} + for interface in self.api.driver.GetInterfaces(filter): + iface = Interface() + iface['interface_id'] = interface['interface_id'] + iface['node_id'] = interface['node_id'] + iface['ipv4'] = interface['ip'] + iface['bwlimit'] = interface['bwlimit'] + interfaces[iface['interface_id']] = iface + return interfaces + + def get_links(self, filter={}): + + if not self.api.config.SFA_AGGREGATE_TYPE.lower() == 'vini': + return + + topology = Topology() + links = {} + for (site_id1, site_id2) in topology: + link = Link() + if not site_id1 in self.sites or site_id2 not in self.sites: + continue + site1 = self.sites[site_id1] + site2 = self.sites[site_id2] + # get hrns + site1_hrn = self.api.hrn + '.' + site1['login_base'] + site2_hrn = self.api.hrn + '.' + site2['login_base'] + # get the first node + node1 = self.nodes[site1['node_ids'][0]] + node2 = self.nodes[site2['node_ids'][0]] + + # set interfaces + # just get first interface of the first node + if1_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node1['node_id'])) + if1_ipv4 = self.interfaces[node1['interface_ids'][0]]['ip'] + if2_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node2['node_id'])) + if2_ipv4 = self.interfaces[node2['interface_ids'][0]]['ip'] + + if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} ) + if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} ) + + # set link + link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'}) + link['interface1'] = if1 + link['interface2'] = if2 + link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base']) + link['component_id'] = PlXrn(auth=self.api.hrn, interface=link['component_name']).get_urn() + link['component_manager_id'] = hrn_to_urn(self.api.hrn, 'authority+am') + links[link['component_name']] = link + + return links + + def get_node_tags(self, filter={}): + node_tags = {} + for node_tag in self.api.driver.GetNodeTags(filter): + node_tags[node_tag['node_tag_id']] = node_tag + return node_tags + + def get_pl_initscripts(self, filter={}): + pl_initscripts = {} + filter.update({'enabled': True}) + for initscript in self.api.driver.GetInitScripts(filter): + pl_initscripts[initscript['initscript_id']] = initscript + return pl_initscripts + + + def get_slice_and_slivers(self, slice_xrn): + """ + Returns a dict of slivers keyed on the sliver's node_id + """ + slivers = {} + slice = None + if not slice_xrn: + return (slice, slivers) + slice_urn = hrn_to_urn(slice_xrn) + slice_hrn, _ = urn_to_hrn(slice_xrn) + slice_name = hrn_to_pl_slicename(slice_hrn) + slices = self.api.driver.GetSlices(slice_name) + if not slices: + return (slice, slivers) + slice = slices[0] + + # sort slivers by node id + for node_id in slice['node_ids']: + sliver = Sliver({'sliver_id': urn_to_sliver_id(slice_urn, slice['slice_id'], node_id), + 'name': 'plab-vserver', + 'tags': []}) + slivers[node_id]= sliver + + # sort sliver attributes by node id + tags = self.api.driver.GetSliceTags({'slice_tag_id': slice['slice_tag_ids']}) + for tag in tags: + # most likely a default/global sliver attribute (node_id == None) + if tag['node_id'] not in slivers: + sliver = Sliver({'sliver_id': urn_to_sliver_id(slice_urn, slice['slice_id'], ""), + 'name': 'plab-vserver', + 'tags': []}) + slivers[tag['node_id']] = sliver + slivers[tag['node_id']]['tags'].append(tag) + + return (slice, slivers) + + def get_nodes(self, slice=None): + filter = {} + if slice and 'node_ids' in slice and slice['node_ids']: + filter['node_id'] = slice['node_ids'] + + filter.update({'peer_id': None}) + nodes = self.api.driver.GetNodes(filter) + + site_ids = [] + interface_ids = [] + tag_ids = [] + for node in nodes: + site_ids.append(node['site_id']) + interface_ids.extend(node['interface_ids']) + tag_ids.extend(node['node_tag_ids']) + + # get sites + sites_dict = self.get_sites({'site_id': site_ids}) + # get interfaces + interfaces = self.get_interfaces({'interface_id':interface_ids}) + # get slivers + slivers = self.get_slivers(slice) + # get tags + node_tags = self.get_node_tags({'node_id': node_ids}) + # get initscripts + pl_initscripts = self.get_pl_initscripts() + + rspec_nodes = [] + for node in nodes: + # skip whitelisted nodes + if node['slice_ids_whitelist']: + if not slice or slice['slice_id'] not in node['slice_ids_whitelist']: continue - site1 = self.sites[site_id1] - site2 = self.sites[site_id2] - # get hrns - site1_hrn = self.api.hrn + '.' + site1['login_base'] - site2_hrn = self.api.hrn + '.' + site2['login_base'] - # get the first node - node1 = self.nodes[site1['node_ids'][0]] - node2 = self.nodes[site2['node_ids'][0]] - - # set interfaces - # just get first interface of the first node - if1_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node1['node_id'])) - if1_ipv4 = self.interfaces[node1['interface_ids'][0]]['ip'] - if2_xrn = PlXrn(auth=self.api.hrn, interface='node%s:eth0' % (node2['node_id'])) - if2_ipv4 = self.interfaces[node2['interface_ids'][0]]['ip'] - - if1 = Interface({'component_id': if1_xrn.urn, 'ipv4': if1_ipv4} ) - if2 = Interface({'component_id': if2_xrn.urn, 'ipv4': if2_ipv4} ) - - # set link - link = Link({'capacity': '1000000', 'latency': '0', 'packet_loss': '0', 'type': 'ipv4'}) - link['interface1'] = if1 - link['interface2'] = if2 - link['component_name'] = "%s:%s" % (site1['login_base'], site2['login_base']) - link['component_id'] = PlXrn(auth=self.api.hrn, interface=link['component_name']).get_urn() - link['component_manager_id'] = hrn_to_urn(self.api.hrn, 'authority+am') - self.links[link['component_name']] = link - - - def prepare_node_tags(self, filter={}, force=False): - if not self.node_tags or force: - for node_tag in self.api.driver.GetNodeTags(filter): - self.node_tags[node_tag['node_tag_id']] = node_tag - - def prepare_pl_initscripts(self, filter={}, force=False): - if not self.pl_initscripts or force: - filter.update({'enabled': True}) - for initscript in self.api.driver.GetInitScripts(filter): - self.pl_initscripts[initscript['initscript_id']] = initscript - - def prepare(self, slice = None, force=False): - if not self.prepared or force or slice: - if not slice: - self.prepare_sites(force=force) - self.prepare_interfaces(force=force) - self.prepare_node_tags(force=force) - self.prepare_nodes(force=force) - self.prepare_links(force=force) - self.prepare_pl_initscripts(force=force) - else: - self.prepare_sites({'site_id': slice['site_id']}) - self.prepare_interfaces({'node_id': slice['node_ids']}) - self.prepare_node_tags({'node_id': slice['node_ids']}) - self.prepare_nodes({'node_id': slice['node_ids']}) - self.prepare_links({'slice_id': slice['slice_id']}) - self.prepare_pl_initscripts() - self.prepared = True - + rspec_node = Node() + rspec_node['component_id'] = hostname_to_urn(self.api.hrn, site['login_base'], node['hostname']) + rspec_node['component_name'] = node['hostname'] + rspec_node['component_manager_id'] = self.api.hrn + rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.api.hrn, site['login_base']), 'authority+sa') + rspec_node['boot_state'] = node['boot_state'] + rspec_node['exclusive'] = 'False' + rspec_node['hardware_types'].append(HardwareType({'name': 'plab-vserver'})) + # only doing this because protogeni rspec needs + # to advertise available initscripts + rspec_node['pl_initscripts'] = pl_initscripts + # add site/interface info to nodes. + # assumes that sites, interfaces and tags have already been prepared. + site = sites_dict[node['site_id']] + location = Location({'longitude': site['longitude'], 'latitude': site['latitude']}) + rspec_node['location'] = location + rspec_node['interfaces'] = [] + for if_id in node['interface_ids']: + interface = Interface(interfaces[if_id]) + interface['ipv4'] = interface['ip'] + rspec_node['interfaces'].append(interface) + tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids']] + rspec_node['tags'] = tags + if node['node_id'] in slivers: + # add sliver info + sliver = slivers[node['node_id']] + rspec_node['sliver_id'] = sliver['sliver_id'] + rspec_node['client_id'] = node['hostname'] + rspec_node['slivers'] = [slivers[node['node_id']]] + + # slivers always provide the ssh service + login = Login({'authentication': 'ssh-keys', 'hostname': node['hostname'], port:'22'}) + service = Services({'login': login}) + rspec_node['services'].append(service) + rspec_nodes.append(rspec_node) + return rspec_nodes + + def get_rspec(self, slice_xrn=None, version = None): + version_manager = VersionManager() version = version_manager.get_version(version) if not slice_xrn: rspec_version = version_manager._get_version(version.type, version.version, 'ad') else: rspec_version = version_manager._get_version(version.type, version.version, 'manifest') - - rspec = RSpec(version=rspec_version, user_options=self.user_options) - # get slice details if specified - slice = None - if slice_xrn: - slice_hrn, _ = urn_to_hrn(slice_xrn) - slice_name = hrn_to_pl_slicename(slice_hrn) - slices = self.api.driver.GetSlices(slice_name) - if slices: - slice = slices[0] - self.prepare(slice=slice) - else: - self.prepare() - - # filter out nodes with a whitelist: - valid_nodes = [] - for node in self.nodes.values(): - # only doing this because protogeni rspec needs - # to advertise available initscripts - node['pl_initscripts'] = self.pl_initscripts - - if slice and node['node_id'] in slice['node_ids']: - valid_nodes.append(node) - elif slice and slice['slice_id'] in node['slice_ids_whitelist']: - valid_nodes.append(node) - elif not slice and not node['slice_ids_whitelist']: - valid_nodes.append(node) - - rspec.version.add_nodes(valid_nodes) - rspec.version.add_interfaces(self.interfaces.values()) - rspec.version.add_links(self.links.values()) - - # add slivers - if slice_xrn and slice: - slivers = [] - tags = self.api.driver.GetSliceTags(slice['slice_tag_ids']) - - # add default tags - for tag in tags: - # if tag isn't bound to a node then it applies to all slivers - # and belongs in the tag - if not tag['node_id']: - rspec.version.add_default_sliver_attribute(tag['tagname'], tag['value'], self.api.hrn) - if tag['tagname'] == 'topo_rspec' and tag['node_id']: - node = self.nodes[tag['node_id']] - value = eval(tag['value']) - for (id, realip, bw, lvip, rvip, vnet) in value: - bps = get_tc_rate(bw) - remote = self.nodes[id] - site1 = self.sites[node['site_id']] - site2 = self.sites[remote['site_id']] - link1_name = '%s:%s' % (site1['login_base'], site2['login_base']) - link2_name = '%s:%s' % (site2['login_base'], site1['login_base']) - p_link = None - if link1_name in self.links: - link = self.links[link1_name] - elif link2_name in self.links: - link = self.links[link2_name] - v_link = Link() - - link.capacity = bps - for node_id in slice['node_ids']: - try: - sliver = {} - sliver['hostname'] = self.nodes[node_id]['hostname'] - sliver['node_id'] = node_id - sliver['slice_id'] = slice['slice_id'] - sliver['tags'] = [] - slivers.append(sliver) - - # add tags for this node only - for tag in tags: - if tag['node_id'] and (tag['node_id'] == node_id): - sliver['tags'].append(tag) - except: - self.api.logger.log_exc('unable to add sliver %s to node %s' % (slice['name'], node_id)) - rspec.version.add_slivers(slivers, sliver_urn=slice_xrn) + slice, slivers = self.get_slice_and_slivers(slice_xrn) + rspec = RSpec(version=rspec_version, user_options=self.user_options) + rspec.version.add_nodes(self.get_nodes(slice, slivers)) + rspec.version.add_links(self.get_links(slice)) + + # add sliver defaults + default_sliver_attribs = slivers.get(None, []) + for sliver_attrib in default_sliver_attribs: + rspec.version.add_default_sliver_attribute(sliver_attrib['name'], sliver_attrib['value']) + return rspec.toxml() + + diff --git a/sfa/rspecs/elements/node.py b/sfa/rspecs/elements/node.py index a2e11cd5..f90fff1b 100644 --- a/sfa/rspecs/elements/node.py +++ b/sfa/rspecs/elements/node.py @@ -6,6 +6,8 @@ class Node(Element): 'component_id': None, 'component_name': None, 'component_manager_id': None, + 'client_id': None, + 'sliver_id': None, 'authority_id': None, 'exclusive': None, 'location': None, @@ -13,10 +15,12 @@ class Node(Element): 'bw_limit': None, 'boot_state': None, 'slivers': [], - 'hardware_type': [], - 'disk_image': [], + 'hardware_types': [], + 'disk_images': [], 'interfaces': [], + 'services': [], 'tags': [], + 'pl_initscripts': [], } diff --git a/sfa/rspecs/elements/sliver.py b/sfa/rspecs/elements/sliver.py index febb0681..bf2cc1f7 100644 --- a/sfa/rspecs/elements/sliver.py +++ b/sfa/rspecs/elements/sliver.py @@ -2,8 +2,8 @@ from sfa.rspecs.elements.element import Element class Sliver(Element): fields = { + 'sliver_id': None, + 'client_id': None, 'name': None, 'tags': [], - 'slice_id': None, - } diff --git a/sfa/rspecs/elements/versions/pgv2Node.py b/sfa/rspecs/elements/versions/pgv2Node.py index e69de29b..4ddafc1f 100644 --- a/sfa/rspecs/elements/versions/pgv2Node.py +++ b/sfa/rspecs/elements/versions/pgv2Node.py @@ -0,0 +1,146 @@ + +from lxml import etree +from sfa.util.plxrn import PlXrn +from sfa.util.xrn import Xrn +from sfa.rspecs.elements.node import Node +from sfa.rspecs.elements.sliver import Sliver +from sfa.rspecs.elements.network import Network +from sfa.rspecs.elements.location import Location +from sfa.rspecs.elements.hardware_type import HardwareType +from sfa.rspecs.elements.disk_image import DiskImage +from sfa.rspecs.elements.interface import Interface +from sfa.rspecs.elements.bwlimit import BWlimit +from sfa.rspecs.elements.pl_tag import PLTag +from sfa.rspecs.rspec_elements import RSpecElement, RSpecElements +from sfa.rspecs.elements.versions.pgv2Service import PGv2Service + +class PGv2Node: + elements = { + 'node': RSpecElement(RSpecElements.NODE, '//default:node | //node'), + 'sliver': RSpecElement(RSpecElements.SLIVER, './default:sliver_type | ./sliver_type'), + 'interface': RSpecElement(RSpecElements.INTERFACE, './default:interface | ./interface'), + 'location': RSpecElement(RSpecElements.LOCATION, './default:location | ./location'), + 'hardware_type': RSpecElement(RSpecElements.HARDWARE_TYPE, './default:hardware_type | ./hardware_type'), + 'available': RSpecElement(RSpecElements.AVAILABLE, './default:available | ./available'), + } + + @staticmethod + def add_nodes(xml, nodes): + node_elems = [] + for node in nodes: + node_elem = etree.SubElement(xml, 'node') + node_elems.append(node_elem) + if node.get('component_manager_id'): + node_elem.set('component_manager_id', node['component_manager_id']) + if node.get('component_id'): + node_elem.set('component_id', node['component_id']) + component_name = Xrn(node['component_id']).get_leaf() + node_elem.set('component_nama', component_name) + if node.get('client_id'): + node_elem.set('client_id', node['client_id']) + if node.get('sliver_id'): + node_elem.set('sliver_id', node['sliver_id']) + if node.get('exclusive'): + node_elem.set('exclusive', node['exclusive']) + hardware_types = node.get('hardware_type', []) + for hardware_type in hardware_types: + hw_type_elem = etree.SubElement(node_elem, 'hardware_type') + if hardware_type.get('name'): + hw_type_elem.set('name', hardware_type['name']) + if node.get('boot_state', '').lower() == 'boot': + available_elem = etree.SubElement(node_elem, 'available', now='True') + else: + available_elem = etree.SubElement(node_elem, 'available', now='False') + + if node.get('services'): + PGv2Services.add_services(node_elem, node.get('services')) + + slivers = node.get('slivers', []) + pl_initscripts = node.get('pl_initscripts', {}) + for sliver in slivers: + sliver_elem = etree.SubElement(node_elem, 'sliver_type') + if sliver.get('name'): + sliver_elem.set('name', sliver['name']) + if sliver.get('client_id'): + sliver_elem.set('client_id', sliver['client_id']) + for pl_initscript in pl_initscripts.values(): + etree.SubElement(sliver_elem, '{%s}initscript' % xml.namespaces['planetlab'], \ + name=pl_initscript['name']) + location = node.get('location') + #only add locaiton if long and lat are not null + if location.get('longitute') and location.get('latitude'): + location_elem = etree.SubElement(node_elem, country=location['country'], + latitude=location['latitude'], longitude=location['longiutde']) + return node_elems + + @staticmethod + def get_nodes(xml): + nodes = [] + node_elems = xml.xpath(PGv2Node.elements['node'].path) + for node_elem in node_elems: + node = Node(node_elem.attrib, node_elem) + nodes.append(node) + if 'component_id' in node_elem.attrib: + node['authority_id'] = Xrn(node_elem.attrib['component_id']).get_authority_urn() + + # set hardware type + node['hardware_types'] = [] + hardware_type_elems = node_elem.xpath(PGv2Node.elements['hardware_type'].path, xml.namespaces) + for hardware_type_elem in hardware_type_elems: + node['hardware_types'].append(HardwareType(hardware_type_elem.attrib, hardware_type_elem)) + + # set location + location_elems = node_elem.xpath(PGv2Node.elements['location'].path, xml.namespaces) + if len(location_elems) > 0: + node['location'] = Location(location_elems[0].attrib, location_elems[0]) + + # set services + services_elems = node_elem.xpath(PGv2Service.elements['services'].path, xml.namespaces) + node['services'] = [] + for services_elem in services_elems: + # services element has no useful info, but the child elements do + for child in services_elem.iterchildren(): + + # set interfaces + interface_elems = node_elem.xpath(PGv2Node.elements['interface'].path, xml.namespaces) + node['interfaces'] = [] + for interface_elem in interface_elems: + node['interfaces'].append(Interface(interface_elem.attrib, interface_elem)) + + # set available + available = node_elem.xpath(PGv2Node.elements['available'].path, xml.namespaces) + if len(available) > 0: + if available[0].attrib.get('now', '').lower() == 'true': + node['boot_state'] = 'boot' + else: + node['boot_state'] = 'disabled' + + # set the slivers + sliver_elems = node_elem.xpath(PGv2Node.elements['sliver'].path, xml.namespaces) + node['slivers'] = [] + for sliver_elem in sliver_elems: + node['slivers'].append(Sliver(sliver_elem.attrib, sliver_elem)) + + return nodes + + + @staticmethod + def add_slivers(xml, slivers): + pass + + @staticmethod + def get_nodes_with_slivers(xml): + nodes = PGv2Node.get_nodes(xml) + nodes_with_slivers = [node for node in nodes if node['slivers']] + return nodes_with_slivers + +if __name__ == '__main__': + from sfa.rspecs.rspec import RSpec + import pdb + r = RSpec('/tmp/emulab.rspec') + r2 = RSpec(version = 'ProtoGENI') + nodes = PGv2Node.get_nodes(r.xml) + PGv2Node.add_nodes(r2.xml.root, nodes) + #pdb.set_trace() + + diff --git a/sfa/rspecs/elements/versions/pgv2Services.py b/sfa/rspecs/elements/versions/pgv2Services.py index 741fbac0..470a2e65 100644 --- a/sfa/rspecs/elements/versions/pgv2Services.py +++ b/sfa/rspecs/elements/versions/pgv2Services.py @@ -10,8 +10,8 @@ class PGv2Services: elements = { 'services': RSpecElement(RSpecElements.SERVICES, '//default:services | //services'), 'install': RSpecElement(RspecElements.INSTALL, './default:install | ./install'), - 'execute': RSpecElement(RspecElements.INSTALL, './default:execute | ./execute'), - 'login': RSpecElement(RspecElements.INSTALL, './default:login | ./login'), + 'execute': RSpecElement(RspecElements.EXECUTE, './default:execute | ./execute'), + 'login': RSpecElement(RspecElements.LOGIN, './default:login | ./login'), } @staticmethod diff --git a/sfa/rspecs/elements/versions/sfav1Node.py b/sfa/rspecs/elements/versions/sfav1Node.py index b8fe27ef..6cdf9954 100644 --- a/sfa/rspecs/elements/versions/sfav1Node.py +++ b/sfa/rspecs/elements/versions/sfav1Node.py @@ -13,6 +13,7 @@ from sfa.rspecs.elements.bwlimit import BWlimit from sfa.rspecs.elements.pl_tag import PLTag from sfa.rspecs.rspec_elements import RSpecElement, RSpecElements from sfa.rspecs.elements.versions.sfav1Network import SFAv1Network +from sfa.rspecs.elements.versions.pgv2Services import PGv2Services class SFAv1Node: @@ -55,7 +56,6 @@ class SFAv1Node: for field in Location.fields: if field in node['location'] and node['location'][field]: location_elem.set(field, node['location'][field]) - if 'interfaces' in node and node['interfaces']: i = 0 for interface in node['interfaces']: @@ -68,17 +68,23 @@ class SFAv1Node: if 'bw_unallocated' in node and node['bw_unallocated']: bw_unallocated = etree.SubElement(node_elem, 'bw_unallocated', units='kbps').text = str(int(node['bw_unallocated'])/1000) + if node.get('services'): + PGv2Services.add_services(node_elem, node.get('services')) + if 'tags' in node: for tag in node['tags']: # expose this hard wired list of tags, plus the ones that are marked 'sfa' in their category if tag['name'] in ['fcdistro', 'arch']: tag_element = etree.SubElement(node_elem, tag['name']).text=tag['value'] - if 'slivers' in node: + if node.get('slivers'): for sliver in node['slivers']: sliver_elem = etree.SubElement(node_elem, 'sliver') - if 'name' in sliver and sliver['name']: - sliver_elem.set('name', sliver['name']) + if sliver.get('sliver_id'): + sliver_id_leaf = Xrn(sliver.get('sliver_id')).get_leaf() + sliver_id_parts = sliver_id_leaf.split(':') + name = sliver_id_parts[0] + sliver_elem.set('name', name) @staticmethod def add_slivers(xml, slivers): @@ -92,7 +98,9 @@ class SFAv1Node: node = Node(node_elem.attrib, node_elem) if 'site_id' in node_elem.attrib: node['authority_id'] = node_elem.attrib['site_id'] - + if 'authority_id' in node_elem.attrib: + node['authority_id'] = node_elem.attrib['authority_id'] + # set the location location_elems = node_elem.xpath(SFAv1Node.elements['location'].path, xml.namespaces) if len(location_elems) > 0: diff --git a/sfa/rspecs/elements/versions/sfav1Sliver.py b/sfa/rspecs/elements/versions/sfav1Sliver.py index e69de29b..f12c9776 100644 --- a/sfa/rspecs/elements/versions/sfav1Sliver.py +++ b/sfa/rspecs/elements/versions/sfav1Sliver.py @@ -0,0 +1,18 @@ + +from lxml import etree + +from sfa.rspecs.elements.sliver import Sliver + +from sfa.util.xrn import Xrn +from sfa.util.plxrn import PlXrn +class SFAv1Sliver: + + @staticmethod + def add_slivers(xml, slivers): + for sliver in slivers: + sliver_elem = etree.SubElement(xml, 'sliver') + if sliver.get('component_id'): + name_full = Xrn(sliver.get('component_id')).get_leaf() + name = name_full.split(':') + sliver_elem.set('name', name) + diff --git a/sfa/rspecs/rspec_elements.py b/sfa/rspecs/rspec_elements.py index 096412e0..ce3cac76 100644 --- a/sfa/rspecs/rspec_elements.py +++ b/sfa/rspecs/rspec_elements.py @@ -2,10 +2,12 @@ from sfa.util.enumeration import Enum # recognized top level rspec elements RSpecElements = Enum( + AVAILABLE='AVAILABLE', BWLIMIT='BWLIMIT', EXECUTE='EXECUTE', NETWORK='NETWORK', COMPONENT_MANAGER='COMPONENT_MANAGER', + HARDWARE_TYPE='HARDWARE_TYPE', INSTALL='INSTALL', INTERFACE='INTERFACE', INTERFACE_REF='INTERFACE_REF', diff --git a/sfa/server/aggregate.py b/sfa/server/aggregate.py index e7340e10..90fcaf49 100644 --- a/sfa/server/aggregate.py +++ b/sfa/server/aggregate.py @@ -3,6 +3,7 @@ from sfa.util.xrn import hrn_to_urn from sfa.server.interface import Interfaces, Interface from sfa.util.config import Config +# this truly is a server-side object class Aggregate(SfaServer): ## @@ -15,9 +16,10 @@ class Aggregate(SfaServer): def __init__(self, ip, port, key_file, cert_file): SfaServer.__init__(self, ip, port, key_file, cert_file,'aggregate') -## +# # Aggregates is a dictionary of aggregate connections keyed on the aggregate hrn - +# as such it's more of a client-side thing for aggregate servers to reach their peers +# class Aggregates(Interfaces): default_dict = {'aggregates': {'aggregate': [Interfaces.default_fields]}} diff --git a/sfa/server/registry.py b/sfa/server/registry.py index 2a37c22a..bdad7df2 100644 --- a/sfa/server/registry.py +++ b/sfa/server/registry.py @@ -5,8 +5,10 @@ from sfa.server.sfaserver import SfaServer from sfa.server.interface import Interfaces, Interface from sfa.util.config import Config -## +# # Registry is a SfaServer that serves registry and slice operations at PLC. +# this truly is a server-side object +# class Registry(SfaServer): ## # Create a new registry object. @@ -19,10 +21,10 @@ class Registry(SfaServer): def __init__(self, ip, port, key_file, cert_file): SfaServer.__init__(self, ip, port, key_file, cert_file,'registry') -## -# Registries is a dictionary of registry connections keyed on the registry -# hrn - +# +# Registries is a dictionary of registry connections keyed on the registry hrn +# as such it's more of a client-side thing for registry servers to reach their peers +# class Registries(Interfaces): default_dict = {'registries': {'registry': [Interfaces.default_fields]}} diff --git a/sfa/util/xml.py b/sfa/util/xml.py index 81110506..ddb06e4f 100755 --- a/sfa/util/xml.py +++ b/sfa/util/xml.py @@ -60,10 +60,10 @@ class XML: # it hard for us to write xpath queries for the default naemspace because lxml # wont understand a None prefix. We will just associate the default namespeace # with a key named 'default'. - self.namespaces['default'] = self.namespaces[None] + self.namespaces['default'] = self.namespaces.pop(None) + else: self.namespaces['default'] = 'default' - # set schema for key in self.root.attrib.keys(): if key.endswith('schemaLocation'):