from sfa.util.specdict import *
from sfa.util.faults import *
from sfa.util.record import SfaRecord
+from sfa.rspecs.pg_rspec import PGRSpec
+from sfa.rspecs.sfa_rspec import SfaRSpec
+from sfa.rspecs.rspec_converter import RSpecConverter
+from sfa.rspecs.rspec_parser import parse_rspec
+from sfa.rspecs.rspec_version import RSpecVersion
+from sfa.rspecs.sfa_rspec import sfa_rspec_version
+from sfa.rspecs.pg_rspec import pg_rspec_version
from sfa.util.policy import Policy
from sfa.util.prefixTree import prefixTree
from sfa.util.sfaticket import *
from sfa.util.version import version_core
from sfa.util.callids import Callids
-# XX FIX ME: should merge result from multiple aggregates instead of
-# calling aggregate implementation
-from sfa.managers.aggregate_manager_pl import slice_status
-
# we have specialized xmlrpclib.ServerProxy to remember the input url
# OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
def get_serverproxy_url (server):
peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
if peername != api.hrn])
xrn=Xrn (api.hrn)
- sm_version=version_core({'interface':'slicemgr',
- 'hrn' : xrn.get_hrn(),
- 'urn' : xrn.get_urn(),
- 'peers': peers,
- })
+ supported_rspecs = [dict(pg_rspec_version), dict(sfa_rspec_version)]
+ version_more = {'interface':'slicemgr',
+ 'hrn' : xrn.get_hrn(),
+ 'urn' : xrn.get_urn(),
+ 'peers': peers,
+ 'request_rspec_versions': supported_rspecs,
+ 'ad_rspec_versions': supported_rspecs,
+ 'default_ad_rspec': dict(sfa_rspec_version)
+ }
+ sm_version=version_core(version_more)
# local aggregate if present needs to have localhost resolved
if api.hrn in api.aggregates:
local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
return sm_version
-def CreateSliver(api, xrn, creds, rspec, users, call_id):
+def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
- if Callids().already_handled(call_id): return ""
+ def _CreateSliver(aggregate, xrn, credential, rspec, users, call_id):
+ # Need to call GetVersion at an aggregate to determine the supported
+ # rspec type/format beofre calling CreateSliver at an Aggregate.
+ # The Aggregate's verion info is cached
+ server = api.aggregates[aggregate]
+ # get cached aggregate version
+ aggregate_version_key = 'version_'+ aggregate
+ aggregate_version = api.cache.get(aggregate_version_key)
+ if not aggregate_version:
+ # get current aggregate version anc cache it for 24 hours
+ aggregate_version = server.GetVersion()
+ api.cache.add(aggregate_version_key, aggregate_version, 60 * 60 * 24)
+
+ if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version:
+ # sfa aggregtes support both sfa and pg rspecs, no need to convert
+ # if aggregate supports sfa rspecs. othewise convert to pg rspec
+ rspec = RSpecConverter.to_pg_rspec(rspec)
+
+ return server.CreateSliver(xrn, credential, rspec, users, call_id)
+
- hrn, type = urn_to_hrn(xrn)
+ if Callids().already_handled(call_id): return ""
# Validate the RSpec against PlanetLab's schema --disabled for now
# The schema used here needs to aggregate the PL and VINI schemas
# schema = "/var/www/html/schemas/pl.rng"
+ rspec = parse_rspec(rspec_str)
schema = None
if schema:
- try:
- tree = etree.parse(StringIO(rspec))
- except etree.XMLSyntaxError:
- message = str(sys.exc_info()[1])
- raise InvalidRSpec(message)
-
- relaxng_doc = etree.parse(schema)
- relaxng = etree.RelaxNG(relaxng_doc)
-
- if not relaxng(tree):
- error = relaxng.error_log.last_error
- message = "%s (line %s)" % (error.message, error.line)
- raise InvalidRSpec(message)
-
- # get the callers hrn
- valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
- caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
+ rspec.validate(schema)
# attempt to use delegated credential first
credential = api.getDelegatedCredential(creds)
- if not credential:
+ if not credential:
credential = api.getCredential()
+
+ # get the callers hrn
+ hrn, type = urn_to_hrn(xrn)
+ valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
+ caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
threads = ThreadManager()
for aggregate in api.aggregates:
# prevent infinite loop. Dont send request back to caller
continue
# Just send entire RSpec to each aggregate
- server = api.aggregates[aggregate]
- threads.run(server.CreateSliver, xrn, credential, rspec, users, call_id)
+ threads.run(_CreateSliver, aggregate, xrn, credential, rspec.toxml(), users, call_id)
- results = threads.get_results()
- merged_rspec = merge_rspecs(results)
- return merged_rspec
+ results = threads.get_results()
+ rspec = SfaRSpec()
+ for result in results:
+ rspec.merge(result)
+ return rspec.toxml()
def RenewSliver(api, xrn, creds, expiration_time, call_id):
if Callids().already_handled(call_id): return True
xrn = options.get('geni_slice_urn', '')
(hrn, type) = urn_to_hrn(xrn)
- # get hrn of the original caller
- origin_hrn = options.get('origin_hrn', None)
- if not origin_hrn:
- if isinstance(creds, list):
- origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
- else:
- origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()
-
- # look in cache first
+ # get the rspec's return format from options
+ rspec_version = RSpecVersion(options.get('rspec_version'))
+ version_string = "rspec_%s" % (rspec_version.get_version_name())
+
+ # look in cache first
if caching and api.cache and not xrn:
- rspec = api.cache.get('nodes')
+ rspec = api.cache.get(version_string)
if rspec:
return rspec
my_opts = copy(options)
my_opts['geni_compressed'] = False
threads.run(server.ListResources, credential, my_opts, call_id)
- #threads.run(server.get_resources, cred, xrn, origin_hrn)
results = threads.get_results()
- merged_rspec = merge_rspecs(results)
+ #results.append(open('/root/protogeni.rspec', 'r').read())
+ rspec_version = RSpecVersion(my_opts.get('rspec_version'))
+ if rspec_version['type'].lower() == 'protogeni':
+ rspec = PGRSpec()
+ else:
+ rspec = SfaRSpec()
+
+ for result in results:
+ print "RESULT"
+ try:
+ rspec.merge(result)
+ except:
+ raise
+ api.logger.info("SM.ListResources: Failed to merge aggregate rspec")
# cache the result
if caching and api.cache and not xrn:
- api.cache.add('nodes', merged_rspec)
+ api.cache.add(version_string, rspec.toxml())
- return merged_rspec
+ return rspec.toxml()
+
+# first draft at a merging SliverStatus
+def SliverStatus(api, slice_xrn, creds, call_id):
+ if Callids().already_handled(call_id): return {}
+ # attempt to use delegated credential first
+ credential = api.getDelegatedCredential(creds)
+ if not credential:
+ credential = api.getCredential()
+ threads = ThreadManager()
+ for aggregate in api.aggregates:
+ server = api.aggregates[aggregate]
+ threads.run (server.SliverStatus, slice_xrn, credential, call_id)
+ results = threads.get_results()
+
+ # get rid of any void result - e.g. when call_id was hit where by convention we return {}
+ results = [ result for result in results if result and result['geni_resources']]
+
+ # do not try to combine if there's no result
+ if not results : return {}
+
+ # otherwise let's merge stuff
+ overall = {}
+
+ # mmh, it is expected that all results carry the same urn
+ overall['geni_urn'] = results[0]['geni_urn']
+
+ # consolidate geni_status - simple model using max on a total order
+ states = [ 'ready', 'configuring', 'failed', 'unknown' ]
+ # hash name to index
+ shash = dict ( zip ( states, range(len(states)) ) )
+ def combine_status (x,y):
+ return shash [ max (shash(x),shash(y)) ]
+ overall['geni_status'] = reduce (combine_status, [ result['geni_status'] for result in results], 'ready' )
+
+ # {'ready':0,'configuring':1,'failed':2,'unknown':3}
+ # append all geni_resources
+ overall['geni_resources'] = \
+ reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
+
+ return overall
def main():
r = RSpec()