X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fmanagers%2Fslice_manager_pl.py;h=7b57e0f142d1d5220c5e4095f0cc0228a6e920a3;hb=e01bd14db52bd0ba64941e86e22dc890bacadb52;hp=995b837bb776f510f2aa4fe3aecf7097d79d5a1d;hpb=a06e9fb53ab40e3c67cfa8343afa296acad4b1e6;p=sfa.git diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index 995b837b..7b57e0f1 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -1,4 +1,4 @@ -# +# import sys import time,datetime from StringIO import StringIO @@ -58,11 +58,11 @@ def get_serverproxy_url (server): return server.url except: logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") - return server._ServerProxy__host + server._ServerProxy__handler + return server._ServerProxy__host + server._ServerProxy__handler def GetVersion(api): # peers explicitly in aggregates.xml - peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() if peername != api.hrn]) xrn=Xrn (api.hrn) request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)] @@ -82,26 +82,52 @@ def GetVersion(api): sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version +def drop_slicemgr_stats(rspec): + try: + stats_elements = rspec.xml.xpath('//statistics') + for node in stats_elements: + node.getparent().remove(node) + except Exception, e: + api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) + +def add_slicemgr_stat(rspec, callname, aggname, elapsed, status): + try: + stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) + if stats_tags: + stats_tag = stats_tags[0] + else: + stats_tag = etree.SubElement(rspec.xml, "statistics", call=callname) + + etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) + except Exception, e: + api.logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) def ListResources(api, creds, options, call_id): - def _ListResources(server, credential, my_opts, call_id): + def _ListResources(aggregate, server, credential, opts, call_id): + + my_opts = copy(opts) args = [credential, my_opts] - if _call_id_supported(api, server): - args.append(call_id) + tStart = time.time() try: - return server.ListResources(*args) + if _call_id_supported(api, server): + args.append(call_id) + version = api.get_cached_server_version(server) + # force ProtoGENI aggregates to give us a v2 RSpec + if 'sfa' not in version.keys(): + my_opts['rspec_version'] = dict(pg_rspec_ad_version) + rspec = server.ListResources(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} except Exception, e: api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e))) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" # get slice's hrn from options xrn = options.get('geni_slice_urn', '') (hrn, type) = urn_to_hrn(xrn) - my_opts = copy(options) - my_opts['geni_compressed'] = False - if 'rspec_version' in my_opts: - del my_opts['rspec_version'] + if 'geni_compressed' in options: + del(options['geni_compressed']) # get the rspec's return format from options rspec_version = RSpecVersion(options.get('rspec_version')) @@ -131,20 +157,22 @@ def ListResources(api, creds, options, call_id): # get the rspec from the aggregate server = api.aggregates[aggregate] - #threads.run(server.ListResources, credentials, my_opts, call_id) - threads.run(_ListResources, server, credentials, my_opts, call_id) + threads.run(_ListResources, aggregate, server, credentials, options, call_id) results = threads.get_results() - rspec_version = RSpecVersion(my_opts.get('rspec_version')) + rspec_version = RSpecVersion(options.get('rspec_version')) if rspec_version['type'] == pg_rspec_ad_version['type']: rspec = PGRSpec() else: rspec = SfaRSpec() + for result in results: - try: - rspec.merge(result) - except: - api.logger.info("SM.ListResources: Failed to merge aggregate rspec") + add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.merge(result["rspec"]) + except: + api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") # cache the result if caching and api.cache and not xrn: @@ -155,11 +183,12 @@ def ListResources(api, creds, options, call_id): def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - def _CreateSliver(server, xrn, credential, rspec, users, call_id): + def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id): + tStart = time.time() try: - # Need to call GetVersion at an aggregate to determine the supported - # rspec type/format beofre calling CreateSliver at an Aggregate. - server_version = api.get_cached_server_version(server) + # Need to call GetVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + server_version = api.get_cached_server_version(server) if 'sfa' not in server_version and 'geni_api' in server_version: # sfa aggregtes support both sfa and pg rspecs, no need to convert # if aggregate supports sfa rspecs. otherwise convert to pg rspec @@ -167,9 +196,11 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): args = [xrn, credential, rspec, users] if _call_id_supported(api, server): args.append(call_id) - return server.CreateSliver(*args) + rspec = server.CreateSliver(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} except: logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" # Validate the RSpec against PlanetLab's schema --disabled for now @@ -180,6 +211,10 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): if schema: rspec.validate(schema) + # if there is a section, the aggregates don't care about it, + # so delete it. + drop_slicemgr_stats(rspec) + # attempt to use delegated credential first credential = api.getDelegatedCredential(creds) if not credential: @@ -197,17 +232,22 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): continue server = api.aggregates[aggregate] # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id) + threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id) results = threads.get_results() rspec = SfaRSpec() for result in results: - rspec.merge(result) + add_slicemgr_stat(rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.merge(result["rspec"]) + except: + api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") return rspec.toxml() def RenewSliver(api, xrn, creds, expiration_time, call_id): def _RenewSliver(server, xrn, creds, expiration_time, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [xrn, creds, expiration_time, call_id] if _call_id_supported(api, server): args.append(call_id) @@ -237,7 +277,7 @@ def RenewSliver(api, xrn, creds, expiration_time, call_id): def DeleteSliver(api, xrn, creds, call_id): def _DeleteSliver(server, xrn, creds, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [xrn, creds] if _call_id_supported(api, server): args.append(call_id) @@ -268,7 +308,7 @@ def DeleteSliver(api, xrn, creds, call_id): # first draft at a merging SliverStatus def SliverStatus(api, slice_xrn, creds, call_id): def _SliverStatus(server, xrn, creds, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [xrn, creds] if _call_id_supported(api, server): args.append(call_id) @@ -310,7 +350,7 @@ caching=True #caching=False def ListSlices(api, creds, call_id): def _ListSlices(server, creds, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [creds] if _call_id_supported(api, server): args.append(call_id) @@ -391,7 +431,7 @@ def get_ticket(api, xrn, creds, rspec, users): continue # send the request to this address url = target_aggs[0]['url'] - server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file) + server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file, timeout=30) # aggregate found, no need to keep looping break if server is None: @@ -502,4 +542,4 @@ def main(): if __name__ == "__main__": main() - +