extend threading functionality to other SM methods
authorTony Mack <tmack@cs.princeton.edu>
Sat, 17 Jul 2010 14:37:51 +0000 (14:37 +0000)
committerTony Mack <tmack@cs.princeton.edu>
Sat, 17 Jul 2010 14:37:51 +0000 (14:37 +0000)
sfa/managers/slice_manager_pl.py

index 3ecb706..d71b38a 100644 (file)
@@ -24,16 +24,11 @@ import sfa.plc.peers as peers
 
 def delete_slice(api, xrn, origin_hrn=None):
     credential = api.getCredential()
-    aggregates = api.aggregates
-    for aggregate in aggregates:
-        success = False
-        # request hash is optional so lets try the call without it
-        try:
-            aggregates[aggregate].delete_slice(credential, xrn, origin_hrn)
-            success = True
-        except:
-            print >> log, "%s" % (traceback.format_exc())
-            print >> log, "Error calling delete slice at aggregate %s" % aggregate
+    threads = ThreadManager()
+    for aggregate in api.aggregates:
+        server = api.aggregates[aggregate] 
+        threads.run(server.delete_slice, credential, xrn, origin_hrn)
+    threads.get_results()
     return 1
 
 def create_slice(api, xrn, rspec, origin_hrn=None):
@@ -58,18 +53,15 @@ def create_slice(api, xrn, rspec, origin_hrn=None):
             message = "%s (line %s)" % (error.message, error.line)
             raise InvalidRSpec(message)
 
-    aggs = api.aggregates
-    cred = api.getCredential()                                                 
-    for agg in aggs:
-        if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
-            try:
-                # Just send entire RSpec to each aggregate
-                aggs[agg].create_slice(cred, xrn, rspec, origin_hrn)
-            except:
-                print >> log, "Error creating slice %s at %s" % (hrn, agg)
-                traceback.print_exc()
-
-    return True
+    cred = api.getCredential()
+    threads = ThreadManager()
+    for aggregate in api.aggregates:
+        if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
+            server = api.aggregates[aggregate]
+            # Just send entire RSpec to each aggregate
+            threads.run(server.create_slice, cred, xrn, rspec, origin_hrn)
+    threads.get_results() 
+    return 1
 
 def get_ticket(api, xrn, rspec, origin_hrn=None):
     slice_hrn, type = urn_to_hrn(xrn)
@@ -156,28 +148,19 @@ def get_ticket(api, xrn, rspec, origin_hrn=None):
     return new_ticket.save_to_string(save_parents=True)
 
 def start_slice(api, xrn):
-    hrn, type = urn_to_hrn(xrn)
-    slicename = hrn_to_pl_slicename(hrn)
-    slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
-    if not slices:
-        raise RecordNotFound(hrn)
-    slice_id = slices[0]
-    attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
-    attribute_id = attreibutes[0]['slice_attribute_id']
-    api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1" )
-
+    credential = api.getCredential()
+    threads = ThreadManager()
+    for aggregate in api.aggregates:
+        server = api.aggregates[aggregate]
+        threads.run(server.stop_slice, credential, xrn)
     return 1
  
 def stop_slice(api, xrn):
-    hrn, type = urn_to_hrn(xrn)
-    slicename = hrn_to_pl_slicename(hrn)
-    slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
-    if not slices:
-        raise RecordNotFound(hrn)
-    slice_id = slices[0]['slice_id']
-    attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
-    attribute_id = attributes[0]['slice_attribute_id']
-    api.plshell.UpdateSliceTag(api.plauth, attribute_id, "0")
+    credential = api.getCredential()
+    threads = ThreadManager()
+    for aggregate in api.aggregates:
+        server = api.aggregates[aggregate]
+        threads.run(server.stop_slice, credential, xrn)
     return 1
 
 def reset_slice(api, xrn):
@@ -194,14 +177,17 @@ def get_slices(api):
     # fetch from aggregates
     slices = []
     credential = api.getCredential()
+    threads = Threadmanager()
     for aggregate in api.aggregates:
-        try:
-            tmp_slices = api.aggregates[aggregate].get_slices(credential)
-            slices.extend(tmp_slices)
-        except:
-            print >> log, "%s" % (traceback.format_exc())
-            print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
+        server = api.aggregates[aggregate]
+        threads.run(server.get_slices, credential)
 
+    # combime results
+    results = threads.get_results()
+    slices = []
+    for result in results:
+        slices.extend(result)
+    
     # cache the result
     if api.cache:
         api.cache.add('slices', slices)
@@ -217,13 +203,13 @@ def get_rspec(api, xrn=None, origin_hrn=None):
 
     hrn, type = urn_to_hrn(xrn)
     rspec = None
-    aggs = api.aggregates
     cred = api.getCredential()
     threads = ThreadManager()
-    for agg in aggs:
-        if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
-                # get the rspec from the aggregate
-                threads.run(aggs[agg].get_resources, cred, xrn, origin_hrn)
+    for aggregate in api.aggregates:
+        if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
+            # get the rspec from the aggregate
+            server = api.aggregates[aggregate]
+            threads.run(server.get_resources, cred, xrn, origin_hrn)
 
     results = threads.get_results()
     # combine the rspecs into a single rspec 
@@ -231,7 +217,7 @@ def get_rspec(api, xrn=None, origin_hrn=None):
         try:
             tree = etree.parse(StringIO(agg_rspec))
         except etree.XMLSyntaxError:
-            message = agg + ": " + str(sys.exc_info()[1])
+            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
             raise InvalidRSpec(message)
 
         root = tree.getroot()