From d5171f944557ee597783370f0b96dd51416be25c Mon Sep 17 00:00:00 2001 From: Tony Mack Date: Sat, 17 Jul 2010 14:37:51 +0000 Subject: [PATCH] extend threading functionality to other SM methods --- sfa/managers/slice_manager_pl.py | 92 ++++++++++++++------------------ 1 file changed, 39 insertions(+), 53 deletions(-) diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index 3ecb706f..d71b38a2 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -24,16 +24,11 @@ import sfa.plc.peers as peers def delete_slice(api, xrn, origin_hrn=None): credential = api.getCredential() - aggregates = api.aggregates - for aggregate in aggregates: - success = False - # request hash is optional so lets try the call without it - try: - aggregates[aggregate].delete_slice(credential, xrn, origin_hrn) - success = True - except: - print >> log, "%s" % (traceback.format_exc()) - print >> log, "Error calling delete slice at aggregate %s" % aggregate + threads = ThreadManager() + for aggregate in api.aggregates: + server = api.aggregates[aggregate] + threads.run(server.delete_slice, credential, xrn, origin_hrn) + threads.get_results() return 1 def create_slice(api, xrn, rspec, origin_hrn=None): @@ -58,18 +53,15 @@ def create_slice(api, xrn, rspec, origin_hrn=None): message = "%s (line %s)" % (error.message, error.line) raise InvalidRSpec(message) - aggs = api.aggregates - cred = api.getCredential() - for agg in aggs: - if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]: - try: - # Just send entire RSpec to each aggregate - aggs[agg].create_slice(cred, xrn, rspec, origin_hrn) - except: - print >> log, "Error creating slice %s at %s" % (hrn, agg) - traceback.print_exc() - - return True + cred = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]: + server = api.aggregates[aggregate] + # Just send entire RSpec to each aggregate + threads.run(server.create_slice, cred, xrn, rspec, origin_hrn) + threads.get_results() + return 1 def get_ticket(api, xrn, rspec, origin_hrn=None): slice_hrn, type = urn_to_hrn(xrn) @@ -156,28 +148,19 @@ def get_ticket(api, xrn, rspec, origin_hrn=None): return new_ticket.save_to_string(save_parents=True) def start_slice(api, xrn): - hrn, type = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(hrn) - slice_id = slices[0] - attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) - attribute_id = attreibutes[0]['slice_attribute_id'] - api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1" ) - + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + server = api.aggregates[aggregate] + threads.run(server.stop_slice, credential, xrn) return 1 def stop_slice(api, xrn): - hrn, type = urn_to_hrn(xrn) - slicename = hrn_to_pl_slicename(hrn) - slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(hrn) - slice_id = slices[0]['slice_id'] - attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id']) - attribute_id = attributes[0]['slice_attribute_id'] - api.plshell.UpdateSliceTag(api.plauth, attribute_id, "0") + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + server = api.aggregates[aggregate] + threads.run(server.stop_slice, credential, xrn) return 1 def reset_slice(api, xrn): @@ -194,14 +177,17 @@ def get_slices(api): # fetch from aggregates slices = [] credential = api.getCredential() + threads = Threadmanager() for aggregate in api.aggregates: - try: - tmp_slices = api.aggregates[aggregate].get_slices(credential) - slices.extend(tmp_slices) - except: - print >> log, "%s" % (traceback.format_exc()) - print >> log, "Error calling slices at aggregate %(aggregate)s" % locals() + server = api.aggregates[aggregate] + threads.run(server.get_slices, credential) + # combime results + results = threads.get_results() + slices = [] + for result in results: + slices.extend(result) + # cache the result if api.cache: api.cache.add('slices', slices) @@ -217,13 +203,13 @@ def get_rspec(api, xrn=None, origin_hrn=None): hrn, type = urn_to_hrn(xrn) rspec = None - aggs = api.aggregates cred = api.getCredential() threads = ThreadManager() - for agg in aggs: - if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]: - # get the rspec from the aggregate - threads.run(aggs[agg].get_resources, cred, xrn, origin_hrn) + for aggregate in api.aggregates: + if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]: + # get the rspec from the aggregate + server = api.aggregates[aggregate] + threads.run(server.get_resources, cred, xrn, origin_hrn) results = threads.get_results() # combine the rspecs into a single rspec @@ -231,7 +217,7 @@ def get_rspec(api, xrn=None, origin_hrn=None): try: tree = etree.parse(StringIO(agg_rspec)) except etree.XMLSyntaxError: - message = agg + ": " + str(sys.exc_info()[1]) + message = str(agg_rspec) + ": " + str(sys.exc_info()[1]) raise InvalidRSpec(message) root = tree.getroot() -- 2.43.0