From 3d9a47c9a4d045727beca3a6c7eb515df099544b Mon Sep 17 00:00:00 2001 From: smbaker Date: Mon, 8 Aug 2011 22:50:42 -0700 Subject: [PATCH] add statistics to slicemanager listresources/createsliver, better error checking on bad aggregates --- sfa/managers/slice_manager_pl.py | 89 ++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 26 deletions(-) diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index 7ad48f90..bda355e5 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -1,4 +1,4 @@ -# +# import sys import time,datetime from StringIO import StringIO @@ -58,11 +58,11 @@ def get_serverproxy_url (server): return server.url except: logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") - return server._ServerProxy__host + server._ServerProxy__handler + return server._ServerProxy__host + server._ServerProxy__handler def GetVersion(api): # peers explicitly in aggregates.xml - peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() if peername != api.hrn]) xrn=Xrn (api.hrn) request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)] @@ -82,22 +82,44 @@ def GetVersion(api): sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version +def drop_slicemgr_stats(rspec): + try: + stats_elements = rspec.xml.xpath('//statistics') + for node in stats_elements: + node.getparent().remove(node) + except Exception, e: + api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) + +def add_slicemgr_stat(rspec, callname, aggname, elapsed, status): + try: + stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) + if stats_tags: + stats_tag = stats_tags[0] + else: + stats_tag = etree.SubElement(rspec.xml, "statistics", call="ListResources") + + etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) + except Exception, e: + api.logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) def ListResources(api, creds, options, call_id): - def _ListResources(server, credential, opts, call_id): - + def _ListResources(aggregate, server, credential, opts, call_id): + my_opts = copy(opts) args = [credential, my_opts] - if _call_id_supported(api, server): - args.append(call_id) - version = api.get_cached_server_version(server) - # force ProtoGENI aggregates to give us a v2 RSpec - if 'sfa' not in version.keys(): - my_opts['rspec_version'] = pg_rspec_ad_version + tStart = time.time() try: - return server.ListResources(*args) + if _call_id_supported(api, server): + args.append(call_id) + version = api.get_cached_server_version(server) + # force ProtoGENI aggregates to give us a v2 RSpec + if 'sfa' not in version.keys(): + my_opts['rspec_version'] = pg_rspec_ad_version + rspec = server.ListResources(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} except Exception, e: api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e))) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" @@ -106,7 +128,7 @@ def ListResources(api, creds, options, call_id): (hrn, type) = urn_to_hrn(xrn) if 'geni_compressed' in options: del(options['geni_compressed']) - + # get the rspec's return format from options rspec_version = RSpecVersion(options.get('rspec_version')) version_string = "rspec_%s" % (rspec_version.get_version_name()) @@ -135,7 +157,7 @@ def ListResources(api, creds, options, call_id): # get the rspec from the aggregate server = api.aggregates[aggregate] - threads.run(_ListResources, server, credentials, options, call_id) + threads.run(_ListResources, aggregate, server, credentials, options, call_id) results = threads.get_results() rspec_version = RSpecVersion(options.get('rspec_version')) @@ -143,11 +165,14 @@ def ListResources(api, creds, options, call_id): rspec = PGRSpec() else: rspec = SfaRSpec() + for result in results: - try: - rspec.merge(result) - except: - api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") + add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.merge(result["rspec"]) + except: + api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") # cache the result if caching and api.cache and not xrn: @@ -158,11 +183,12 @@ def ListResources(api, creds, options, call_id): def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - def _CreateSliver(server, xrn, credential, rspec, users, call_id): + def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id): + tStart = time.time() try: - # Need to call GetVersion at an aggregate to determine the supported - # rspec type/format beofre calling CreateSliver at an Aggregate. - server_version = api.get_cached_server_version(server) + # Need to call GetVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + server_version = api.get_cached_server_version(server) if 'sfa' not in server_version and 'geni_api' in server_version: # sfa aggregtes support both sfa and pg rspecs, no need to convert # if aggregate supports sfa rspecs. otherwise convert to pg rspec @@ -170,9 +196,11 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): args = [xrn, credential, rspec, users] if _call_id_supported(api, server): args.append(call_id) - return server.CreateSliver(*args) + rspec = server.CreateSliver(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} except: logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" # Validate the RSpec against PlanetLab's schema --disabled for now @@ -183,6 +211,10 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): if schema: rspec.validate(schema) + # if there is a section, the aggregates don't care about it, + # so delete it. + drop_slicemgr_stats(rspec) + # attempt to use delegated credential first credential = api.getDelegatedCredential(creds) if not credential: @@ -200,12 +232,17 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): continue server = api.aggregates[aggregate] # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id) + threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id) results = threads.get_results() rspec = SfaRSpec() for result in results: - rspec.merge(result) + add_slicemgr_stat(rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.merge(result["rspec"]) + except: + api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") return rspec.toxml() def RenewSliver(api, xrn, creds, expiration_time, call_id): @@ -505,4 +542,4 @@ def main(): if __name__ == "__main__": main() - + -- 2.43.0