From e4b13375c99c6def2f558db1c1d6f881d9d13ec7 Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Sun, 16 Oct 2011 22:12:56 -0400 Subject: [PATCH] fix bug in GetVersion(). Use sfa.plc.api.get_server() to establish a connection the remote aggregates --- sfa/managers/slice_manager_pl.py | 128 +++++++++++++++---------------- 1 file changed, 61 insertions(+), 67 deletions(-) diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index ad859128..8d5a6955 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -52,7 +52,7 @@ def _call_id_supported(api, server): # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances def get_serverproxy_url (server): try: - return server.url + return server.get_url() except: logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") return server._ServerProxy__host + server._ServerProxy__handler @@ -65,10 +65,10 @@ def GetVersion(api): ad_rspec_versions = [] request_rspec_versions = [] for rspec_version in version_manager.versions: - if rspec_version in ['*', 'ad']: + if rspec_version.content_type in ['*', 'ad']: + ad_rspec_versions.append(rspec_version.to_dict()) + if rspec_version.content_type in ['*', 'request']: request_rspec_versions.append(rspec_version.to_dict()) - if rspec_version in ['*', 'request']: - request_rspec_version.append(rspec_version.to_dict()) default_rspec_version = version_manager.get_version("sfa 1").to_dict() xrn=Xrn(api.hrn, 'authority+sa') version_more = {'interface':'slicemgr', @@ -149,10 +149,9 @@ def ListResources(api, creds, options, call_id): caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() - credentials = [credential] + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller @@ -161,8 +160,10 @@ def ListResources(api, creds, options, call_id): continue # get the rspec from the aggregate - server = api.aggregates[aggregate] - threads.run(_ListResources, aggregate, server, credentials, options, call_id) + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(_ListResources, aggregate, server, [cred], options, call_id) + results = threads.get_results() rspec_version = version_manager.get_version(options.get('rspec_version')) @@ -204,7 +205,7 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): rspec.filter(filter) rspec = rspec.toxml() requested_users = sfa_to_pg_users_arg(users) - args = [xrn, [credential], rspec, requested_users] + args = [xrn, credential, rspec, requested_users] if _call_id_supported(api, server): args.append(call_id) rspec = server.CreateSliver(*args) @@ -227,9 +228,9 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): drop_slicemgr_stats(rspec) # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() # get the callers hrn hrn, type = urn_to_hrn(xrn) @@ -241,9 +242,10 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = api.aggregates[aggregate] + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id) + threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id) results = threads.get_results() manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') @@ -273,17 +275,18 @@ def RenewSliver(api, xrn, creds, expiration_time, call_id): caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = api.aggregates[aggregate] - threads.run(_RenewSliver, server, xrn, [credential], expiration_time, call_id) + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id) # 'and' the results return reduce (lambda x,y: x and y, threads.get_results() , True) @@ -302,17 +305,18 @@ def DeleteSliver(api, xrn, creds, call_id): caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = api.aggregates[aggregate] - threads.run(_DeleteSliver, server, xrn, credential, call_id) + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(_DeleteSliver, server, xrn, [cred], call_id) threads.get_results() return 1 @@ -328,13 +332,14 @@ def SliverStatus(api, slice_xrn, creds, call_id): if Callids().already_handled(call_id): return {} # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() for aggregate in api.aggregates: - server = api.aggregates[aggregate] - threads.run (_SliverStatus, server, slice_xrn, credential, call_id) + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run (_SliverStatus, server, slice_xrn, [cred], call_id) results = threads.get_results() # get rid of any void result - e.g. when call_id was hit where by convention we return {} @@ -381,9 +386,9 @@ def ListSlices(api, creds, call_id): caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred= api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() # fetch from aggregates for aggregate in api.aggregates: @@ -391,8 +396,9 @@ def ListSlices(api, creds, call_id): # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = api.aggregates[aggregate] - threads.run(_ListSlices, server, credential, call_id) + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(_ListSlices, server, [cred], call_id) # combime results results = threads.get_results() @@ -422,33 +428,19 @@ def get_ticket(api, xrn, creds, rspec, users): caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems(): # prevent infinite loop. Dont send request back to caller # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = None - if aggregate in api.aggregates: - server = api.aggregates[aggregate] - else: - net_urn = hrn_to_urn(aggregate, 'authority') - # we may have a peer that knows about this aggregate - for agg in api.aggregates: - target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn) - if not target_aggs or not 'hrn' in target_aggs[0]: - continue - # send the request to this address - url = target_aggs[0]['url'] - server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file, timeout=30) - # aggregate found, no need to keep looping - break - if server is None: - continue - threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users) + + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users) results = threads.get_results() @@ -492,17 +484,18 @@ def start_slice(api, xrn, creds): caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = api.aggregates[aggregate] - threads.run(server.Start, xrn, credential) + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(server.Start, xrn, cred) threads.get_results() return 1 @@ -514,17 +507,18 @@ def stop_slice(api, xrn, creds): caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first - credential = api.getDelegatedCredential(creds) - if not credential: - credential = api.getCredential() + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() threads = ThreadManager() for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue - server = api.aggregates[aggregate] - threads.run(server.Stop, xrn, credential) + interface = api.aggregates[aggregate] + server = api.get_server(interface, cred) + threads.run(server.Stop, xrn, cred) threads.get_results() return 1 -- 2.45.2