X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fmanagers%2Fslice_manager_pl.py;h=01fd43b2faf863686af2925e7a7c7568d4ff84c4;hb=b5e71f49ffc297ae9e31285857215b4ad5a51ae9;hp=44b1a5e834dd553681ce55cb4aee0b8a932fe175;hpb=725c637b3d6f4e41773b83f7977e2ba962b5b1b7;p=sfa.git diff --git a/sfa/managers/slice_manager_pl.py b/sfa/managers/slice_manager_pl.py index 44b1a5e8..01fd43b2 100644 --- a/sfa/managers/slice_manager_pl.py +++ b/sfa/managers/slice_manager_pl.py @@ -15,6 +15,10 @@ from sfa.util.rspec import * from sfa.util.specdict import * from sfa.util.faults import * from sfa.util.record import SfaRecord +from sfa.rspecs.pg_rspec import PGRSpec +from sfa.rspecs.sfa_rspec import SfaRSpec +from sfa.rspecs.rspec_converter import RSpecConverter +from sfa.rspecs.rspec_parser import parse_rspec from sfa.util.policy import Policy from sfa.util.prefixTree import prefixTree from sfa.util.sfaticket import * @@ -23,12 +27,9 @@ from sfa.util.threadmanager import ThreadManager import sfa.util.xmlrpcprotocol as xmlrpcprotocol import sfa.plc.peers as peers from sfa.util.version import version_core +from sfa.rspecs.rspec_version import RSpecVersion from sfa.util.callids import Callids -# XX FIX ME: should merge result from multiple aggregates instead of -# calling aggregate implementation -from sfa.managers.aggregate_manager_pl import slice_status - # we have specialized xmlrpclib.ServerProxy to remember the input url # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances def get_serverproxy_url (server): @@ -54,39 +55,38 @@ def GetVersion(api): sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname']) return sm_version -def CreateSliver(api, xrn, creds, rspec, users, call_id): +def CreateSliver(api, xrn, creds, rspec_str, users, call_id): - if Callids().already_handled(call_id): return "" + def _CreateSliver(server, xrn, credentail, rspec, users, call_id): + # get aggregate version + version = server.GetVersion() + if 'sfa' in version: + # just send the whole rspec to SFA AM/SM + server.CreateSliver(xrn, credential, rspec, users, call_id) + elif 'geni_api' in version: + pass + # convert to pg rspec + - hrn, type = urn_to_hrn(xrn) + if Callids().already_handled(call_id): return "" # Validate the RSpec against PlanetLab's schema --disabled for now # The schema used here needs to aggregate the PL and VINI schemas # schema = "/var/www/html/schemas/pl.rng" + rspec = parse_rspec(rspec_str) schema = None if schema: - try: - tree = etree.parse(StringIO(rspec)) - except etree.XMLSyntaxError: - message = str(sys.exc_info()[1]) - raise InvalidRSpec(message) - - relaxng_doc = etree.parse(schema) - relaxng = etree.RelaxNG(relaxng_doc) - - if not relaxng(tree): - error = relaxng.error_log.last_error - message = "%s (line %s)" % (error.message, error.line) - raise InvalidRSpec(message) - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + rspec.validate(schema) # attempt to use delegated credential first credential = api.getDelegatedCredential(creds) - if not credential: + if not credential: credential = api.getCredential() + + # get the callers hrn + hrn, type = urn_to_hrn(xrn) + valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] + caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() threads = ThreadManager() for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller @@ -96,11 +96,13 @@ def CreateSliver(api, xrn, creds, rspec, users, call_id): # Just send entire RSpec to each aggregate server = api.aggregates[aggregate] - threads.run(server.CreateSliver, xrn, credential, rspec, users, call_id) + threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id) - results = threads.get_results() - merged_rspec = merge_rspecs(results) - return merged_rspec + results = threads.get_results() + rspec = SfaRSpec() + for result in results: + rspec.merge(result) + return rspec.toxml() def RenewSliver(api, xrn, creds, expiration_time, call_id): if Callids().already_handled(call_id): return True @@ -340,17 +342,13 @@ def ListResources(api, creds, options, call_id): xrn = options.get('geni_slice_urn', '') (hrn, type) = urn_to_hrn(xrn) - # get hrn of the original caller - origin_hrn = options.get('origin_hrn', None) - if not origin_hrn: - if isinstance(creds, list): - origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn() - else: - origin_hrn = Credential(string=creds).get_gid_caller().get_hrn() - - # look in cache first + # get the rspec's return format from options + rspec_version = RSpecVersion(options.get('rspec_version', 'SFA 1')) + version_string = "rspec_%s_%s" % (rspec_version.format, rspec_version.version) + + # look in cache first if caching and api.cache and not xrn: - rspec = api.cache.get('nodes') + rspec = api.cache.get(version_string) if rspec: return rspec @@ -373,16 +371,67 @@ def ListResources(api, creds, options, call_id): my_opts = copy(options) my_opts['geni_compressed'] = False threads.run(server.ListResources, credential, my_opts, call_id) - #threads.run(server.get_resources, cred, xrn, origin_hrn) results = threads.get_results() - merged_rspec = merge_rspecs(results) + #results.append(open('/root/protogeni.rspec', 'r').read()) + rspec = SfaRSpec() + for result in results: + try: + tmp_rspec = parse_rspec(result) + if isinstance(tmp_rspec, SfaRSpec): + rspec.merge(result) + elif isinstance(tmp_rspec, PGRSpec): + rspec.merge(RSpecConverter.to_sfa_rspec(result)) + else: + api.logger.info("SM.ListResources: invalid aggregate rspec") + except: + api.logger.info("SM.ListResources: Failed to merge aggregate rspec") # cache the result if caching and api.cache and not xrn: - api.cache.add('nodes', merged_rspec) + api.cache.add(version_string, rspec.toxml()) - return merged_rspec + return rspec.toxml() + +# first draft at a merging SliverStatus +def SliverStatus(api, slice_xrn, creds, call_id): + if Callids().already_handled(call_id): return {} + # attempt to use delegated credential first + credential = api.getDelegatedCredential(creds) + if not credential: + credential = api.getCredential() + threads = ThreadManager() + for aggregate in api.aggregates: + server = api.aggregates[aggregate] + threads.run (server.SliverStatus, slice_xrn, credential, call_id) + results = threads.get_results() + + # get rid of any void result - e.g. when call_id was hit where by convention we return {} + results = [ result for result in results if result and result['geni_resources']] + + # do not try to combine if there's no result + if not results : return {} + + # otherwise let's merge stuff + overall = {} + + # mmh, it is expected that all results carry the same urn + overall['geni_urn'] = results[0]['geni_urn'] + + # consolidate geni_status - simple model using max on a total order + states = [ 'ready', 'configuring', 'failed', 'unknown' ] + # hash name to index + shash = dict ( zip ( states, range(len(states)) ) ) + def combine_status (x,y): + return shash [ max (shash(x),shash(y)) ] + overall['geni_status'] = reduce (combine_status, [ result['geni_status'] for result in results], 'ready' ) + + # {'ready':0,'configuring':1,'failed':2,'unknown':3} + # append all geni_resources + overall['geni_resources'] = \ + reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) + + return overall def main(): r = RSpec()