From cfc23b00778fcbdaa9c211fa33c2ece7b3afc235 Mon Sep 17 00:00:00 2001 From: smbaker <smbaker@fc8clean.lan> Date: Mon, 7 Nov 2011 19:21:46 -0800 Subject: [PATCH] revert to old slice_manager --- sfa/managers/slice_manager.py | 353 +--------------------------------- 1 file changed, 6 insertions(+), 347 deletions(-) diff --git a/sfa/managers/slice_manager.py b/sfa/managers/slice_manager.py index cf60d783..72a9c698 100644 --- a/sfa/managers/slice_manager.py +++ b/sfa/managers/slice_manager.py @@ -1,5 +1,4 @@ import sys -import traceback import time from StringIO import StringIO from copy import copy @@ -81,120 +80,16 @@ class SliceManager: local_am_url=self.get_serverproxy_url(api.aggregates[api.hrn]) sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version - -def drop_slicemgr_stats(rspec): - try: - stats_elements = rspec.xml.xpath('//statistics') - for node in stats_elements: - node.getparent().remove(node) - except Exception, e: - logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) - -def add_slicemgr_stat(rspec, callname, aggname, elapsed, status, exc_info=None): - try: - stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) - if stats_tags: - stats_tag = stats_tags[0] - else: - stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname) - - stat_tag = etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) - - if exc_info: - exc_tag = etree.SubElement(stat_tag, "exc_info", name=str(exc_info[1])) - - # this would encode it as a text block - #exc_tag.text = "\n".join(traceback.format_exception(exc_info[0], exc_info[1], exc_info[2])) - - # this encodes the traceback as a set of xml tags - tb = traceback.extract_tb(exc_info[2]) - for item in tb: - exc_frame = etree.SubElement(exc_tag, "tb_frame", filename=str(item[0]), line=str(item[1]), func=str(item[2]), code=str(item[3])) - - except Exception, e: - logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) - -def ListResources(api, creds, options, call_id): - version_manager = VersionManager() - def _ListResources(aggregate, server, credential, opts, call_id): - - my_opts = copy(opts) - args = [credential, my_opts] - tStart = time.time() + + def drop_slicemgr_stats(self, rspec): try: stats_elements = rspec.xml.xpath('//statistics') for node in stats_elements: node.getparent().remove(node) except Exception, e: - api.logger.log_exc("ListResources failed at %s" %(server.url)) - return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()} - - if Callids().already_handled(call_id): return "" - - # get slice's hrn from options - xrn = options.get('geni_slice_urn', '') - (hrn, type) = urn_to_hrn(xrn) - if 'geni_compressed' in options: - del(options['geni_compressed']) - - # get the rspec's return format from options - rspec_version = version_manager.get_version(options.get('rspec_version')) - version_string = "rspec_%s" % (rspec_version.to_string()) - - # look in cache first - if caching and api.cache and not xrn: - rspec = api.cache.get(version_string) - if rspec: - return rspec - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - - # get the rspec from the aggregate - interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) - threads.run(_ListResources, aggregate, server, [cred], options, call_id) - - - results = threads.get_results() - rspec_version = version_manager.get_version(options.get('rspec_version')) - if xrn: - result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest') - else: - result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad') - rspec = RSpec(version=result_version) - for result in results: - add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"], result.get("exc_info",None)) - if result["status"]=="success": - try: - rspec.version.merge(result["rspec"]) - except: - api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") - - # cache the result - if caching and api.cache and not xrn: - api.cache.add(version_string, rspec.toxml()) - - return rspec.toxml() - - -def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - - version_manager = VersionManager() - def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id): - tStart = time.time() + logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) + + def add_slicemgr_stat(self, rspec, callname, aggname, elapsed, status): try: stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) if stats_tags: @@ -365,46 +260,6 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): args = [xrn, creds, expiration_time, call_id] if self._call_id_supported(api, server): args.append(call_id) -<<<<<<< HEAD:sfa/managers/slice_manager.py - rspec = server.CreateSliver(*args) - return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} - except: - logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) - return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()} - - if Callids().already_handled(call_id): return "" - # Validate the RSpec against PlanetLab's schema --disabled for now - # The schema used here needs to aggregate the PL and VINI schemas - # schema = "/var/www/html/schemas/pl.rng" - rspec = RSpec(rspec_str) -# schema = None -# if schema: -# rspec.validate(schema) - - # if there is a <statistics> section, the aggregates don't care about it, - # so delete it. - drop_slicemgr_stats(rspec) - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - - # get the callers hrn - hrn, type = urn_to_hrn(xrn) - valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) - # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id) -======= return server.RenewSliver(*args) if Callids().already_handled(call_id): return True @@ -575,208 +430,12 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue ->>>>>>> a3996bfa45298c8d0abfd58916221abba737441c:sfa/managers/slice_manager.py -<<<<<<< HEAD:sfa/managers/slice_manager.py - results = threads.get_results() - manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') - result_rspec = RSpec(version=manifest_version) - for result in results: - add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"], result.get("exc_info",None)) - if result["status"]=="success": - try: - result_rspec.version.merge(result["rspec"]) - except: - api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") - return result_rspec.toxml() - -def RenewSliver(api, xrn, creds, expiration_time, call_id): - def _RenewSliver(server, xrn, creds, expiration_time, call_id): - server_version = api.get_cached_server_version(server) - args = [xrn, creds, expiration_time, call_id] - if _call_id_supported(api, server): - args.append(call_id) - return server.RenewSliver(*args) - - if Callids().already_handled(call_id): return True - - (hrn, type) = urn_to_hrn(xrn) - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) - threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id) - # 'and' the results - return reduce (lambda x,y: x and y, threads.get_results() , True) - -def DeleteSliver(api, xrn, creds, call_id): - def _DeleteSliver(server, xrn, creds, call_id): - server_version = api.get_cached_server_version(server) - args = [xrn, creds] - if _call_id_supported(api, server): - args.append(call_id) - return server.DeleteSliver(*args) - - if Callids().already_handled(call_id): return "" - (hrn, type) = urn_to_hrn(xrn) - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) - threads.run(_DeleteSliver, server, xrn, [cred], call_id) - threads.get_results() - return 1 - - -# first draft at a merging SliverStatus -def SliverStatus(api, slice_xrn, creds, call_id): - def _SliverStatus(server, xrn, creds, call_id): - server_version = api.get_cached_server_version(server) - args = [xrn, creds] - if _call_id_supported(api, server): - args.append(call_id) - return server.SliverStatus(*args) -======= interface = api.aggregates[aggregate] server = api.server_proxy(interface, cred) threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users) ->>>>>>> a3996bfa45298c8d0abfd58916221abba737441c:sfa/managers/slice_manager.py - -<<<<<<< HEAD:sfa/managers/slice_manager.py - if Callids().already_handled(call_id): return {} - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for aggregate in api.aggregates: - interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) - threads.run (_SliverStatus, server, slice_xrn, [cred], call_id) - results = threads.get_results() - - # get rid of any void result - e.g. when call_id was hit where by convention we return {} - results = [ result for result in results if result and result['geni_resources']] - - # do not try to combine if there's no result - if not results : return {} - - # otherwise let's merge stuff - overall = {} - - # mmh, it is expected that all results carry the same urn - overall['geni_urn'] = results[0]['geni_urn'] - overall['pl_login'] = results[0]['pl_login'] - # append all geni_resources - overall['geni_resources'] = \ - reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) - overall['status'] = 'unknown' - if overall['geni_resources']: - overall['status'] = 'ready' - - return overall - -caching=True -#caching=False -def ListSlices(api, creds, call_id): - def _ListSlices(server, creds, call_id): - server_version = api.get_cached_server_version(server) - args = [creds] - if _call_id_supported(api, server): - args.append(call_id) - return server.ListSlices(*args) - - if Callids().already_handled(call_id): return [] - - # look in cache first - if caching and api.cache: - slices = api.cache.get('slices') - if slices: - return slices - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred= api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - # fetch from aggregates - for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue - interface = api.aggregates[aggregate] - server = api.get_server(interface, cred) - threads.run(_ListSlices, server, [cred], call_id) - - # combime results - results = threads.get_results() - slices = [] - for result in results: - slices.extend(result) - - # cache the result - if caching and api.cache: - api.cache.add('slices', slices) - - return slices - - -def get_ticket(api, xrn, creds, rspec, users): - slice_hrn, type = urn_to_hrn(xrn) - # get the netspecs contained within the clients rspec - aggregate_rspecs = {} - tree= etree.parse(StringIO(rspec)) - elements = tree.findall('./network') - for element in elements: - aggregate_hrn = element.values()[0] - aggregate_rspecs[aggregate_hrn] = rspec - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - - # attempt to use delegated credential first - cred = api.getDelegatedCredential(creds) - if not cred: - cred = api.getCredential() - threads = ThreadManager() - for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems(): - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue -======= + results = threads.get_results() ->>>>>>> a3996bfa45298c8d0abfd58916221abba737441c:sfa/managers/slice_manager.py # gather information from each ticket rspec = None -- 2.47.0