From: Sandrine Avakian Date: Fri, 21 Oct 2011 13:17:01 +0000 (+0200) Subject: Fixed problem from merging from mainstream in slice_manager_slab and api. X-Git-Tag: sfa-2.1-24~3^2~321 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=006eb0770abc4e6839841e452eff754c737b99c1;p=sfa.git Fixed problem from merging from mainstream in slice_manager_slab and api. --- diff --git a/sfa/managers/slice_manager_slab.py b/sfa/managers/slice_manager_slab.py index 25c4b0bc..81a99e22 100644 --- a/sfa/managers/slice_manager_slab.py +++ b/sfa/managers/slice_manager_slab.py @@ -7,7 +7,8 @@ from copy import deepcopy from copy import copy from lxml import etree -from sfa.util.sfalogging import sfa_logger +from sfa.util.sfalogging import logger +#from sfa.util.sfalogging import sfa_logger from sfa.util.rspecHelper import merge_rspecs from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn from sfa.util.plxrn import hrn_to_pl_slicename @@ -15,13 +16,17 @@ from sfa.util.rspec import * from sfa.util.specdict import * from sfa.util.faults import * from sfa.util.record import SfaRecord -from sfa.rspecs.pg_rspec import PGRSpec -from sfa.rspecs.sfa_rspec import SfaRSpec +#from sfa.rspecs.pg_rspec import PGRSpec +#from sfa.rspecs.sfa_rspec import SfaRSpec from sfa.rspecs.rspec_converter import RSpecConverter -from sfa.rspecs.rspec_parser import parse_rspec -from sfa.rspecs.rspec_version import RSpecVersion -from sfa.rspecs.sfa_rspec import sfa_rspec_version -from sfa.rspecs.pg_rspec import pg_rspec_ad_version, pg_rspec_request_version +#from sfa.rspecs.rspec_parser import parse_rspec +#from sfa.rspecs.rspec_version import RSpecVersion +#from sfa.rspecs.sfa_rspec import sfa_rspec_version +#from sfa.rspecs.pg_rspec import pg_rspec_ad_version, pg_rspec_request_version +from sfa.client.client_helper import sfa_to_pg_users_arg +from sfa.rspecs.version_manager import VersionManager + +from sfa.rspecs.rspec import RSpec from sfa.util.policy import Policy from sfa.util.prefixTree import prefixTree from sfa.util.sfaticket import * @@ -36,25 +41,32 @@ from sfa.util.callids import Callids # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances def get_serverproxy_url (server): try: - return server.url + return server.get_url() except: - sfa_logger().warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") + logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") return server._ServerProxy__host + server._ServerProxy__handler def GetVersion(api): # peers explicitly in aggregates.xml - peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() if peername != api.hrn]) - xrn=Xrn (api.hrn) - request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)] - ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)] + version_manager = VersionManager() + ad_rspec_versions = [] + request_rspec_versions = [] + for rspec_version in version_manager.versions: + if rspec_version.content_type in ['*', 'ad']: + ad_rspec_versions.append(rspec_version.to_dict()) + if rspec_version.content_type in ['*', 'request']: + request_rspec_versions.append(rspec_version.to_dict()) + default_rspec_version = version_manager.get_version("sfa 1").to_dict() + xrn=Xrn(api.hrn, 'authority+sa') version_more = {'interface':'slicemgr', 'hrn' : xrn.get_hrn(), 'urn' : xrn.get_urn(), 'peers': peers, 'request_rspec_versions': request_rspec_versions, 'ad_rspec_versions': ad_rspec_versions, - 'default_ad_rspec': dict(sfa_rspec_version) + 'default_ad_rspec': default_rspec_version } sm_version=version_core(version_more) # local aggregate if present needs to have localhost resolved @@ -62,69 +74,123 @@ def GetVersion(api): local_am_url=get_serverproxy_url(api.aggregates[api.hrn]) sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version - -def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - def _CreateSliver(aggregate, xrn, credential, rspec, users, call_id): - # Need to call ParseVersion at an aggregate to determine the supported - # rspec type/format beofre calling CreateSliver at an Aggregate. - # The Aggregate's verion info is cached - print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER _CreateSliver " - - server = api.aggregates[aggregate] - # get cached aggregate version - aggregate_version_key = 'version_'+ aggregate - aggregate_version = api.cache.get(aggregate_version_key) - print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER _CreateSliver aggregate_version WTF ? %s"%(aggregate_version ) - if aggregate_version is None: - # get current aggregate version anc cache it for 24 hours - print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER It s browwwwwn server" - aggregate_version = server.GetVersion() - print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER _CreateSliver GET aggregate_version %s"%(aggregate_version ) - api.cache.add(aggregate_version_key, aggregate_version, 60 * 60 * 24) - - if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version: - # sfa aggregtes support both sfa and pg rspecs, no need to convert - # if aggregate supports sfa rspecs. othewise convert to pg rspec - rspec = RSpecConverter.to_pg_rspec(rspec) - - return server.CreateSliver(xrn, credential, rspec, users, call_id) - - if Callids().already_handled(call_id): return "" - - # Validate the RSpec against PlanetLab's schema --disabled for now - # The schema used here needs to aggregate the PL and VINI schemas - # schema = "/var/www/html/schemas/pl.rng" - rspec = parse_rspec(rspec_str) - schema = None - if schema: - rspec.validate(schema) - - # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() - - # get the callers hrn - hrn, type = urn_to_hrn(xrn) - valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - - # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, aggregate, xrn, credential, rspec.toxml(), users, call_id) - - results = threads.get_results() - rspec = SfaRSpec() - for result in results: - rspec.merge(result) - return rspec.toxml() +#def GetVersion(api): + ## peers explicitly in aggregates.xml + #peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + #if peername != api.hrn]) + #xrn=Xrn (api.hrn) + #request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)] + #ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)] + #version_more = {'interface':'slicemgr', + #'hrn' : xrn.get_hrn(), + #'urn' : xrn.get_urn(), + #'peers': peers, + #'request_rspec_versions': request_rspec_versions, + #'ad_rspec_versions': ad_rspec_versions, + #'default_ad_rspec': dict(sfa_rspec_version) + #} + #sm_version=version_core(version_more) + ## local aggregate if present needs to have localhost resolved + #if api.hrn in api.aggregates: + #local_am_url=get_serverproxy_url(api.aggregates[api.hrn]) + #sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) + #return sm_version + + +def drop_slicemgr_stats(api,rspec): + try: + stats_elements = rspec.xml.xpath('//statistics') + for node in stats_elements: + node.getparent().remove(node) + except Exception, e: + api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) + + + + +def CreateSliver(api, xrn, creds, rspec_str, users, call_id): + version_manager = VersionManager() + def _CreateSliver(aggregate, xrn, credential, rspec, users, call_id): + # Need to call ParseVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + # The Aggregate's verion info is cached + + tStart = time.time() + try: + # Need to call GetVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + server_version = api.get_cached_server_version(server) + requested_users = users + if 'sfa' not in server_version and 'geni_api' in server_version: + # sfa aggregtes support both sfa and pg rspecs, no need to convert + # if aggregate supports sfa rspecs. otherwise convert to pg rspec + rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request')) + filter = {'component_manager_id': server_version['urn']} + rspec.filter(filter) + rspec = rspec.toxml() + requested_users = sfa_to_pg_users_arg(users) + args = [xrn, credential, rspec, requested_users] + if _call_id_supported(api, server): + args.append(call_id) + rspec = server.CreateSliver(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + except: + logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} + + + if Callids().already_handled(call_id): return "" + + # Validate the RSpec against PlanetLab's schema --disabled for now + # The schema used here needs to aggregate the PL and VINI schemas + # schema = "/var/www/html/schemas/pl.rng" + rspec = RSpec(rspec_str) + schema = None + if schema: + rspec.validate(schema) + + print>>sys.stderr, " \r\n \r\n \t\t =======SLICE MANAGER _CreateSliver api %s" %(api) + # if there is a section, the aggregates don't care about it, + # so delete it. + drop_slicemgr_stats(api,rspec) + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + + # get the callers hrn + hrn, type = urn_to_hrn(xrn) + valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + threads = ThreadManager() + + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.get_server(interface, credential) + # Just send entire RSpec to each aggregate + threads.run(_CreateSliver, aggregate, xrn, [credential], rspec.toxml(), users, call_id) + + results = threads.get_results() + manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') + result_rspec = RSpec(version=manifest_version) + #rspec = SfaRSpec() + for result in results: + add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + result_rspec.version.merge(result["rspec"]) + except: + api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") + return result_rspec.toxml() + #rspec.merge(result) + #return rspec.toxml() def RenewSliver(api, xrn, creds, expiration_time, call_id): if Callids().already_handled(call_id): return True @@ -355,85 +421,180 @@ def ListSlices(api, creds, call_id): return slices +def add_slicemgr_stat(rspec, callname, aggname, elapsed, status): + try: + stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) + if stats_tags: + stats_tag = stats_tags[0] + else: + stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname) + + etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) + except Exception, e: + api.logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) + + + def ListResources(api, creds, options, call_id): + version_manager = VersionManager() + def _ListResources(aggregate, server, credential, opts, call_id): + + my_opts = copy(opts) + args = [credential, my_opts] + tStart = time.time() + try: + if _call_id_supported(api, server): + args.append(call_id) + version = api.get_cached_server_version(server) + # force ProtoGENI aggregates to give us a v2 RSpec + if 'sfa' not in version.keys(): + my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict() + rspec = server.ListResources(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + except Exception, e: + api.logger.log_exc("ListResources failed at %s" %(server.url)) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" # get slice's hrn from options xrn = options.get('geni_slice_urn', '') (hrn, type) = urn_to_hrn(xrn) - print >>sys.stderr, " SM_ListResources xrn " , xrn - #print >>sys.stderr, " SM ListResources api.__dict__ " , api.__dict__.keys() - #print >>sys.stderr, " SM ListResources dir(api)" , dir(api) - print >>sys.stderr, " \r\n avant RspecVersion \r\n \r\n" - # get the rspec's return format from options - rspec_version = RSpecVersion(options.get('rspec_version')) - print >>sys.stderr, " \r\n \r\n ListResources RSpecVersion ", rspec_version - version_string = "rspec_%s" % (rspec_version.get_version_name()) + if 'geni_compressed' in options: + del(options['geni_compressed']) - #panos adding the info option to the caching key (can be improved) - if options.get('info'): - version_string = version_string + "_"+options.get('info') - - print>>sys.stderr,"version string = ",version_string + # get the rspec's return format from options + rspec_version = version_manager.get_version(options.get('rspec_version')) + version_string = "rspec_%s" % (rspec_version.to_string()) # look in cache first if caching and api.cache and not xrn: - print>>sys.stderr," \r\n caching %s and api.cache %s and not xrn %s"%(caching , api.cache,xrn) rspec = api.cache.get(version_string) if rspec: return rspec # get the callers hrn - print >>sys.stderr, " SM ListResources get the callers hrn " valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - print >>sys.stderr, " \r\n SM ListResources get the callers caller_hrn hrn %s "%(caller_hrn) + # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - print >>sys.stderr, " \r\n SM ListResources get the callers credential %s "%(credential) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() - print >>sys.stderr, " \r\n SM ListResources get the callers api.aggregates %s "%(api.aggregates) for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue + # get the rspec from the aggregate - server = api.aggregates[aggregate] - print >>sys.stderr, " Slice Mgr ListResources, server" ,server - my_opts = copy(options) - my_opts['geni_compressed'] = False - threads.run(server.ListResources, credential, my_opts, call_id) - print >>sys.stderr, "\r\n !!!!!!!!!!!!!!!! \r\n" - results = threads.get_results() - #results.append(open('/root/protogeni.rspec', 'r').read()) - rspec_version = RSpecVersion(my_opts.get('rspec_version')) - if rspec_version['type'].lower() == 'protogeni': - rspec = PGRSpec() - else: - rspec = SfaRSpec() + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(_ListResources, aggregate, server, [cred], options, call_id) + + results = threads.get_results() + rspec_version = version_manager.get_version(options.get('rspec_version')) + if xrn: + result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest') + else: + result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad') + rspec = RSpec(version=result_version) for result in results: - print >>sys.stderr, "\r\n slice_manager result" , result - try: - print >>sys.stderr, "avant merge" , rspec - rspec.merge(result) - print >>sys.stderr, "AFTERMERGE" , rspec - except: - raise - api.logger.info("SM.ListResources: Failed to merge aggregate rspec") + add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.version.merge(result["rspec"]) + except: + api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") # cache the result if caching and api.cache and not xrn: api.cache.add(version_string, rspec.toxml()) - + print >>sys.stderr, "\r\n slice_manager \r\n" , rspec return rspec.toxml() +#def ListResources(api, creds, options, call_id): + + #if Callids().already_handled(call_id): return "" + + ## get slice's hrn from options + #xrn = options.get('geni_slice_urn', '') + #(hrn, type) = urn_to_hrn(xrn) + #print >>sys.stderr, " SM_ListResources xrn " , xrn + ##print >>sys.stderr, " SM ListResources api.__dict__ " , api.__dict__.keys() + ##print >>sys.stderr, " SM ListResources dir(api)" , dir(api) + #print >>sys.stderr, " \r\n avant RspecVersion \r\n \r\n" + ## get the rspec's return format from options + #rspec_version = RSpecVersion(options.get('rspec_version')) + #print >>sys.stderr, " \r\n \r\n ListResources RSpecVersion ", rspec_version + #version_string = "rspec_%s" % (rspec_version.get_version_name()) + + ##panos adding the info option to the caching key (can be improved) + #if options.get('info'): + #version_string = version_string + "_"+options.get('info') + + #print>>sys.stderr,"version string = ",version_string + + ## look in cache first + #if caching and api.cache and not xrn: + #print>>sys.stderr," \r\n caching %s and api.cache %s and not xrn %s"%(caching , api.cache,xrn) + #rspec = api.cache.get(version_string) + #if rspec: + #return rspec + + ## get the callers hrn + #print >>sys.stderr, " SM ListResources get the callers hrn " + #valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] + #caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + #print >>sys.stderr, " \r\n SM ListResources get the callers caller_hrn hrn %s "%(caller_hrn) + ## attempt to use delegated credential first + #credential = api.getDelegatedCredential(creds) + #print >>sys.stderr, " \r\n SM ListResources get the callers credential %s "%(credential) + #if not credential: + #credential = api.getCredential() + #threads = ThreadManager() + #print >>sys.stderr, " \r\n SM ListResources get the callers api.aggregates %s "%(api.aggregates) + #for aggregate in api.aggregates: + ## prevent infinite loop. Dont send request back to caller + ## unless the caller is the aggregate's SM + #if caller_hrn == aggregate and aggregate != api.hrn: + #continue + ## get the rspec from the aggregate + #server = api.aggregates[aggregate] + #print >>sys.stderr, " Slice Mgr ListResources, server" ,server + #my_opts = copy(options) + #my_opts['geni_compressed'] = False + #threads.run(server.ListResources, credential, my_opts, call_id) + #print >>sys.stderr, "\r\n !!!!!!!!!!!!!!!! \r\n" + #results = threads.get_results() + ##results.append(open('/root/protogeni.rspec', 'r').read()) + #rspec_version = RSpecVersion(my_opts.get('rspec_version')) + #if rspec_version['type'].lower() == 'protogeni': + #rspec = PGRSpec() + #else: + #rspec = SfaRSpec() + + #for result in results: + #print >>sys.stderr, "\r\n slice_manager result" , result + #try: + #print >>sys.stderr, "avant merge" , rspec + #rspec.merge(result) + #print >>sys.stderr, "AFTERMERGE" , rspec + #except: + #raise + #api.logger.info("SM.ListResources: Failed to merge aggregate rspec") + + ## cache the result + #if caching and api.cache and not xrn: + #api.cache.add(version_string, rspec.toxml()) + + #print >>sys.stderr, "\r\n slice_manager \r\n" , rspec + #return rspec.toxml() + # first draft at a merging SliverStatus def SliverStatus(api, slice_xrn, creds, call_id): if Callids().already_handled(call_id): return {} diff --git a/sfa/senslab/SenslabImportUsers.py b/sfa/senslab/SenslabImportUsers.py index 16d2055c..9296c2e8 100644 --- a/sfa/senslab/SenslabImportUsers.py +++ b/sfa/senslab/SenslabImportUsers.py @@ -50,11 +50,11 @@ class SenslabImportUsers: def InitPersons(self): persons_per_site = {} person_id = 7 - persons_per_site[person_id] = {'person_id': person_id,'site_ids': [3],'email': 'a_rioot@senslab.fr', 'key_ids':[1], 'roles': ['pi'], 'role_ids':[20]} + persons_per_site[person_id] = {'person_id': person_id,'site_ids': [3],'email': 'a_rioot@senslab.fr', 'key_ids':[1], 'roles': ['pi'], 'role_ids':[20],'first_name':'A','last_name':'rioot'} person_id = 8 - persons_per_site[person_id] = {'person_id': person_id,'site_ids': [3],'email': 'lost@senslab.fr','key_ids':[1],'roles': ['pi'], 'role_ids':[20]} + persons_per_site[person_id] = {'person_id': person_id,'site_ids': [3],'email': 'lost@senslab.fr','key_ids':[1],'roles': ['pi'], 'role_ids':[20],'first_name':'L','last_name':'lost'} person_id = 9 - persons_per_site[person_id] = {'person_id': person_id,'site_ids': [3],'email': 'user@senslab.fr','key_ids':[1],'roles': ['user'], 'role_ids':[1]} + persons_per_site[person_id] = {'person_id': person_id,'site_ids': [3],'email': 'user@senslab.fr','key_ids':[1],'roles': ['user'], 'role_ids':[1],'first_name':'U','last_name':'senslab'} for person_id in persons_per_site.keys(): person = persons_per_site[person_id] if person['person_id'] not in self.person_list: diff --git a/sfa/senslab/api.py b/sfa/senslab/api.py index ca191050..caa9a884 100644 --- a/sfa/senslab/api.py +++ b/sfa/senslab/api.py @@ -160,16 +160,36 @@ class SfaAPI(BaseAPI): return cred.save_to_string(save_parents=True) + def get_server(self, interface, cred, timeout=30): + """ + Returns a connection to the specified interface. Use the specified + credential to determine the caller and look for the caller's key/cert + in the registry hierarchy cache. + """ + from sfa.trust.hierarchy import Hierarchy + if not isinstance(cred, Credential): + cred_obj = Credential(string=cred) + else: + cred_obj = cred + caller_gid = cred_obj.get_gid_caller() + hierarchy = Hierarchy() + auth_info = hierarchy.get_auth_info(caller_gid.get_hrn()) + key_file = auth_info.get_privkey_filename() + cert_file = auth_info.get_gid_filename() + server = interface.get_server(key_file, cert_file, timeout) + return server + + def getDelegatedCredential(self, creds): """ Attempt to find a credential delegated to us in the specified list of creds. """ - if creds and not isinstance(creds, list): - creds = [creds] + if creds and not isinstance(creds, list): + creds = [creds] delegated_creds = filter_creds_by_caller(creds, [self.hrn, self.hrn + '.slicemanager']) if not delegated_creds: - return None + return None return delegated_creds[0] def __getCredential(self):