From 0b3760751f16b053758f11ac5eed23078820585f Mon Sep 17 00:00:00 2001 From: Sandrine Avakian Date: Wed, 14 Sep 2011 11:37:33 +0200 Subject: [PATCH] Added manager files (slab) --- sfa/managers/aggregate_manager_slab.py | 430 ++++++++++++++++++++++ sfa/managers/registry_manager_slab.py | 458 +++++++++++++++++++++++ sfa/managers/slice_manager_slab.py | 479 +++++++++++++++++++++++++ 3 files changed, 1367 insertions(+) create mode 100644 sfa/managers/aggregate_manager_slab.py create mode 100644 sfa/managers/registry_manager_slab.py create mode 100644 sfa/managers/slice_manager_slab.py diff --git a/sfa/managers/aggregate_manager_slab.py b/sfa/managers/aggregate_manager_slab.py new file mode 100644 index 00000000..291a65eb --- /dev/null +++ b/sfa/managers/aggregate_manager_slab.py @@ -0,0 +1,430 @@ +#!/usr/bin/python + +import datetime +import time +import traceback +import sys +import re +from types import StringTypes + +from sfa.util.faults import * +from sfa.util.xrn import get_authority, hrn_to_urn, urn_to_hrn, Xrn +from sfa.util.plxrn import slicename_to_hrn, hrn_to_pl_slicename, hostname_to_urn +from sfa.util.rspec import * +from sfa.util.specdict import * +from sfa.util.record import SfaRecord +from sfa.util.policy import Policy +from sfa.util.record import * +from sfa.util.sfaticket import SfaTicket +from sfa.plc.slices import Slices +from sfa.trust.credential import Credential +import sfa.plc.peers as peers +from sfa.plc.network import * + +from sfa.senslab.OARrestapi import * +from sfa.senslab.api import SfaAPI +#from sfa.plc.aggregate import Aggregate +from sfa.plc.slices import * +from sfa.util.version import version_core +from sfa.rspecs.rspec_version import RSpecVersion +from sfa.rspecs.sfa_rspec import sfa_rspec_version +from sfa.rspecs.pg_rspec import pg_rspec_version +from sfa.rspecs.rspec_parser import parse_rspec +from sfa.util.sfatime import utcparse +from sfa.util.callids import Callids + + +from sfa.plc.OARrspec import * +#from sfa.plc.aggregate import Aggregate + +def GetVersion(api): + xrn=Xrn(api.hrn) + supported_rspecs = [dict(pg_rspec__request_version), dict(sfa_rspec_version)] + ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)] + version_more = {'interface':'aggregate', + 'testbed':'senslab', + 'hrn':xrn.get_hrn(), + 'request_rspec_versions': request_rspec_versions, + 'ad_rspec_versions': ad_rspec_versions, + 'default_ad_rspec': dict(sfa_rspec_version) + } + return version_core(version_more) + +def __get_registry_objects(slice_xrn, creds, users): + """ + + """ + hrn, type = urn_to_hrn(slice_xrn) + + hrn_auth = get_authority(hrn) + + # Build up objects that an SFA registry would return if SFA + # could contact the slice's registry directly + reg_objects = None + + if users: + # dont allow special characters in the site login base + #only_alphanumeric = re.compile('[^a-zA-Z0-9]+') + #login_base = only_alphanumeric.sub('', hrn_auth[:20]).lower() + slicename = hrn_to_pl_slicename(hrn) + login_base = slicename.split('_')[0] + reg_objects = {} + site = {} + site['site_id'] = 0 + site['name'] = 'geni.%s' % login_base + site['enabled'] = True + site['max_slices'] = 100 + + # Note: + # Is it okay if this login base is the same as one already at this myplc site? + # Do we need uniqueness? Should use hrn_auth instead of just the leaf perhaps? + site['login_base'] = login_base + site['abbreviated_name'] = login_base + site['max_slivers'] = 1000 + reg_objects['site'] = site + + slice = {} + + extime = Credential(string=creds[0]).get_expiration() + # If the expiration time is > 60 days from now, set the expiration time to 60 days from now + if extime > datetime.datetime.utcnow() + datetime.timedelta(days=60): + extime = datetime.datetime.utcnow() + datetime.timedelta(days=60) + slice['expires'] = int(time.mktime(extime.timetuple())) + slice['hrn'] = hrn + slice['name'] = hrn_to_pl_slicename(hrn) + slice['url'] = hrn + slice['description'] = hrn + slice['pointer'] = 0 + reg_objects['slice_record'] = slice + + reg_objects['users'] = {} + for user in users: + user['key_ids'] = [] + hrn, _ = urn_to_hrn(user['urn']) + user['email'] = hrn_to_pl_slicename(hrn) + "@geni.net" + user['first_name'] = hrn + user['last_name'] = hrn + reg_objects['users'][user['email']] = user + + return reg_objects + +def __get_hostnames(nodes): + hostnames = [] + for node in nodes: + hostnames.append(node.hostname) + return hostnames + +def SliverStatus(api, slice_xrn, creds, call_id): + if Callids().already_handled(call_id): return {} + + (hrn, type) = urn_to_hrn(slice_xrn) + # find out where this slice is currently running + api.logger.info(hrn) + slicename = hrn_to_pl_slicename(hrn) + + slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids','person_ids','name','expires']) + if len(slices) == 0: + raise Exception("Slice %s not found (used %s as slicename internally)" % slice_xrn, slicename) + slice = slices[0] + + # report about the local nodes only + nodes = api.plshell.GetNodes(api.plauth, {'node_id':slice['node_ids'],'peer_id':None}, + ['hostname', 'site_id', 'boot_state', 'last_contact']) + site_ids = [node['site_id'] for node in nodes] + sites = api.plshell.GetSites(api.plauth, site_ids, ['site_id', 'login_base']) + sites_dict = dict ( [ (site['site_id'],site['login_base'] ) for site in sites ] ) + + result = {} + top_level_status = 'unknown' + if nodes: + top_level_status = 'ready' + result['geni_urn'] = Xrn(slice_xrn, 'slice').get_urn() + result['pl_login'] = slice['name'] + result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime() + + resources = [] + for node in nodes: + res = {} + res['pl_hostname'] = node['hostname'] + res['pl_boot_state'] = node['boot_state'] + res['pl_last_contact'] = node['last_contact'] + if node['last_contact'] is not None: + res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime() + res['geni_urn'] = hostname_to_urn(api.hrn, sites_dict[node['site_id']], node['hostname']) + if node['boot_state'] == 'boot': + res['geni_status'] = 'ready' + else: + res['geni_status'] = 'failed' + top_level_staus = 'failed' + + res['geni_error'] = '' + + resources.append(res) + + result['geni_status'] = top_level_status + result['geni_resources'] = resources + # XX remove me + #api.logger.info(result) + # XX remove me + return result + +def CreateSliver(api, slice_xrn, creds, rspec_string, users, call_id): + """ + Create the sliver[s] (slice) at this aggregate. + Verify HRN and initialize the slice record in PLC if necessary. + """ + if Callids().already_handled(call_id): return "" + + reg_objects = __get_registry_objects(slice_xrn, creds, users) + + (hrn, type) = urn_to_hrn(slice_xrn) + peer = None + aggregate = Aggregate(api) + slices = Slices(api) + peer = slices.get_peer(hrn) + sfa_peer = slices.get_sfa_peer(hrn) + registry = api.registries[api.hrn] + credential = api.getCredential() + (site_id, remote_site_id) = slices.verify_site(registry, credential, hrn, + peer, sfa_peer, reg_objects) + + slice = slices.verify_slice(registry, credential, hrn, site_id, + remote_site_id, peer, sfa_peer, reg_objects) + + nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'], ['hostname']) + current_slivers = [node['hostname'] for node in nodes] + rspec = parse_rspec(rspec_string) + requested_slivers = [str(host) for host in rspec.get_nodes_with_slivers()] + # remove nodes not in rspec + deleted_nodes = list(set(current_slivers).difference(requested_slivers)) + + # add nodes from rspec + added_nodes = list(set(requested_slivers).difference(current_slivers)) + + try: + if peer: + api.plshell.UnBindObjectFromPeer(api.plauth, 'slice', slice['slice_id'], peer) + + api.plshell.AddSliceToNodes(api.plauth, slice['name'], added_nodes) + api.plshell.DeleteSliceFromNodes(api.plauth, slice['name'], deleted_nodes) + + # TODO: update slice tags + #network.updateSliceTags() + + finally: + if peer: + api.plshell.BindObjectToPeer(api.plauth, 'slice', slice.id, peer, + slice.peer_id) + + return aggregate.get_rspec(slice_xrn=slice_xrn, version=rspec.version) + + +def RenewSliver(api, xrn, creds, expiration_time, call_id): + if Callids().already_handled(call_id): return True + (hrn, type) = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice = slices[0] + requested_time = utcparse(expiration_time) + record = {'expires': int(time.mktime(requested_time.timetuple()))} + try: + api.plshell.UpdateSlice(api.plauth, slice['slice_id'], record) + return True + except: + return False + +def start_slice(api, xrn, creds): + hrn, type = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice_id = slices[0]['slice_id'] + slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id']) + # just remove the tag if it exists + if slice_tags: + api.plshell.DeleteSliceTag(api.plauth, slice_tags[0]['slice_tag_id']) + + return 1 + +def stop_slice(api, xrn, creds): + hrn, type = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id']) + if not slices: + raise RecordNotFound(hrn) + slice_id = slices[0]['slice_id'] + slice_tags = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'tagname': 'enabled'}) + if not slice_tags: + api.plshell.AddSliceTag(api.plauth, slice_id, 'enabled', '0') + elif slice_tags[0]['value'] != "0": + tag_id = attributes[0]['slice_tag_id'] + api.plshell.UpdateSliceTag(api.plauth, tag_id, '0') + return 1 + +def reset_slice(api, xrn): + # XX not implemented at this interface + return 1 + +def DeleteSliver(api, xrn, creds, call_id): + if Callids().already_handled(call_id): return "" + (hrn, type) = urn_to_hrn(xrn) + slicename = hrn_to_pl_slicename(hrn) + slices = api.plshell.GetSlices(api.plauth, {'name': slicename}) + if not slices: + return 1 + slice = slices[0] + + # determine if this is a peer slice + peer = peers.get_peer(api, hrn) + try: + if peer: + api.plshell.UnBindObjectFromPeer(api.plauth, 'slice', slice['slice_id'], peer) + api.plshell.DeleteSliceFromNodes(api.plauth, slicename, slice['node_ids']) + finally: + if peer: + api.plshell.BindObjectToPeer(api.plauth, 'slice', slice['slice_id'], peer, slice['peer_slice_id']) + return 1 + +# xxx Thierry : caching at the aggregate level sounds wrong... +#caching=True +caching=False +def ListSlices(api, creds, call_id): + if Callids().already_handled(call_id): return [] + # look in cache first + if caching and api.cache: + slices = api.cache.get('slices') + if slices: + return slices + + # get data from db + slices = api.plshell.GetSlices(api.plauth, {'peer_id': None}, ['name']) + slice_hrns = [slicename_to_hrn(api.hrn, slice['name']) for slice in slices] + slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns] + + # cache the result + if caching and api.cache: + api.cache.add('slices', slice_urns) + + return slice_urns + + #ajouter caching cf pl manager +def ListResources(api, creds, options,call_id): + + print >>sys.stderr, 'RESOURCES AGGREGATE' + OARImporter = OARapi() + + if Callids().already_handled(call_id): return "" + # get slice's hrn from options + xrn = options.get('geni_slice_urn', '') + (hrn, type) = urn_to_hrn(xrn) + + # get the rspec's return format from options + rspec_version = RSpecVersion(options.get('rspec_version')) + version_string = "rspec_%s" % (rspec_version.get_version_name()) + + #panos adding the info option to the caching key (can be improved) + if options.get('info'): + version_string = version_string + "_"+options.get('info') + + print >>sys.stderr, "[aggregate] version string = %s "%(version_string) + + rspec = None + + if rspec_version['type'].lower() == 'protogeni': + spec = PGRSpec() + #panos pass user options to SfaRSpec + elif rspec_version['type'].lower() == 'sfa': + rspec = SfaRSpec("",{},options) + else: + rspec = SfaRSpec("",{},options) + + + OAR_rspec = OARrspec() + + rspec = OAR_rspec.get_rspec(slice_xrn=xrn, version=rspec_version) + print >>sys.stderr, '\r\n OARImporter.GetNodes()', OARImporter.GetNodes() + + print >>sys.stderr, ' \r\n **************RSPEC' , rspec + + + return rspec + + +def get_ticket(api, xrn, creds, rspec, users): + + reg_objects = __get_registry_objects(xrn, creds, users) + + slice_hrn, type = urn_to_hrn(xrn) + slices = Slices(api) + peer = slices.get_peer(slice_hrn) + sfa_peer = slices.get_sfa_peer(slice_hrn) + + # get the slice record + registry = api.registries[api.hrn] + credential = api.getCredential() + records = registry.Resolve(xrn, credential) + + # similar to CreateSliver, we must verify that the required records exist + # at this aggregate before we can issue a ticket + site_id, remote_site_id = slices.verify_site(registry, credential, slice_hrn, + peer, sfa_peer, reg_objects) + slice = slices.verify_slice(registry, credential, slice_hrn, site_id, + remote_site_id, peer, sfa_peer, reg_objects) + + # make sure we get a local slice record + record = None + for tmp_record in records: + if tmp_record['type'] == 'slice' and \ + not tmp_record['peer_authority']: + record = SliceRecord(dict=tmp_record) + if not record: + raise RecordNotFound(slice_hrn) + + # get sliver info + slivers = Slices(api).get_slivers(slice_hrn) + if not slivers: + raise SliverDoesNotExist(slice_hrn) + + # get initscripts + initscripts = [] + data = { + 'timestamp': int(time.time()), + 'initscripts': initscripts, + 'slivers': slivers + } + + # create the ticket + object_gid = record.get_gid_object() + new_ticket = SfaTicket(subject = object_gid.get_subject()) + new_ticket.set_gid_caller(api.auth.client_gid) + new_ticket.set_gid_object(object_gid) + new_ticket.set_issuer(key=api.key, subject=api.hrn) + new_ticket.set_pubkey(object_gid.get_pubkey()) + new_ticket.set_attributes(data) + new_ticket.set_rspec(rspec) + #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) + new_ticket.encode() + new_ticket.sign() + + return new_ticket.save_to_string(save_parents=True) + + + +def main(): + api = SfaAPI() + """ + rspec = ListResources(api, "plc.princeton.sapan", None, 'pl_test_sapan') + #rspec = ListResources(api, "plc.princeton.coblitz", None, 'pl_test_coblitz') + #rspec = ListResources(api, "plc.pl.sirius", None, 'pl_test_sirius') + print rspec + """ + f = open(sys.argv[1]) + xml = f.read() + f.close() + CreateSliver(api, "plc.princeton.sapan", xml, 'CreateSliver_sapan') + +if __name__ == "__main__": + main() diff --git a/sfa/managers/registry_manager_slab.py b/sfa/managers/registry_manager_slab.py new file mode 100644 index 00000000..622f2410 --- /dev/null +++ b/sfa/managers/registry_manager_slab.py @@ -0,0 +1,458 @@ +import types +import time +import sys + +from sfa.util.faults import * +from sfa.util.prefixTree import prefixTree +from sfa.util.record import SfaRecord +from sfa.util.table import SfaTable +from sfa.util.record import SfaRecord +from sfa.trust.gid import GID +from sfa.util.xrn import Xrn, get_leaf, get_authority, hrn_to_urn, urn_to_hrn +from sfa.util.plxrn import hrn_to_pl_login_base +from sfa.trust.credential import Credential +from sfa.trust.certificate import Certificate, Keypair +from sfa.trust.gid import create_uuid +from sfa.util.version import version_core + +# The GENI GetVersion call +def GetVersion(api): + peers =dict ([ (peername,v._ServerProxy__host) for (peername,v) in api.registries.iteritems() + if peername != api.hrn]) + xrn=Xrn(api.hrn) + return version_core({'interface':'registry', + 'hrn':xrn.get_hrn(), + 'urn':xrn.get_urn(), + 'peers':peers}) + +def get_credential(api, xrn, type, is_self=False): + # convert xrn to hrn + if type: + hrn = urn_to_hrn(xrn)[0] + else: + hrn, type = urn_to_hrn(xrn) + + # Is this a root or sub authority + auth_hrn = api.auth.get_authority(hrn) + print>> sys.stderr , " \r\n REGISTRY get_credential auth_hrn" , auth_hrn + if not auth_hrn or hrn == api.config.SFA_INTERFACE_HRN: + auth_hrn = hrn + # get record info + auth_info = api.auth.get_auth_info(auth_hrn) + table = SfaTable() + records = table.findObjects({'type': type, 'hrn': hrn}) + print>> sys.stderr , " \r\n ++ REGISTRY get_credential hrn %s records %s " %(hrn, records) + if not records: + raise RecordNotFound(hrn) + record = records[0] + + # verify_cancreate_credential requires that the member lists + # (researchers, pis, etc) be filled in + #api.fill_record_info(record) + record['enabled'] = True + print>> sys.stderr , " \r\n ++ REGISTRY get_credential hrn %s record['enabled'] %s is_self %s" %(hrn, record['enabled'], is_self) + if record['type']=='user': + if not record['enabled']: + print>> sys.stderr , " \r\n ++ REGISTRY get_credential hrn %s ACCOUNT Not enabled" + raise AccountNotEnabled(": PlanetLab account %s is not enabled. Please contact your site PI" %(record['email'])) + + # get the callers gid + # if this is a self cred the record's gid is the caller's gid + if is_self: + caller_hrn = hrn + caller_gid = record.get_gid_object() + else: + print>> sys.stderr , " \r\n ++ ELSE " + caller_gid = api.auth.client_cred.get_gid_caller() + print>> sys.stderr , " \r\n ++ ELSE caller_gid %s record %s" %(caller_gid, record) + caller_hrn = caller_gid.get_hrn() + print>> sys.stderr , " \r\n ++ ELSE caller_hrn %s " %(caller_hrn) + + object_hrn = record.get_gid_object().get_hrn() + print>> sys.stderr , " \r\n ++ ELSE object_hrn %s " %(object_hrn) + + rights = api.auth.determine_user_rights(caller_hrn, record) + print>> sys.stderr , " \r\n ++ After rights record: %s " %(record ) + + # make sure caller has rights to this object + if rights.is_empty(): + raise PermissionError(caller_hrn + " has no rights to " + record['name']) + + object_gid = GID(string=record['gid']) + new_cred = Credential(subject = object_gid.get_subject()) + new_cred.set_gid_caller(caller_gid) + new_cred.set_gid_object(object_gid) + new_cred.set_issuer_keys(auth_info.get_privkey_filename(), auth_info.get_gid_filename()) + #new_cred.set_pubkey(object_gid.get_pubkey()) + new_cred.set_privileges(rights) + new_cred.get_privileges().delegate_all_privileges(True) + if 'expires' in record: + new_cred.set_expiration(int(record['expires'])) + auth_kind = "authority,ma,sa" + # Parent not necessary, verify with certs + #new_cred.set_parent(api.auth.hierarchy.get_auth_cred(auth_hrn, kind=auth_kind)) + new_cred.encode() + new_cred.sign() + + return new_cred.save_to_string(save_parents=True) + + +def resolve(api, xrns, type=None, full=True): + + # load all known registry names into a prefix tree and attempt to find + # the longest matching prefix + print >>sys.stderr , '\t\t REGISTRY MANAGER : resolve=========xrns ', xrns + if not isinstance(xrns, types.ListType): + if not type: + type = Xrn(xrns).get_type() + xrns = [xrns] + hrns = [urn_to_hrn(xrn)[0] for xrn in xrns] + print >>sys.stderr , '\t\t =========hrns ', hrns + # create a dict where key is a registry hrn and its value is a + # hrns at that registry (determined by the known prefix tree). + xrn_dict = {} + print >>sys.stderr, '\r\n REGISTRY MANAGER : resolve xrns ' , xrns #api.__dict__.keys() + registries = api.registries + tree = prefixTree() + registry_hrns = registries.keys() + print >>sys.stderr, '\r\n \t\t REGISTRY MANAGER registry_hrns' , registry_hrns + tree.load(registry_hrns) + for xrn in xrns: + registry_hrn = tree.best_match(urn_to_hrn(xrn)[0]) + print >>sys.stderr, '\t\tREGISTRY MANAGER *****tree.best_match ', registry_hrn + if registry_hrn not in xrn_dict: + xrn_dict[registry_hrn] = [] + xrn_dict[registry_hrn].append(xrn) + print >>sys.stderr, '\t\tREGISTRY MANAGER *****xrn_dict[registry_hrn] ',xrn_dict[registry_hrn] + records = [] + + for registry_hrn in xrn_dict: + # skip the hrn without a registry hrn + # XX should we let the user know the authority is unknown? + print >>sys.stderr, '\t\t registry_hrn in xrn_dict ', registry_hrn + if not registry_hrn: + continue + + # if the best match (longest matching hrn) is not the local registry, + # forward the request + xrns = xrn_dict[registry_hrn] + if registry_hrn != api.hrn: + credential = api.getCredential() + peer_records = registries[registry_hrn].Resolve(xrns, credential) + print >>sys.stderr , '\t\t peer_records ', peer_records + records.extend([SfaRecord(dict=record).as_dict() for record in peer_records]) + + print >>sys.stderr,'\t\t hrns ' , hrns + # try resolving the remaining unfound records at the local registry + remaining_hrns = set(hrns).difference([record['hrn'] for record in records]) + + # convert set to list + remaining_hrns = [hrn for hrn in remaining_hrns] + print >>sys.stderr, '\t\t remaining_hrns', remaining_hrns + table = SfaTable() + local_records = table.findObjects({'hrn': remaining_hrns}) + + print >>sys.stderr, '\t\t LOCAL REC !', local_records + for rec in local_records: + print >>sys.stderr, '\t\t resolve regmanager : rec ', rec + + if full: + print >>sys.stderr, '\r\n \r\n REGISTRY:_FULL', api + api.fill_record_info(local_records) + + + # convert local record objects to dicts + records.extend([dict(record) for record in local_records]) + print >>sys.stderr, "\r\n \t\t records extends %s" %(records) + if not records: + raise RecordNotFound(str(hrns)) + + if type: + records = filter(lambda rec: rec['type'] in [type], records) + + return records + +def list(api, xrn, origin_hrn=None): + hrn, type = urn_to_hrn(xrn) + # load all know registry names into a prefix tree and attempt to find + # the longest matching prefix + records = [] + registries = api.registries + registry_hrns = registries.keys() + tree = prefixTree() + tree.load(registry_hrns) + registry_hrn = tree.best_match(hrn) + + #if there was no match then this record belongs to an unknow registry + if not registry_hrn: + raise MissingAuthority(xrn) + + # if the best match (longest matching hrn) is not the local registry, + # forward the request + records = [] + if registry_hrn != api.hrn: + credential = api.getCredential() + record_list = registries[registry_hrn].List(xrn, credential) + records = [SfaRecord(dict=record).as_dict() for record in record_list] + + # if we still have not found the record yet, try the local registry + if not records: + if not api.auth.hierarchy.auth_exists(hrn): + raise MissingAuthority(hrn) + + table = SfaTable() + records = table.find({'authority': hrn}) + + return records + + +def register(api, record): + + print>>sys.stderr, " \r\n \r\n ----------- registry_manager_sl register hrn %s"%(hrn) + hrn, type = record['hrn'], record['type'] + urn = hrn_to_urn(hrn,type) + # validate the type + if type not in ['authority', 'slice', 'node', 'user']: + raise UnknownSfaType(type) + + # check if record already exists + table = SfaTable() + existing_records = table.find({'type': type, 'hrn': hrn}) + if existing_records: + raise ExistingRecord(hrn) + + record = SfaRecord(dict = record) + record['authority'] = get_authority(record['hrn']) + type = record['type'] + hrn = record['hrn'] + api.auth.verify_object_permission(hrn) + auth_info = api.auth.get_auth_info(record['authority']) + pub_key = None + # make sure record has a gid + if 'gid' not in record: + uuid = create_uuid() + pkey = Keypair(create=True) + if 'key' in record and record['key']: + if isinstance(record['key'], types.ListType): + pub_key = record['key'][0] + else: + pub_key = record['key'] + pkey = convert_public_key(pub_key) + + gid_object = api.auth.hierarchy.create_gid(urn, uuid, pkey) + gid = gid_object.save_to_string(save_parents=True) + record['gid'] = gid + record.set_gid(gid) + + if type in ["authority"]: + # update the tree + if not api.auth.hierarchy.auth_exists(hrn): + api.auth.hierarchy.create_auth(hrn_to_urn(hrn,'authority')) + + # get the GID from the newly created authority + gid = auth_info.get_gid_object() + record.set_gid(gid.save_to_string(save_parents=True)) + pl_record = api.sfa_fields_to_pl_fields(type, hrn, record) + sites = api.plshell.GetSites(api.plauth, [pl_record['login_base']]) + if not sites: + pointer = api.plshell.AddSite(api.plauth, pl_record) + else: + pointer = sites[0]['site_id'] + + record.set_pointer(pointer) + record['pointer'] = pointer + + elif (type == "slice"): + acceptable_fields=['url', 'instantiation', 'name', 'description'] + pl_record = api.sfa_fields_to_pl_fields(type, hrn, record) + for key in pl_record.keys(): + if key not in acceptable_fields: + pl_record.pop(key) + slices = api.plshell.GetSlices(api.plauth, [pl_record['name']]) + if not slices: + pointer = api.plshell.AddSlice(api.plauth, pl_record) + else: + pointer = slices[0]['slice_id'] + record.set_pointer(pointer) + record['pointer'] = pointer + + elif (type == "user"): + persons = api.plshell.GetPersons(api.plauth, [record['email']]) + if not persons: + pointer = api.plshell.AddPerson(api.plauth, dict(record)) + else: + pointer = persons[0]['person_id'] + + if 'enabled' in record and record['enabled']: + api.plshell.UpdatePerson(api.plauth, pointer, {'enabled': record['enabled']}) + # add this persons to the site only if he is being added for the first + # time by sfa and doesont already exist in plc + if not persons or not persons[0]['site_ids']: + login_base = get_leaf(record['authority']) + api.plshell.AddPersonToSite(api.plauth, pointer, login_base) + + # What roles should this user have? + api.plshell.AddRoleToPerson(api.plauth, 'user', pointer) + # Add the user's key + if pub_key: + api.plshell.AddPersonKey(api.plauth, pointer, {'key_type' : 'ssh', 'key' : pub_key}) + + elif (type == "node"): + pl_record = api.sfa_fields_to_pl_fields(type, hrn, record) + login_base = hrn_to_pl_login_base(record['authority']) + nodes = api.plshell.GetNodes(api.plauth, [pl_record['hostname']]) + if not nodes: + pointer = api.plshell.AddNode(api.plauth, login_base, pl_record) + else: + pointer = nodes[0]['node_id'] + + record['pointer'] = pointer + record.set_pointer(pointer) + record_id = table.insert(record) + record['record_id'] = record_id + + # update membership for researchers, pis, owners, operators + api.update_membership(None, record) + + return record.get_gid_object().save_to_string(save_parents=True) + +def update(api, record_dict): + new_record = SfaRecord(dict = record_dict) + type = new_record['type'] + hrn = new_record['hrn'] + urn = hrn_to_urn(hrn,type) + api.auth.verify_object_permission(hrn) + table = SfaTable() + # make sure the record exists + records = table.findObjects({'type': type, 'hrn': hrn}) + if not records: + raise RecordNotFound(hrn) + record = records[0] + record['last_updated'] = time.gmtime() + + # Update_membership needs the membership lists in the existing record + # filled in, so it can see if members were added or removed + api.fill_record_info(record) + + # Use the pointer from the existing record, not the one that the user + # gave us. This prevents the user from inserting a forged pointer + pointer = record['pointer'] + # update the PLC information that was specified with the record + + if (type == "authority"): + api.plshell.UpdateSite(api.plauth, pointer, new_record) + + elif type == "slice": + pl_record=api.sfa_fields_to_pl_fields(type, hrn, new_record) + if 'name' in pl_record: + pl_record.pop('name') + api.plshell.UpdateSlice(api.plauth, pointer, pl_record) + + elif type == "user": + # SMBAKER: UpdatePerson only allows a limited set of fields to be + # updated. Ideally we should have a more generic way of doing + # this. I copied the field names from UpdatePerson.py... + update_fields = {} + all_fields = new_record + for key in all_fields.keys(): + if key in ['first_name', 'last_name', 'title', 'email', + 'password', 'phone', 'url', 'bio', 'accepted_aup', + 'enabled']: + update_fields[key] = all_fields[key] + api.plshell.UpdatePerson(api.plauth, pointer, update_fields) + + if 'key' in new_record and new_record['key']: + # must check this key against the previous one if it exists + persons = api.plshell.GetPersons(api.plauth, [pointer], ['key_ids']) + person = persons[0] + keys = person['key_ids'] + keys = api.plshell.GetKeys(api.plauth, person['key_ids']) + key_exists = False + if isinstance(new_record['key'], types.ListType): + new_key = new_record['key'][0] + else: + new_key = new_record['key'] + + # Delete all stale keys + for key in keys: + if new_record['key'] != key['key']: + api.plshell.DeleteKey(api.plauth, key['key_id']) + else: + key_exists = True + if not key_exists: + api.plshell.AddPersonKey(api.plauth, pointer, {'key_type': 'ssh', 'key': new_key}) + + # update the openssl key and gid + pkey = convert_public_key(new_key) + uuid = create_uuid() + gid_object = api.auth.hierarchy.create_gid(urn, uuid, pkey) + gid = gid_object.save_to_string(save_parents=True) + record['gid'] = gid + record = SfaRecord(dict=record) + table.update(record) + + elif type == "node": + api.plshell.UpdateNode(api.plauth, pointer, new_record) + + else: + raise UnknownSfaType(type) + + # update membership for researchers, pis, owners, operators + api.update_membership(record, new_record) + + return 1 + +# expecting an Xrn instance +def remove(api, xrn, origin_hrn=None): + + table = SfaTable() + filter = {'hrn': xrn.get_hrn()} + hrn=xrn.get_hrn() + type=xrn.get_type() + if type and type not in ['all', '*']: + filter['type'] = type + + records = table.find(filter) + if not records: raise RecordNotFound(hrn) + record = records[0] + type = record['type'] + + credential = api.getCredential() + registries = api.registries + + # Try to remove the object from the PLCDB of federated agg. + # This is attempted before removing the object from the local agg's PLCDB and sfa table + if hrn.startswith(api.hrn) and type in ['user', 'slice', 'authority']: + for registry in registries: + if registry not in [api.hrn]: + try: + result=registries[registry].remove_peer_object(credential, record, origin_hrn) + except: + pass + if type == "user": + persons = api.plshell.GetPersons(api.plauth, record['pointer']) + # only delete this person if he has site ids. if he doesnt, it probably means + # he was just removed from a site, not actually deleted + if persons and persons[0]['site_ids']: + api.plshell.DeletePerson(api.plauth, record['pointer']) + elif type == "slice": + if api.plshell.GetSlices(api.plauth, record['pointer']): + api.plshell.DeleteSlice(api.plauth, record['pointer']) + elif type == "node": + if api.plshell.GetNodes(api.plauth, record['pointer']): + api.plshell.DeleteNode(api.plauth, record['pointer']) + elif type == "authority": + if api.plshell.GetSites(api.plauth, record['pointer']): + api.plshell.DeleteSite(api.plauth, record['pointer']) + else: + raise UnknownSfaType(type) + + table.remove(record) + + return 1 + +def remove_peer_object(api, record, origin_hrn=None): + pass + +def register_peer_object(api, record, origin_hrn=None): + pass diff --git a/sfa/managers/slice_manager_slab.py b/sfa/managers/slice_manager_slab.py new file mode 100644 index 00000000..803c2ceb --- /dev/null +++ b/sfa/managers/slice_manager_slab.py @@ -0,0 +1,479 @@ +# +import sys +import time,datetime +from StringIO import StringIO +from types import StringTypes +from copy import deepcopy +from copy import copy +from lxml import etree + +from sfa.util.sfalogging import sfa_logger +from sfa.util.rspecHelper import merge_rspecs +from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn +from sfa.util.plxrn import hrn_to_pl_slicename +from sfa.util.rspec import * +from sfa.util.specdict import * +from sfa.util.faults import * +from sfa.util.record import SfaRecord +from sfa.rspecs.pg_rspec import PGRSpec +from sfa.rspecs.sfa_rspec import SfaRSpec +from sfa.rspecs.rspec_converter import RSpecConverter +from sfa.rspecs.rspec_parser import parse_rspec +from sfa.rspecs.rspec_version import RSpecVersion +from sfa.rspecs.sfa_rspec import sfa_rspec_version +from sfa.rspecs.pg_rspec import pg_rspec_version +from sfa.util.policy import Policy +from sfa.util.prefixTree import prefixTree +from sfa.util.sfaticket import * +from sfa.trust.credential import Credential +from sfa.util.threadmanager import ThreadManager +import sfa.util.xmlrpcprotocol as xmlrpcprotocol +import sfa.plc.peers as peers +from sfa.util.version import version_core +from sfa.util.callids import Callids + +# we have specialized xmlrpclib.ServerProxy to remember the input url +# OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances +def get_serverproxy_url (server): + try: + return server.url + except: + sfa_logger().warning("ParseVersion, falling back to xmlrpclib.ServerProxy internals") + return server._ServerProxy__host + server._ServerProxy__handler + +def ParseVersion(api): + # peers explicitly in aggregates.xml + peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + if peername != api.hrn]) + xrn=Xrn (api.hrn) + supported_rspecs = [dict(pg_rspec_version), dict(sfa_rspec_version)] + version_more = {'interface':'slicemgr', + 'hrn' : xrn.get_hrn(), + 'urn' : xrn.get_urn(), + 'peers': peers, + 'request_rspec_versions': supported_rspecs, + 'ad_rspec_versions': supported_rspecs, + 'default_ad_rspec': dict(sfa_rspec_version) + } + sm_version=version_core(version_more) + # local aggregate if present needs to have localhost resolved + if api.hrn in api.aggregates: + local_am_url=get_serverproxy_url(api.aggregates[api.hrn]) + sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) + return sm_version + +def CreateSliver(api, xrn, creds, rspec_str, users, call_id): + + def _CreateSliver(aggregate, xrn, credential, rspec, users, call_id): + # Need to call ParseVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + # The Aggregate's verion info is cached + server = api.aggregates[aggregate] + # get cached aggregate version + aggregate_version_key = 'version_'+ aggregate + aggregate_version = api.cache.get(aggregate_version_key) + if not aggregate_version: + # get current aggregate version anc cache it for 24 hours + aggregate_version = server.ParseVersion() + api.cache.add(aggregate_version_key, aggregate_version, 60 * 60 * 24) + + if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version: + # sfa aggregtes support both sfa and pg rspecs, no need to convert + # if aggregate supports sfa rspecs. othewise convert to pg rspec + rspec = RSpecConverter.to_pg_rspec(rspec) + + return server.CreateSliver(xrn, credential, rspec, users, call_id) + + + if Callids().already_handled(call_id): return "" + + # Validate the RSpec against PlanetLab's schema --disabled for now + # The schema used here needs to aggregate the PL and VINI schemas + # schema = "/var/www/html/schemas/pl.rng" + rspec = parse_rspec(rspec_str) + schema = None + if schema: + rspec.validate(schema) + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + + # get the callers hrn + hrn, type = urn_to_hrn(xrn) + valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + + # Just send entire RSpec to each aggregate + threads.run(_CreateSliver, aggregate, xrn, credential, rspec.toxml(), users, call_id) + + results = threads.get_results() + rspec = SfaRSpec() + for result in results: + rspec.merge(result) + return rspec.toxml() + +def RenewSliver(api, xrn, creds, expiration_time, call_id): + if Callids().already_handled(call_id): return True + + (hrn, type) = urn_to_hrn(xrn) + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + + server = api.aggregates[aggregate] + threads.run(server.RenewSliver, xrn, [credential], expiration_time, call_id) + # 'and' the results + return reduce (lambda x,y: x and y, threads.get_results() , True) + +def get_ticket(api, xrn, creds, rspec, users): + slice_hrn, type = urn_to_hrn(xrn) + # get the netspecs contained within the clients rspec + aggregate_rspecs = {} + tree= etree.parse(StringIO(rspec)) + elements = tree.findall('./network') + for element in elements: + aggregate_hrn = element.values()[0] + aggregate_rspecs[aggregate_hrn] = rspec + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems(): + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + server = None + if aggregate in api.aggregates: + server = api.aggregates[aggregate] + else: + net_urn = hrn_to_urn(aggregate, 'authority') + # we may have a peer that knows about this aggregate + for agg in api.aggregates: + target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn) + if not target_aggs or not 'hrn' in target_aggs[0]: + continue + # send the request to this address + url = target_aggs[0]['url'] + server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file) + # aggregate found, no need to keep looping + break + if server is None: + continue + threads.run(server.ParseTicket, xrn, credential, aggregate_rspec, users) + + results = threads.get_results() + + # gather information from each ticket + rspecs = [] + initscripts = [] + slivers = [] + object_gid = None + for result in results: + agg_ticket = SfaTicket(string=result) + attrs = agg_ticket.get_attributes() + if not object_gid: + object_gid = agg_ticket.get_gid_object() + rspecs.append(agg_ticket.get_rspec()) + initscripts.extend(attrs.get('initscripts', [])) + slivers.extend(attrs.get('slivers', [])) + + # merge info + attributes = {'initscripts': initscripts, + 'slivers': slivers} + merged_rspec = merge_rspecs(rspecs) + + # create a new ticket + ticket = SfaTicket(subject = slice_hrn) + ticket.set_gid_caller(api.auth.client_gid) + ticket.set_issuer(key=api.key, subject=api.hrn) + ticket.set_gid_object(object_gid) + ticket.set_pubkey(object_gid.get_pubkey()) + #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) + ticket.set_attributes(attributes) + ticket.set_rspec(merged_rspec) + ticket.encode() + ticket.sign() + return ticket.save_to_string(save_parents=True) + + +def DeleteSliver(api, xrn, creds, call_id): + if Callids().already_handled(call_id): return "" + (hrn, type) = urn_to_hrn(xrn) + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + server = api.aggregates[aggregate] + threads.run(server.DeleteSliver, xrn, credential, call_id) + threads.get_results() + return 1 + +def start_slice(api, xrn, creds): + hrn, type = urn_to_hrn(xrn) + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + server = api.aggregates[aggregate] + threads.run(server.Start, xrn, credential) + threads.get_results() + return 1 + +def stop_slice(api, xrn, creds): + hrn, type = urn_to_hrn(xrn) + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + server = api.aggregates[aggregate] + threads.run(server.Stop, xrn, credential) + threads.get_results() + return 1 + +def reset_slice(api, xrn): + """ + Not implemented + """ + return 1 + +def shutdown(api, xrn, creds): + """ + Not implemented + """ + return 1 + +def status(api, xrn, creds): + """ + Not implemented + """ + return 1 + +# Thierry : caching at the slicemgr level makes sense to some extent +#caching=True +caching=False +def ListSlices(api, creds, call_id): + + if Callids().already_handled(call_id): return [] + + # look in cache first + if caching and api.cache: + slices = api.cache.get('slices') + if slices: + return slices + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + # fetch from aggregates + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + server = api.aggregates[aggregate] + threads.run(server.ListSlices, credential, call_id) + + # combime results + results = threads.get_results() + slices = [] + for result in results: + slices.extend(result) + + # cache the result + if caching and api.cache: + api.cache.add('slices', slices) + + return slices + + +def ListResources(api, creds, options, call_id): + + if Callids().already_handled(call_id): return "" + + # get slice's hrn from options + xrn = options.get('geni_slice_urn', '') + (hrn, type) = urn_to_hrn(xrn) + print >>sys.stderr, " SM_ListResources xrn " , xrn + #print >>sys.stderr, " SM ListResources api.__dict__ " , api.__dict__.keys() + #print >>sys.stderr, " SM ListResources dir(api)" , dir(api) + print >>sys.stderr, " \r\n avant RspecVersion \r\n \r\n" + # get the rspec's return format from options + rspec_version = RSpecVersion(options.get('rspec_version')) + print >>sys.stderr, " \r\n \r\n ListResources RSpecVersion ", rspec_version + version_string = "rspec_%s" % (rspec_version.get_version_name()) + + #panos adding the info option to the caching key (can be improved) + if options.get('info'): + version_string = version_string + "_"+options.get('info') + + print>>sys.stderr,"version string = ",version_string + + # look in cache first + if caching and api.cache and not xrn: + print>>sys.stderr," \r\n caching %s and api.cache %s and not xrn %s"%(caching , api.cache,xrn) + rspec = api.cache.get(version_string) + if rspec: + return rspec + + # get the callers hrn + print >>sys.stderr, " SM ListResources get the callers hrn " + valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + print >>sys.stderr, " \r\n SM ListResources get the callers caller_hrn hrn %s "%(caller_hrn) + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + print >>sys.stderr, " \r\n SM ListResources get the callers credential %s "%(credential) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + print >>sys.stderr, " \r\n SM ListResources get the callers api.aggregates %s "%(api.aggregates) + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + # get the rspec from the aggregate + server = api.aggregates[aggregate] + print >>sys.stderr, " Slice Mgr ListResources, server" ,server + my_opts = copy(options) + my_opts['geni_compressed'] = False + threads.run(server.ListResources, credential, my_opts, call_id) + print >>sys.stderr, "\r\n !!!!!!!!!!!!!!!! \r\n" + results = threads.get_results() + #results.append(open('/root/protogeni.rspec', 'r').read()) + rspec_version = RSpecVersion(my_opts.get('rspec_version')) + if rspec_version['type'].lower() == 'protogeni': + rspec = PGRSpec() + else: + rspec = SfaRSpec() + + for result in results: + print >>sys.stderr, "\r\n slice_manager result" , result + try: + print >>sys.stderr, "avant merge" , rspec + rspec.merge(result) + print >>sys.stderr, "AFTERMERGE" , rspec + except: + raise + api.logger.info("SM.ListResources: Failed to merge aggregate rspec") + + # cache the result + if caching and api.cache and not xrn: + api.cache.add(version_string, rspec.toxml()) + + print >>sys.stderr, "\r\n slice_manager \r\n" , rspec + return rspec.toxml() + +# first draft at a merging SliverStatus +def SliverStatus(api, slice_xrn, creds, call_id): + if Callids().already_handled(call_id): return {} + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + server = api.aggregates[aggregate] + threads.run (server.SliverStatus, slice_xrn, credential, call_id) + results = threads.get_results() + + # get rid of any void result - e.g. when call_id was hit where by convention we return {} + results = [ result for result in results if result and result['geni_resources']] + + # do not try to combine if there's no result + if not results : return {} + + # otherwise let's merge stuff + overall = {} + + # mmh, it is expected that all results carry the same urn + overall['geni_urn'] = results[0]['geni_urn'] + + # consolidate geni_status - simple model using max on a total order + states = [ 'ready', 'configuring', 'failed', 'unknown' ] + # hash name to index + shash = dict ( zip ( states, range(len(states)) ) ) + def combine_status (x,y): + return shash [ max (shash(x),shash(y)) ] + overall['geni_status'] = reduce (combine_status, [ result['geni_status'] for result in results], 'ready' ) + + # {'ready':0,'configuring':1,'failed':2,'unknown':3} + # append all geni_resources + overall['geni_resources'] = \ + reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) + + return overall + +def main(): + r = RSpec() + r.parseFile(sys.argv[1]) + rspec = r.toDict() + CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice') + +if __name__ == "__main__": + main() + -- 2.43.0