rename server.threadmanager into client.multiclient
authorThierry Parmentelat <thierry.parmentelat@inria.fr>
Thu, 7 Nov 2013 13:43:50 +0000 (14:43 +0100)
committerThierry Parmentelat <thierry.parmentelat@inria.fr>
Thu, 7 Nov 2013 13:43:50 +0000 (14:43 +0100)
sfa/client/multiclient.py [moved from sfa/server/threadmanager.py with 90% similarity]
sfa/managers/slice_manager.py
sfa/openstack/osaggregate.py

similarity index 90%
rename from sfa/server/threadmanager.py
rename to sfa/client/multiclient.py
index b47b818..75573ed 100644 (file)
@@ -16,7 +16,7 @@ def ThreadedMethod(callable, results, errors):
                 try:
                     results.put(callable(*args, **kwds))
                 except Exception, e:
                 try:
                     results.put(callable(*args, **kwds))
                 except Exception, e:
-                    logger.log_exc('ThreadManager: Error in thread: ')
+                    logger.log_exc('MultiClient: Error in thread: ')
                     errors.put(traceback.format_exc())
                     
         thread = ThreadInstance()
                     errors.put(traceback.format_exc())
                     
         thread = ThreadInstance()
@@ -26,10 +26,10 @@ def ThreadedMethod(callable, results, errors):
 
  
 
 
  
 
-class ThreadManager:
+class MultiClient:
     """
     """
-    ThreadManager executes a callable in a thread and stores the result
-    in a thread safe queue. 
+    MultiClient allows to issue several SFA calls in parallel in different threads
+    and stores the results in a thread safe queue. 
     """
 
     def __init__(self):
     """
 
     def __init__(self):
@@ -58,7 +58,7 @@ class ThreadManager:
         """
         Return a list of all the results so far. Blocks until 
         all threads are finished. 
         """
         Return a list of all the results so far. Blocks until 
         all threads are finished. 
-        If lienent is set to false the error queue will be checked before 
+        If lenient is set to false the error queue will be checked before 
         the response is returned. If there are errors in the queue an SFA Fault will 
         be raised.   
         """
         the response is returned. If there are errors in the queue an SFA Fault will 
         be raised.   
         """
@@ -107,7 +107,7 @@ if __name__ == '__main__':
             time.sleep(sleep)
         return nums      
 
             time.sleep(sleep)
         return nums      
 
-    threads = ThreadManager()
+    threads = MultiClient()
     threads.run(f, "Thread1", 10, 2)
     threads.run(f, "Thread2", -10, 1)
     threads.run(e, "Thread3", 19, 1)
     threads.run(f, "Thread1", 10, 2)
     threads.run(f, "Thread2", -10, 1)
     threads.run(e, "Thread3", 19, 1)
index 746bac6..05b0f1e 100644 (file)
@@ -14,7 +14,7 @@ from sfa.util.version import version_core
 from sfa.util.callids import Callids
 from sfa.util.cache import Cache
 
 from sfa.util.callids import Callids
 from sfa.util.cache import Cache
 
-from sfa.server.threadmanager import ThreadManager
+from sfa.client.multiclient import MultiClient
 
 from sfa.rspecs.rspec_converter import RSpecConverter
 from sfa.rspecs.version_manager import VersionManager
 
 from sfa.rspecs.rspec_converter import RSpecConverter
 from sfa.rspecs.version_manager import VersionManager
@@ -147,7 +147,7 @@ class SliceManager:
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
@@ -157,10 +157,10 @@ class SliceManager:
             # get the rspec from the aggregate
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
             # get the rspec from the aggregate
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
-            threads.run(_ListResources, aggregate, server, [cred], options)
+            multiclient.run(_ListResources, aggregate, server, [cred], options)
     
     
     
     
-        results = threads.get_results()
+        results = multiclient.get_results()
         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
         if xrn:    
             result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
         if xrn:    
             result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
@@ -230,7 +230,7 @@ class SliceManager:
         hrn, type = urn_to_hrn(xrn)
         valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
         hrn, type = urn_to_hrn(xrn)
         valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM 
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM 
@@ -239,9 +239,9 @@ class SliceManager:
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
             # Just send entire RSpec to each aggregate
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
             # Just send entire RSpec to each aggregate
-            threads.run(_Allocate, aggregate, server, xrn, [cred], rspec.toxml(), options)
+            multiclient.run(_Allocate, aggregate, server, xrn, [cred], rspec.toxml(), options)
                 
                 
-        results = threads.get_results()
+        results = multiclient.get_results()
         manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
         result_rspec = RSpec(version=manifest_version)
         geni_urn = None
         manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
         result_rspec = RSpec(version=manifest_version)
         geni_urn = None
@@ -290,7 +290,7 @@ class SliceManager:
         # get the callers hrn
         valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0]
         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
         # get the callers hrn
         valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0]
         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
@@ -299,9 +299,9 @@ class SliceManager:
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
             # Just send entire RSpec to each aggregate
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
             # Just send entire RSpec to each aggregate
-            threads.run(_Provision, aggregate, server, xrn, [cred], options)
+            multiclient.run(_Provision, aggregate, server, xrn, [cred], options)
 
 
-        results = threads.get_results()
+        results = multiclient.get_results()
         manifest_version = version_manager._get_version('GENI', '3', 'manifest')
         result_rspec = RSpec(version=manifest_version)
         geni_slivers = []
         manifest_version = version_manager._get_version('GENI', '3', 'manifest')
         result_rspec = RSpec(version=manifest_version)
         geni_slivers = []
@@ -350,7 +350,7 @@ class SliceManager:
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential(minimumExpiration=31*86400)
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential(minimumExpiration=31*86400)
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
@@ -358,9 +358,9 @@ class SliceManager:
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
-            threads.run(_Renew, aggregate, server, xrn, [cred], expiration_time, options)
+            multiclient.run(_Renew, aggregate, server, xrn, [cred], expiration_time, options)
 
 
-        results = threads.get_results()
+        results = multiclient.get_results()
 
         geni_code = 0
         geni_output = ",".join([x.get('output',"") for x in results])
 
         geni_code = 0
         geni_output = ",".join([x.get('output',"") for x in results])
@@ -390,7 +390,7 @@ class SliceManager:
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
@@ -398,10 +398,10 @@ class SliceManager:
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
-            threads.run(_Delete, server, xrn, [cred], options)
+            multiclient.run(_Delete, server, xrn, [cred], options)
         
         results = []
         
         results = []
-        for result in threads.get_results():
+        for result in multiclient.get_results():
             results += ReturnValue.get_value(result)
         return results
     
             results += ReturnValue.get_value(result)
         return results
     
@@ -417,12 +417,12 @@ class SliceManager:
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
         for aggregate in api.aggregates:
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
-            threads.run (_Status, server, slice_xrn, [cred], options)
-        results = [ReturnValue.get_value(result) for result in threads.get_results()]
+            multiclient.run (_Status, server, slice_xrn, [cred], options)
+        results = [ReturnValue.get_value(result) for result in multiclient.get_results()]
     
         # get rid of any void result - e.g. when call_id was hit, where by convention we return {}
         results = [ result for result in results if result and result['geni_slivers']]
     
         # get rid of any void result - e.g. when call_id was hit, where by convention we return {}
         results = [ result for result in results if result and result['geni_slivers']]
@@ -455,12 +455,12 @@ class SliceManager:
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
         for aggregate in api.aggregates:
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
-            threads.run (_Describe, server, xrns, [cred], options)
-        results = [ReturnValue.get_value(result) for result in threads.get_results()]
+            multiclient.run (_Describe, server, xrns, [cred], options)
+        results = [ReturnValue.get_value(result) for result in multiclient.get_results()]
 
         # get rid of any void result - e.g. when call_id was hit, where by convention we return {}
         results = [ result for result in results if result and result.get('geni_urn')]
 
         # get rid of any void result - e.g. when call_id was hit, where by convention we return {}
         results = [ result for result in results if result and result.get('geni_urn')]
@@ -496,7 +496,7 @@ class SliceManager:
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
@@ -504,8 +504,8 @@ class SliceManager:
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)    
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)    
-            threads.run(server.PerformOperationalAction, xrn, [cred], action, options)
-        threads.get_results()    
+            multiclient.run(server.PerformOperationalAction, xrn, [cred], action, options)
+        multiclient.get_results()    
         return 1
      
     def Shutdown(self, api, xrn, creds, options={}):
         return 1
      
     def Shutdown(self, api, xrn, creds, options={}):
@@ -518,7 +518,7 @@ class SliceManager:
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
         cred = api.getDelegatedCredential(creds)
         if not cred:
             cred = api.getCredential()
-        threads = ThreadManager()
+        multiclient = MultiClient()
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
         for aggregate in api.aggregates:
             # prevent infinite loop. Dont send request back to caller
             # unless the caller is the aggregate's SM
@@ -526,7 +526,7 @@ class SliceManager:
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
                 continue
             interface = api.aggregates[aggregate]
             server = api.server_proxy(interface, cred)
-            threads.run(server.Shutdown, xrn.urn, cred)
-        threads.get_results()    
+            multiclient.run(server.Shutdown, xrn.urn, cred)
+        multiclient.get_results()    
         return 1
     
         return 1
     
index de34995..16eec41 100644 (file)
@@ -24,7 +24,7 @@ from sfa.planetlab.plxrn import PlXrn
 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename
 from sfa.rspecs.version_manager import VersionManager
 from sfa.openstack.security_group import SecurityGroup
 from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename
 from sfa.rspecs.version_manager import VersionManager
 from sfa.openstack.security_group import SecurityGroup
-from sfa.server.threadmanager import ThreadManager
+from sfa.client.multiclient import MultiClient
 from sfa.util.sfalogging import logger
 
 def pubkeys_to_user_data(pubkeys):
 from sfa.util.sfalogging import logger
 
 def pubkeys_to_user_data(pubkeys):
@@ -412,7 +412,7 @@ class OSAggregate:
                     time.sleep(.5)
                 manager.delete_security_group(security_group)
 
                     time.sleep(.5)
                 manager.delete_security_group(security_group)
 
-        thread_manager = ThreadManager()
+        multiclient = MultiClient()
         tenant = self.driver.shell.auth_manager.tenants.find(id=instance.tenant_id)  
         self.driver.shell.nova_manager.connect(tenant=tenant.name)
         args = {'name': instance.name,
         tenant = self.driver.shell.auth_manager.tenants.find(id=instance.tenant_id)  
         self.driver.shell.nova_manager.connect(tenant=tenant.name)
         args = {'name': instance.name,
@@ -423,7 +423,7 @@ class OSAggregate:
             # destroy instance
             self.driver.shell.nova_manager.servers.delete(instance)
             # deleate this instance's security groups
             # destroy instance
             self.driver.shell.nova_manager.servers.delete(instance)
             # deleate this instance's security groups
-            thread_manager.run(_delete_security_group, instance)
+            multiclient.run(_delete_security_group, instance)
         return 1
 
     def stop_instances(self, instance_name, tenant_name, id=None):
         return 1
 
     def stop_instances(self, instance_name, tenant_name, id=None):