X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fmanagers%2Fslice_manager_pl.py;h=7ed75ed974ca81bdc6007f253ff5726db38b1161;hb=5bb8f044e0085d73ec79abb1822421db3fc50101;hp=7ad48f909cfb0f9e4dd26d7f18785bbcbf217f59;hpb=fa08ec086dd7540c0af97193336ed573a61beacb;p=sfa.git diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index 7ad48f90..7ed75ed9 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -1,4 +1,4 @@ -# +# import sys import time,datetime from StringIO import StringIO @@ -15,13 +15,10 @@ from sfa.util.rspec import * from sfa.util.specdict import * from sfa.util.faults import * from sfa.util.record import SfaRecord -from sfa.rspecs.pg_rspec import PGRSpec -from sfa.rspecs.sfa_rspec import SfaRSpec from sfa.rspecs.rspec_converter import RSpecConverter -from sfa.rspecs.rspec_parser import parse_rspec -from sfa.rspecs.rspec_version import RSpecVersion -from sfa.rspecs.sfa_rspec import sfa_rspec_version -from sfa.rspecs.pg_rspec import pg_rspec_ad_version, pg_rspec_request_version +from sfa.client.client_helper import sfa_to_pg_users_arg +from sfa.rspecs.version_manager import VersionManager +from sfa.rspecs.rspec import RSpec from sfa.util.policy import Policy from sfa.util.prefixTree import prefixTree from sfa.util.sfaticket import * @@ -58,22 +55,29 @@ def get_serverproxy_url (server): return server.url except: logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals") - return server._ServerProxy__host + server._ServerProxy__handler + return server._ServerProxy__host + server._ServerProxy__handler def GetVersion(api): # peers explicitly in aggregates.xml - peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() + peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() if peername != api.hrn]) - xrn=Xrn (api.hrn) - request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)] - ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)] + version_manager = VersionManager() + ad_rspec_versions = [] + request_rspec_versions = [] + for rspec_version in version_manager.versions: + if rspec_version in ['*', 'ad']: + request_rspec_versions.append(rspec_version.to_dict()) + if rspec_version in ['*', 'request']: + request_rspec_version.append(rspec_version.to_dict()) + default_rspec_version = version_manager.get_version("sfa 1").to_dict() + xrn=Xrn(api.hrn) version_more = {'interface':'slicemgr', 'hrn' : xrn.get_hrn(), 'urn' : xrn.get_urn(), 'peers': peers, 'request_rspec_versions': request_rspec_versions, 'ad_rspec_versions': ad_rspec_versions, - 'default_ad_rspec': dict(sfa_rspec_version) + 'default_ad_rspec': default_rspec_version } sm_version=version_core(version_more) # local aggregate if present needs to have localhost resolved @@ -82,22 +86,45 @@ def GetVersion(api): sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version +def drop_slicemgr_stats(rspec): + try: + stats_elements = rspec.xml.xpath('//statistics') + for node in stats_elements: + node.getparent().remove(node) + except Exception, e: + api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e))) + +def add_slicemgr_stat(rspec, callname, aggname, elapsed, status): + try: + stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname) + if stats_tags: + stats_tag = stats_tags[0] + else: + stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname) + + etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status)) + except Exception, e: + api.logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e))) def ListResources(api, creds, options, call_id): - def _ListResources(server, credential, opts, call_id): - + version_manager = VersionManager() + def _ListResources(aggregate, server, credential, opts, call_id): + my_opts = copy(opts) args = [credential, my_opts] - if _call_id_supported(api, server): - args.append(call_id) - version = api.get_cached_server_version(server) - # force ProtoGENI aggregates to give us a v2 RSpec - if 'sfa' not in version.keys(): - my_opts['rspec_version'] = pg_rspec_ad_version + tStart = time.time() try: - return server.ListResources(*args) + if _call_id_supported(api, server): + args.append(call_id) + version = api.get_cached_server_version(server) + # force ProtoGENI aggregates to give us a v2 RSpec + if 'sfa' not in version.keys(): + my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict() + rspec = server.ListResources(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} except Exception, e: - api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e))) + api.logger.log_exc("ListResources failed at %s" %(server.url)) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" @@ -106,10 +133,10 @@ def ListResources(api, creds, options, call_id): (hrn, type) = urn_to_hrn(xrn) if 'geni_compressed' in options: del(options['geni_compressed']) - + # get the rspec's return format from options - rspec_version = RSpecVersion(options.get('rspec_version')) - version_string = "rspec_%s" % (rspec_version.get_version_name()) + rspec_version = version_manager.get_version(options.get('rspec_version')) + version_string = "rspec_%s" % (rspec_version.to_string()) # look in cache first if caching and api.cache and not xrn: @@ -135,19 +162,22 @@ def ListResources(api, creds, options, call_id): # get the rspec from the aggregate server = api.aggregates[aggregate] - threads.run(_ListResources, server, credentials, options, call_id) + threads.run(_ListResources, aggregate, server, credentials, options, call_id) results = threads.get_results() - rspec_version = RSpecVersion(options.get('rspec_version')) - if rspec_version['type'] == pg_rspec_ad_version['type']: - rspec = PGRSpec() - else: - rspec = SfaRSpec() + rspec_version = version_manager.get_version(options.get('rspec_version')) + if xrn: + result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest') + else: + result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad') + rspec = RSpec(version=result_version) for result in results: - try: - rspec.merge(result) - except: - api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") + add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + rspec.version.merge(result["rspec"]) + except: + api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") # cache the result if caching and api.cache and not xrn: @@ -158,31 +188,44 @@ def ListResources(api, creds, options, call_id): def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - def _CreateSliver(server, xrn, credential, rspec, users, call_id): + version_manager = VersionManager() + def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id): + tStart = time.time() try: - # Need to call GetVersion at an aggregate to determine the supported - # rspec type/format beofre calling CreateSliver at an Aggregate. - server_version = api.get_cached_server_version(server) + # Need to call GetVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + server_version = api.get_cached_server_version(server) + requested_users = users if 'sfa' not in server_version and 'geni_api' in server_version: # sfa aggregtes support both sfa and pg rspecs, no need to convert # if aggregate supports sfa rspecs. otherwise convert to pg rspec - rspec = RSpecConverter.to_pg_rspec(rspec) - args = [xrn, credential, rspec, users] + rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request')) + filter = {'component_manager_id': server_version['urn']} + rspec.filter(filter) + rspec = rspec.toxml() + requested_users = sfa_to_pg_users_arg(users) + args = [xrn, credential, rspec, requested_users] if _call_id_supported(api, server): args.append(call_id) - return server.CreateSliver(*args) + rspec = server.CreateSliver(*args) + return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} except: logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"} if Callids().already_handled(call_id): return "" # Validate the RSpec against PlanetLab's schema --disabled for now # The schema used here needs to aggregate the PL and VINI schemas # schema = "/var/www/html/schemas/pl.rng" - rspec = parse_rspec(rspec_str) + rspec = RSpec(rspec_str) schema = None if schema: rspec.validate(schema) + # if there is a section, the aggregates don't care about it, + # so delete it. + drop_slicemgr_stats(rspec) + # attempt to use delegated credential first credential = api.getDelegatedCredential(creds) if not credential: @@ -200,13 +243,19 @@ def CreateSliver(api, xrn, creds, rspec_str, users, call_id): continue server = api.aggregates[aggregate] # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id) + threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id) results = threads.get_results() - rspec = SfaRSpec() + manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') + result_rspec = RSpec(version=manifest_version) for result in results: - rspec.merge(result) - return rspec.toxml() + add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"]) + if result["status"]=="success": + try: + result_rspec.version.merge(result["rspec"]) + except: + api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") + return result_rspec.toxml() def RenewSliver(api, xrn, creds, expiration_time, call_id): def _RenewSliver(server, xrn, creds, expiration_time, call_id): @@ -313,7 +362,7 @@ caching=True #caching=False def ListSlices(api, creds, call_id): def _ListSlices(server, creds, call_id): - server_version = api.get_cached_server_version(api, server) + server_version = api.get_cached_server_version(server) args = [creds] if _call_id_supported(api, server): args.append(call_id) @@ -505,4 +554,4 @@ def main(): if __name__ == "__main__": main() - +