fix bug in GetVersion(). Use sfa.plc.api.get_server() to establish a connection the...
authorTony Mack <tmack@paris.CS.Princeton.EDU>
Mon, 17 Oct 2011 02:12:56 +0000 (22:12 -0400)
committerTony Mack <tmack@paris.CS.Princeton.EDU>
Mon, 17 Oct 2011 02:12:56 +0000 (22:12 -0400)
sfa/managers/slice_manager_pl.py

index ad85912..8d5a695 100644 (file)
@@ -52,7 +52,7 @@ def _call_id_supported(api, server):
 # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
 def get_serverproxy_url (server):
     try:
-        return server.url
+        return server.get_url()
     except:
         logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
         return server._ServerProxy__host + server._ServerProxy__handler
@@ -65,10 +65,10 @@ def GetVersion(api):
     ad_rspec_versions = []
     request_rspec_versions = []
     for rspec_version in version_manager.versions:
-        if rspec_version in ['*', 'ad']:
+        if rspec_version.content_type in ['*', 'ad']:
+            ad_rspec_versions.append(rspec_version.to_dict())
+        if rspec_version.content_type in ['*', 'request']:
             request_rspec_versions.append(rspec_version.to_dict())
-        if rspec_version in ['*', 'request']:
-            request_rspec_version.append(rspec_version.to_dict())
     default_rspec_version = version_manager.get_version("sfa 1").to_dict()
     xrn=Xrn(api.hrn, 'authority+sa')
     version_more = {'interface':'slicemgr',
@@ -149,10 +149,9 @@ def ListResources(api, creds, options, call_id):
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
-    credentials = [credential]
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
     threads = ThreadManager()
     for aggregate in api.aggregates:
         # prevent infinite loop. Dont send request back to caller
@@ -161,8 +160,10 @@ def ListResources(api, creds, options, call_id):
             continue
 
         # get the rspec from the aggregate
-        server = api.aggregates[aggregate]
-        threads.run(_ListResources, aggregate, server, credentials, options, call_id)
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
+        threads.run(_ListResources, aggregate, server, [cred], options, call_id)
+
 
     results = threads.get_results()
     rspec_version = version_manager.get_version(options.get('rspec_version'))
@@ -204,7 +205,7 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
                 rspec.filter(filter)
                 rspec = rspec.toxml()
                 requested_users = sfa_to_pg_users_arg(users)
-            args = [xrn, [credential], rspec, requested_users]
+            args = [xrn, credential, rspec, requested_users]
             if _call_id_supported(api, server):
                 args.append(call_id)
             rspec = server.CreateSliver(*args)
@@ -227,9 +228,9 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
     drop_slicemgr_stats(rspec)
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
 
     # get the callers hrn
     hrn, type = urn_to_hrn(xrn)
@@ -241,9 +242,10 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
         # unless the caller is the aggregate's SM 
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
-        server = api.aggregates[aggregate]
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
         # Just send entire RSpec to each aggregate
-        threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id)
+        threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id)
             
     results = threads.get_results()
     manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
@@ -273,17 +275,18 @@ def RenewSliver(api, xrn, creds, expiration_time, call_id):
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
     threads = ThreadManager()
     for aggregate in api.aggregates:
         # prevent infinite loop. Dont send request back to caller
         # unless the caller is the aggregate's SM
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
-        server = api.aggregates[aggregate]
-        threads.run(_RenewSliver, server, xrn, [credential], expiration_time, call_id)
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
+        threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id)
     # 'and' the results
     return reduce (lambda x,y: x and y, threads.get_results() , True)
 
@@ -302,17 +305,18 @@ def DeleteSliver(api, xrn, creds, call_id):
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
     threads = ThreadManager()
     for aggregate in api.aggregates:
         # prevent infinite loop. Dont send request back to caller
         # unless the caller is the aggregate's SM
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
-        server = api.aggregates[aggregate]
-        threads.run(_DeleteSliver, server, xrn, credential, call_id)
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
+        threads.run(_DeleteSliver, server, xrn, [cred], call_id)
     threads.get_results()
     return 1
 
@@ -328,13 +332,14 @@ def SliverStatus(api, slice_xrn, creds, call_id):
     
     if Callids().already_handled(call_id): return {}
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
     threads = ThreadManager()
     for aggregate in api.aggregates:
-        server = api.aggregates[aggregate]
-        threads.run (_SliverStatus, server, slice_xrn, credential, call_id)
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
+        threads.run (_SliverStatus, server, slice_xrn, [cred], call_id)
     results = threads.get_results()
 
     # get rid of any void result - e.g. when call_id was hit where by convention we return {}
@@ -381,9 +386,9 @@ def ListSlices(api, creds, call_id):
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
+    cred= api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
     threads = ThreadManager()
     # fetch from aggregates
     for aggregate in api.aggregates:
@@ -391,8 +396,9 @@ def ListSlices(api, creds, call_id):
         # unless the caller is the aggregate's SM
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
-        server = api.aggregates[aggregate]
-        threads.run(_ListSlices, server, credential, call_id)
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
+        threads.run(_ListSlices, server, [cred], call_id)
 
     # combime results
     results = threads.get_results()
@@ -422,33 +428,19 @@ def get_ticket(api, xrn, creds, rspec, users):
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential() 
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential() 
     threads = ThreadManager()
     for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
         # prevent infinite loop. Dont send request back to caller
         # unless the caller is the aggregate's SM
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
-        server = None
-        if aggregate in api.aggregates:
-            server = api.aggregates[aggregate]
-        else:
-            net_urn = hrn_to_urn(aggregate, 'authority')     
-            # we may have a peer that knows about this aggregate
-            for agg in api.aggregates:
-                target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
-                if not target_aggs or not 'hrn' in target_aggs[0]:
-                    continue
-                # send the request to this address 
-                url = target_aggs[0]['url']
-                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file, timeout=30)
-                # aggregate found, no need to keep looping
-                break   
-        if server is None:
-            continue 
-        threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
+        
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
+        threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users)
 
     results = threads.get_results()
     
@@ -492,17 +484,18 @@ def start_slice(api, xrn, creds):
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
     threads = ThreadManager()
     for aggregate in api.aggregates:
         # prevent infinite loop. Dont send request back to caller
         # unless the caller is the aggregate's SM
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
-        server = api.aggregates[aggregate]
-        threads.run(server.Start, xrn, credential)
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)    
+        threads.run(server.Start, xrn, cred)
     threads.get_results()    
     return 1
  
@@ -514,17 +507,18 @@ def stop_slice(api, xrn, creds):
     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
 
     # attempt to use delegated credential first
-    credential = api.getDelegatedCredential(creds)
-    if not credential:
-        credential = api.getCredential()
+    cred = api.getDelegatedCredential(creds)
+    if not cred:
+        cred = api.getCredential()
     threads = ThreadManager()
     for aggregate in api.aggregates:
         # prevent infinite loop. Dont send request back to caller
         # unless the caller is the aggregate's SM
         if caller_hrn == aggregate and aggregate != api.hrn:
             continue
-        server = api.aggregates[aggregate]
-        threads.run(server.Stop, xrn, credential)
+        interface = api.aggregates[aggregate]
+        server = api.get_server(interface, cred)
+        threads.run(server.Stop, xrn, cred)
     threads.get_results()    
     return 1