X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fmanagers%2Fslice_manager_pl.py;h=bda355e55dba0d06de72d0efaeefb289e9c4d447;hb=dec7138b35234db846d96dc6ce3b5a402c474a1d;hp=57d888a488eb7a767047320db4cdc6601229c18a;hpb=80ff4de69d60d2b0ce89af9bec5f9c08cd86b9d4;p=sfa.git diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index 57d888a4..bda355e5 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -1,4 +1,4 @@ -# +# import sys import time,datetime from StringIO import StringIO @@ -33,27 +33,18 @@ from sfa.util.version import version_core from sfa.util.callids import Callids -def _get_cached_server_version(api, server): - cache_key = server.url + "-version" - server_version = api.cache.get(cache_key) - if not server_version: - server_version = server.GetVersion() - # cache version for 24 hours - api.cache.add(cache_key, server_version, ttl= 60*60*24) - return server_version - def _call_id_supported(api, server): """ Returns true if server support the optional call_id arg, false otherwise. """ - server_version = _get_cached_server_version(api, server) + server_version = api.get_cached_server_version(server) if 'sfa' in server_version: code_tag = server_version['code_tag'] code_tag_parts = code_tag.split("-") version_parts = code_tag_parts[0].split(".") - major, minor = version_parts[0], version_parts[1] + major, minor = version_parts[0:2] rev = code_tag_parts[1] if int(major) > 1: if int(minor) > 0 or int(rev) > 20: @@ -67,11 +58,11 @@ def get_serverproxy_url (server): return server.url except: logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") - return server._ServerProxy__host + server._ServerProxy__handler + return server._ServerProxy__host + server._ServerProxy__handler def GetVersion(api): # peers explicitly in aggregates.xml - peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() if peername != api.hrn]) xrn=Xrn (api.hrn) request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)] @@ -91,26 +82,52 @@ def GetVersion(api): sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version +def drop_slicemgr_stats(rspec): + try: + stats_elements = rspec.xml.xpath('//statistics') + for node in stats_elements: + node.getparent().remove(node) + except Exception, e: + api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) + +def add_slicemgr_stat(rspec, callname, aggname, elapsed, status): + try: + stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) + if stats_tags: + stats_tag = stats_tags[0] + else: + stats_tag = etree.SubElement(rspec.xml, "statistics", call="ListResources") + + etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) + except Exception, e: + api.logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) def ListResources(api, creds, options, call_id): - def _ListResources(server, credential, my_opts, call_id): + def _ListResources(aggregate, server, credential, opts, call_id): + + my_opts = copy(opts) args = [credential, my_opts] - if _call_id_supported(api, server): - args.append(call_id) + tStart = time.time() try: - return server.ListResources(*args) + if _call_id_supported(api, server): + args.append(call_id) + version = api.get_cached_server_version(server) + # force ProtoGENI aggregates to give us a v2 RSpec + if 'sfa' not in version.keys(): + my_opts['rspec_version'] = pg_rspec_ad_version + rspec = server.ListResources(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} except Exception, e: api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e))) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" # get slice's hrn from options xrn = options.get('geni_slice_urn', '') (hrn, type) = urn_to_hrn(xrn) - my_opts = copy(options) - my_opts['geni_compressed'] = False - if 'rspec_version' in my_opts: - del my_opts['rspec_version'] + if 'geni_compressed' in options: + del(options['geni_compressed']) # get the rspec's return format from options rspec_version = RSpecVersion(options.get('rspec_version')) @@ -137,22 +154,25 @@ def ListResources(api, creds, options, call_id): # unless the caller is the aggregate's SM if caller_hrn == aggregate and aggregate != api.hrn: continue + # get the rspec from the aggregate server = api.aggregates[aggregate] - #threads.run(server.ListResources, credentials, my_opts, call_id) - threads.run(_ListResources, server, credentials, my_opts, call_id) + threads.run(_ListResources, aggregate, server, credentials, options, call_id) results = threads.get_results() - rspec_version = RSpecVersion(my_opts.get('rspec_version')) + rspec_version = RSpecVersion(options.get('rspec_version')) if rspec_version['type'] == pg_rspec_ad_version['type']: rspec = PGRSpec() else: rspec = SfaRSpec() + for result in results: - try: - rspec.merge(result) - except: - api.logger.info("SM.ListResources: Failed to merge aggregate rspec") + add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.merge(result["rspec"]) + except: + api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") # cache the result if caching and api.cache and not xrn: @@ -163,21 +183,24 @@ def ListResources(api, creds, options, call_id): def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - def _CreateSliver(server, xrn, credential, rspec, users, call_id): - # Need to call GetVersion at an aggregate to determine the supported - # rspec type/format beofre calling CreateSliver at an Aggregate. - server_version = _get_server_version(api, server) - if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version: + def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id): + tStart = time.time() + try: + # Need to call GetVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + server_version = api.get_cached_server_version(server) + if 'sfa' not in server_version and 'geni_api' in server_version: # sfa aggregtes support both sfa and pg rspecs, no need to convert - # if aggregate supports sfa rspecs. othewise convert to pg rspec + # if aggregate supports sfa rspecs. otherwise convert to pg rspec rspec = RSpecConverter.to_pg_rspec(rspec) args = [xrn, credential, rspec, users] if _call_id_supported(api, server): args.append(call_id) - try: - return server.CreateSliver(*args) - except Exception, e: - api.logger.warn("CreateSliver failed at %s: %s" %(server.url, str(e))) + rspec = server.CreateSliver(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + except: + logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" # Validate the RSpec against PlanetLab's schema --disabled for now @@ -188,6 +211,10 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): if schema: rspec.validate(schema) + # if there is a section, the aggregates don't care about it, + # so delete it. + drop_slicemgr_stats(rspec) + # attempt to use delegated credential first credential = api.getDelegatedCredential(creds) if not credential: @@ -205,17 +232,22 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): continue server = api.aggregates[aggregate] # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id) + threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id) results = threads.get_results() rspec = SfaRSpec() for result in results: - rspec.merge(result) + add_slicemgr_stat(rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.merge(result["rspec"]) + except: + api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") return rspec.toxml() def RenewSliver(api, xrn, creds, expiration_time, call_id): def _RenewSliver(server, xrn, creds, expiration_time, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [xrn, creds, expiration_time, call_id] if _call_id_supported(api, server): args.append(call_id) @@ -245,7 +277,7 @@ def RenewSliver(api, xrn, creds, expiration_time, call_id): def DeleteSliver(api, xrn, creds, call_id): def _DeleteSliver(server, xrn, creds, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [xrn, creds] if _call_id_supported(api, server): args.append(call_id) @@ -276,7 +308,7 @@ def DeleteSliver(api, xrn, creds, call_id): # first draft at a merging SliverStatus def SliverStatus(api, slice_xrn, creds, call_id): def _SliverStatus(server, xrn, creds, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [xrn, creds] if _call_id_supported(api, server): args.append(call_id) @@ -318,7 +350,7 @@ caching=True #caching=False def ListSlices(api, creds, call_id): def _ListSlices(server, creds, call_id): - server_version = _get_server_version(api, server) + server_version = api.get_cached_server_version(api, server) args = [creds] if _call_id_supported(api, server): args.append(call_id) @@ -399,7 +431,7 @@ def get_ticket(api, xrn, creds, rspec, users): continue # send the request to this address url = target_aggs[0]['url'] - server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file) + server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file, timeout=30) # aggregate found, no need to keep looping break if server is None: @@ -510,4 +542,4 @@ def main(): if __name__ == "__main__": main() - +