From: Tony Mack Date: Mon, 20 Jun 2011 17:04:03 +0000 (-0400) Subject: only send call_id to interfaces that support it X-Git-Tag: sfa-1.0-28~24 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=9a5ebdefbc025f9bd05e1c8c0ffe798dc1410a6c;p=sfa.git only send call_id to interfaces that support it --- diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index 1077dcf8..1d801e38 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -32,6 +32,30 @@ import sfa.plc.peers as peers from sfa.util.version import version_core from sfa.util.callids import Callids + +def _call_id_supported(api, server): + """ + Returns true if server support the optional call_id arg, false otherwise. + """ + cache_key = server.url + "-version" + server_version = api.cache.get(cache_key) + if not server_version: + server_version = server.GetVersion() + # cache version for 24 hours + cache.add(cache_key, server_version, ttl= 60*60*24) + + if 'sfa' in server_version: + code_tag = server_version['code_tag'] + code_tag_parts = code_tag.split("-") + + version_parts = code_tag_parts[0].split(".") + major, minor = version_parts[0], version_parts[1] + rev = code_tag_parts[1] + if int(major) > 1: + if int(minor) > 0 or int(rev) > 20: + return True + return False + # we have specialized xmlrpclib.ServerProxy to remember the input url # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances def get_serverproxy_url (server): @@ -63,31 +87,86 @@ def GetVersion(api): sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version + +def ListResources(api, creds, options, call_id): + def _ListResources(server, credential, my_opts, call_id): + args = [credential, my_opts] + if _call_id_supported(api, server): + args.append(call_id) + return server.ListResources(*args) + + if Callids().already_handled(call_id): return "" + + # get slice's hrn from options + xrn = options.get('geni_slice_urn', '') + (hrn, type) = urn_to_hrn(xrn) + my_opts = copy(options) + my_opts['geni_compressed'] = False + + # get the rspec's return format from options + rspec_version = RSpecVersion(options.get('rspec_version')) + version_string = "rspec_%s" % (rspec_version.get_version_name()) + + # look in cache first + if caching and api.cache and not xrn: + rspec = api.cache.get(version_string) + if rspec: + return rspec + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + # get the rspec from the aggregate + server = api.aggregates[aggregate] + #threads.run(server.ListResources, credential, my_opts, call_id) + threads.run(_ListResources, server, credential, my_opts, call_id) + + results = threads.get_results() + rspec_version = RSpecVersion(my_opts.get('rspec_version')) + if rspec_version['type'] == pg_rspec_ad_version['type']: + rspec = PGRSpec() + else: + rspec = SfaRSpec() + for result in results: + try: + rspec.merge(result) + except: + api.logger.info("SM.ListResources: Failed to merge aggregate rspec") + + # cache the result + if caching and api.cache and not xrn: + api.cache.add(version_string, rspec.toxml()) + + return rspec.toxml() + + def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - def _CreateSliver(aggregate, xrn, credential, rspec, users, call_id): + def _CreateSliver(server, xrn, credential, rspec, users, call_id): # Need to call GetVersion at an aggregate to determine the supported # rspec type/format beofre calling CreateSliver at an Aggregate. - # The Aggregate's verion info is cached - server = api.aggregates[aggregate] - # get cached aggregate version - aggregate_version_key = 'version_'+ aggregate - aggregate_version = api.cache.get(aggregate_version_key) - if not aggregate_version: - # get current aggregate version anc cache it for 24 hours - aggregate_version = server.GetVersion() - api.cache.add(aggregate_version_key, aggregate_version, 60 * 60 * 24) - + server_version = _get_server_version(api, server) if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version: # sfa aggregtes support both sfa and pg rspecs, no need to convert # if aggregate supports sfa rspecs. othewise convert to pg rspec rspec = RSpecConverter.to_pg_rspec(rspec) - - return server.CreateSliver(xrn, credential, rspec, users, call_id) - + args = [xrn, credential, rspec, users] + if _call_id_supported(api, server): + args.append(call_id) + return server.CreateSliver(*args) if Callids().already_handled(call_id): return "" - # Validate the RSpec against PlanetLab's schema --disabled for now # The schema used here needs to aggregate the PL and VINI schemas # schema = "/var/www/html/schemas/pl.rng" @@ -111,9 +190,9 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - + server = api.aggregates[aggregate] # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, aggregate, xrn, credential, rspec.toxml(), users, call_id) + threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id) results = threads.get_results() rspec = SfaRSpec() @@ -122,6 +201,13 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): return rspec.toxml() def RenewSliver(api, xrn, creds, expiration_time, call_id): + def _RenewSliver(server, xrn, creds, expiration_time, call_id): + server_version = _get_server_version(api, server) + args = [xrn, creds, expiration_time, call_id] + if _call_id_supported(api, server): + args.append(call_id) + return server.RenewSliver(*args) + if Callids().already_handled(call_id): return True (hrn, type) = urn_to_hrn(xrn) @@ -139,12 +225,148 @@ def RenewSliver(api, xrn, creds, expiration_time, call_id): # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = api.aggregates[aggregate] - threads.run(server.RenewSliver, xrn, [credential], expiration_time, call_id) + threads.run(_RenewSliver, server, xrn, [credential], expiration_time, call_id) # 'and' the results return reduce (lambda x,y: x and y, threads.get_results() , True) +def DeleteSliver(api, xrn, creds, call_id): + def _DeleteSliver(server, xrn, creds, call_id): + server_version = _get_server_version(api, server) + args = [xrn, creds] + if _call_id_supported(api, server): + args.append(call_id) + return server.DeleteSliver(*args) + + if Callids().already_handled(call_id): return "" + (hrn, type) = urn_to_hrn(xrn) + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + server = api.aggregates[aggregate] + threads.run(_DeleteSliver, server, xrn, credential, call_id) + threads.get_results() + return 1 + + +# first draft at a merging SliverStatus +def SliverStatus(api, slice_xrn, creds, call_id): + def _SliverStatus(server, xrn, creds, call_id): + server_version = _get_server_version(api, server) + args = [xrn, creds] + if _call_id_supported(api, server): + args.append(call_id) + return server.SliverStatus(*args) + + if Callids().already_handled(call_id): return {} + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + server = api.aggregates[aggregate] + threads.run (_SliverStatus, server, slice_xrn, credential, call_id) + results = threads.get_results() + + # get rid of any void result - e.g. when call_id was hit where by convention we return {} + results = [ result for result in results if result and result['geni_resources']] + + # do not try to combine if there's no result + if not results : return {} + + # otherwise let's merge stuff + overall = {} + + # mmh, it is expected that all results carry the same urn + overall['geni_urn'] = results[0]['geni_urn'] + overall['pl_login'] = results[0]['pl_login'] + # append all geni_resources + overall['geni_resources'] = \ + reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) + overall['status'] = 'unknown' + if overall['geni_resources']: + overall['status'] = 'ready' + + return overall + +caching=True +#caching=False +def ListSlices(api, creds, call_id): + def _ListSlices(server, creds, call_id): + server_version = _get_server_version(api, server) + args = [creds] + if _call_id_supported(api, server): + args.append(call_id) + return server.ListSlices(*args) + + if Callids().already_handled(call_id): return [] + + # look in cache first + if caching and api.cache: + slices = api.cache.get('slices') + if slices: + return slices + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + # fetch from aggregates + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + server = api.aggregates[aggregate] + threads.run(_ListSlices, server, credential, call_id) + + # combime results + results = threads.get_results() + slices = [] + for result in results: + slices.extend(result) + + # cache the result + if caching and api.cache: + api.cache.add('slices', slices) + + return slices + + + if rspec_version['type'] == pg_rspec_ad_version['type']: + rspec = PGRSpec() + else: + rspec = SfaRSpec() + for result in results: + try: + rspec.merge(result) + except: + api.logger.info("SM.ListResources: Failed to merge aggregate rspec") + + # cache the result + if caching and api.cache and not xrn: + api.cache.add(version_string, rspec.toxml()) + + return rspec.toxml() + + def get_ticket(api, xrn, creds, rspec, users): slice_hrn, type = urn_to_hrn(xrn) # get the netspecs contained within the clients rspec @@ -222,29 +444,6 @@ def get_ticket(api, xrn, creds, rspec, users): ticket.sign() return ticket.save_to_string(save_parents=True) - -def DeleteSliver(api, xrn, creds, call_id): - if Callids().already_handled(call_id): return "" - (hrn, type) = urn_to_hrn(xrn) - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - server = api.aggregates[aggregate] - threads.run(server.DeleteSliver, xrn, credential, call_id) - threads.get_results() - return 1 - def start_slice(api, xrn, creds): hrn, type = urn_to_hrn(xrn) @@ -307,141 +506,6 @@ def status(api, xrn, creds): """ return 1 -# Thierry : caching at the slicemgr level makes sense to some extent -caching=True -#caching=False -def ListSlices(api, creds, call_id): - - if Callids().already_handled(call_id): return [] - - # look in cache first - if caching and api.cache: - slices = api.cache.get('slices') - if slices: - return slices - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() - threads = ThreadManager() - # fetch from aggregates - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - server = api.aggregates[aggregate] - threads.run(server.ListSlices, credential, call_id) - - # combime results - results = threads.get_results() - slices = [] - for result in results: - slices.extend(result) - - # cache the result - if caching and api.cache: - api.cache.add('slices', slices) - - return slices - - -def ListResources(api, creds, options, call_id): - - if Callids().already_handled(call_id): return "" - - # get slice's hrn from options - xrn = options.get('geni_slice_urn', '') - (hrn, type) = urn_to_hrn(xrn) - - # get the rspec's return format from options - rspec_version = RSpecVersion(options.get('rspec_version')) - version_string = "rspec_%s" % (rspec_version.get_version_name()) - - # look in cache first - if caching and api.cache and not xrn: - rspec = api.cache.get(version_string) - if rspec: - return rspec - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - # get the rspec from the aggregate - server = api.aggregates[aggregate] - my_opts = copy(options) - my_opts['geni_compressed'] = False - threads.run(server.ListResources, credential, my_opts, call_id) - - results = threads.get_results() - rspec_version = RSpecVersion(my_opts.get('rspec_version')) - if rspec_version['type'] == pg_rspec_ad_version['type']: - rspec = PGRSpec() - else: - rspec = SfaRSpec() - - for result in results: - try: - rspec.merge(result) - except: - api.logger.info("SM.ListResources: Failed to merge aggregate rspec") - - # cache the result - if caching and api.cache and not xrn: - api.cache.add(version_string, rspec.toxml()) - - return rspec.toxml() - -# first draft at a merging SliverStatus -def SliverStatus(api, slice_xrn, creds, call_id): - if Callids().already_handled(call_id): return {} - # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - server = api.aggregates[aggregate] - threads.run (server.SliverStatus, slice_xrn, credential, call_id) - results = threads.get_results() - - # get rid of any void result - e.g. when call_id was hit where by convention we return {} - results = [ result for result in results if result and result['geni_resources']] - - # do not try to combine if there's no result - if not results : return {} - - # otherwise let's merge stuff - overall = {} - - # mmh, it is expected that all results carry the same urn - overall['geni_urn'] = results[0]['geni_urn'] - overall['pl_login'] = results[0]['pl_login'] - # append all geni_resources - overall['geni_resources'] = \ - reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) - overall['status'] = 'unknown' - if overall['geni_resources']: - overall['status'] = 'ready' - - return overall - def main(): r = RSpec() r.parseFile(sys.argv[1])