1 # pylint: disable=c0111, c0103
8 from sfa.trust.credential import Credential
10 from sfa.util.sfalogging import logger
11 from sfa.util.xrn import Xrn, urn_to_hrn
12 from sfa.util.version import version_core
13 from sfa.util.callids import Callids
14 from sfa.util.cache import Cache
16 from sfa.client.multiclient import MultiClient
18 from sfa.rspecs.version_manager import VersionManager
19 from sfa.rspecs.rspec import RSpec
21 from sfa.client.return_value import ReturnValue
26 # the cache instance is a class member so it survives across incoming
30 def __init__(self, config):
# Set up the optional cache used by this slice manager.
# config: object exposing the SFA_SM_CACHING flag read below.
32 if config.SFA_SM_CACHING:
# the Cache() lives on the SliceManager class and is created only once
33 if SliceManager.cache is None:
34 SliceManager.cache = Cache()
# this instance refers to the same class-level cache object
35 self.cache = SliceManager.cache
37 def GetVersion(self, api, options):
# Assemble this slice manager's version record: peer aggregate URLs, the
# ad/request RSpec versions known to VersionManager and the credential
# types, with the extra fields handed to version_core().
# NOTE(review): `options` is not read in this method as far as can be seen.
38 # peers explicitly in aggregates.xml
# the entry whose name equals api.hrn (the local aggregate) is left out here
39 peers = {peername: interface.get_url()
40 for (peername, interface) in api.aggregates.iteritems()
41 if peername != api.hrn}
42 version_manager = VersionManager()
43 ad_rspec_versions = []
44 request_rspec_versions = []
# range(4)[-2:] yields 2 and 3, so two 'geni_sfa' entries are produced
45 cred_types = [{'geni_type': 'geni_sfa',
46 'geni_version': str(i)} for i in range(4)[-2:]]
# a version whose content_type is '*' is listed under both ad and request
47 for rspec_version in version_manager.versions:
48 if rspec_version.content_type in ['*', 'ad']:
49 ad_rspec_versions.append(rspec_version.to_dict())
50 if rspec_version.content_type in ['*', 'request']:
51 request_rspec_versions.append(rspec_version.to_dict())
52 xrn = Xrn(api.hrn, 'authority+sm')
54 'interface': 'slicemgr',
59 % (api.config.SFA_SM_HOST, api.config.SFA_SM_PORT)},
63 # Accept operations that act on a subset of slivers in a given
65 'geni_single_allocation': 0,
66 # Multiple slivers can exist and be incrementally added, including
67 # those which connect or overlap in some way.
68 'geni_allocate': 'geni_many',
69 'geni_credential_types': cred_types,
71 sm_version = version_core(version_more)
72 # local aggregate if present needs to have localhost resolved
73 if api.hrn in api.aggregates:
74 local_am_url = api.aggregates[api.hrn].get_url()
# relies on sm_version carrying both a 'peers' dict and a 'hostname' entry
75 sm_version['peers'][api.hrn] = local_am_url.replace(
76 'localhost', sm_version['hostname'])
79 def drop_slicemgr_stats(self, rspec):
# Remove every <statistics> element from the rspec's XML tree, in place.
# rspec: object whose .xml supports xpath(); matched nodes are detached
# from their parent.
81 stats_elements = rspec.xml.xpath('//statistics')
82 for node in stats_elements:
83 node.getparent().remove(node)
# best effort: any failure is only reported through a warning
84 except Exception as e:
85 logger.warning("drop_slicemgr_stats failed: %s " % (str(e)))
87 def add_slicemgr_stat(self, rspec, callname, aggname,
88 elapsed, status, exc_info=None):
# Record the outcome of one aggregate call inside the rspec, as an
# <aggregate name=... elapsed=... status=...> element placed under a
# <statistics call=callname> element.
# exc_info: optional tuple indexed as [1] (used for the name attribute) and
# [2] (a traceback); callers in this file pass sys.exc_info().
# look for a <statistics> element already created for this call name
90 stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
# the first match is reused
92 stats_tag = stats_tags[0]
# presumably the no-match case: a new element is added under the root
94 stats_tag = rspec.xml.root.add_element(
95 "statistics", call=callname)
# all attribute values are stringified before being stored
97 stat_tag = stats_tag.add_element(
98 "aggregate", name=str(aggname),
99 elapsed=str(elapsed), status=str(status))
102 exc_tag = stat_tag.add_element(
103 "exc_info", name=str(exc_info[1]))
105 # formats the traceback as a set of xml elements
106 tb = traceback.extract_tb(exc_info[2])
# each extracted entry is read as (filename, line, func, code)
108 exc_frame = exc_tag.add_element(
109 "tb_frame", filename=str(item[0]),
110 line=str(item[1]), func=str(item[2]), code=str(item[3]))
# failures while recording statistics are reported as a warning
112 except Exception as e:
113 logger.warning("add_slicemgr_stat failed on %s: %s" %
116 def ListResources(self, api, creds, options):
# Fan a ListResources call out to every aggregate in api.aggregates via
# MultiClient and merge the successful answers into a single RSpec.
# When no slice urn is given, a cached advertisement may be used and the
# merged result is stored back into the cache.
117 call_id = options.get('call_id')
118 if Callids().already_handled(call_id):
121 version_manager = VersionManager()
# Per-aggregate worker. Returns a dict with 'aggregate', 'elapsed' and
# 'status', plus 'result' on success or 'exc_info' on exception.
123 def _ListResources(aggregate, server, credential, options):
# work on a copy so the shared options dict is not modified by the worker
124 forward_options = copy(options)
# NOTE(review): `version` is assigned but not used afterwards — confirm
127 version = api.get_cached_server_version(server)
128 # force ProtoGENI aggregates to give us a v2 RSpec
# NOTE(review): the statement below simply forwards the caller's
# geni_rspec_version; nothing here forces v2 — confirm the comment above
129 forward_options['geni_rspec_version'] = options.get(
130 'geni_rspec_version')
131 result = server.ListResources(credential, forward_options)
132 return {"aggregate": aggregate, "result": result,
133 "elapsed": time.time() - tStart, "status": "success"}
134 except Exception as e:
135 logger.log_exc("ListResources failed at %s" % (server.url))
136 return {"aggregate": aggregate, "elapsed": time.time() - tStart,
137 "status": "exception", "exc_info": sys.exc_info()}
139 # get slice's hrn from options
140 xrn = options.get('geni_slice_urn', '')
# note: the local name `type` shadows the builtin within this method
141 (hrn, type) = urn_to_hrn(xrn)
# 'geni_compressed' is removed from the caller's options dict in place
142 if 'geni_compressed' in options:
143 del(options['geni_compressed'])
145 # get the rspec's return format from options
146 rspec_version = version_manager.get_version(
147 options.get('geni_rspec_version'))
# cache key derived from the requested rspec version
148 version_string = "rspec_%s" % (rspec_version)
150 # look in cache first
# the cache is consulted only without a slice urn and unless the caller
# passed a falsy 'cached' option
151 cached_requested = options.get('cached', True)
152 if not xrn and self.cache and cached_requested:
153 rspec = self.cache.get(version_string)
156 "SliceManager.ListResources returns cached advertisement")
159 # get the callers hrn
# the first element returned by checkCredentials is used as the credential
160 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
161 caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
163 # attempt to use delegated credential first
164 cred = api.getDelegatedCredential(creds)
166 cred = api.getCredential()
167 multiclient = MultiClient()
168 for aggregate in api.aggregates:
169 # prevent infinite loop. Dont send request back to caller
170 # unless the caller is the aggregate's SM
171 if caller_hrn == aggregate and aggregate != api.hrn:
174 # get the rspec from the aggregate
175 interface = api.aggregates[aggregate]
176 server = api.server_proxy(interface, cred)
177 multiclient.run(_ListResources, aggregate, server, [cred], options)
179 results = multiclient.get_results()
# NOTE(review): same lookup as the one made before the cache check
180 rspec_version = version_manager.get_version(
181 options.get('geni_rspec_version'))
# the merged rspec uses either the 'manifest' or the 'ad' flavour of the
# requested version; presumably chosen on whether a slice urn was given
183 result_version = version_manager._get_version(
184 rspec_version.type, rspec_version.version, 'manifest')
186 result_version = version_manager._get_version(
187 rspec_version.type, rspec_version.version, 'ad')
188 rspec = RSpec(version=result_version)
189 for result in results:
# every aggregate outcome, success or not, is recorded as a statistic
190 self.add_slicemgr_stat(
191 rspec, "ListResources", result["aggregate"], result["elapsed"],
192 result["status"], result.get("exc_info", None))
193 if result["status"] == "success":
194 res = result['result']['value']
196 rspec.version.merge(ReturnValue.get_value(res))
199 "SM.ListResources: Failed to merge aggregate rspec")
# only slice-less results are cached, under the version-specific key
202 if self.cache and not xrn:
203 logger.debug("SliceManager.ListResources caches advertisement")
204 self.cache.add(version_string, rspec.toxml())
208 def Allocate(self, api, xrn, creds, rspec_str, expiration, options):
# Forward an Allocate request (the same full RSpec) to every aggregate in
# api.aggregates and merge the answers into one manifest RSpec plus a
# combined list of slivers.
# NOTE(review): `expiration` does not appear to be forwarded to the
# aggregates by the server.Allocate call below — confirm.
209 call_id = options.get('call_id')
210 if Callids().already_handled(call_id):
213 version_manager = VersionManager()
# Per-aggregate worker. Returns a dict with 'aggregate', 'elapsed' and
# 'status', plus 'result' on success or 'exc_info' on exception.
215 def _Allocate(aggregate, server, xrn, credential, rspec, options):
218 # Need to call GetVersion at an aggregate to determine the supported
219 # rspec type/format before calling CreateSliver at an Aggregate.
220 #server_version = api.get_cached_server_version(server)
221 # if 'sfa' not in server_version and 'geni_api' in server_version:
222 # sfa aggregates support both sfa and pg rspecs, no need to convert
223 # if aggregate supports sfa rspecs. otherwise convert to pg rspec
224 #rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request'))
225 #filter = {'component_manager_id': server_version['urn']}
226 # rspec.filter(filter)
227 #rspec = rspec.toxml()
228 result = server.Allocate(xrn, credential, rspec, options)
229 return {"aggregate": aggregate, "result": result,
230 "elapsed": time.time() - tStart, "status": "success"}
233 'Something wrong in _Allocate with URL %s' % server.url)
234 return {"aggregate": aggregate, "elapsed": time.time() - tStart,
235 "status": "exception", "exc_info": sys.exc_info()}
237 # Validate the RSpec against PlanetLab's schema --disabled for now
238 # The schema used here needs to aggregate the PL and VINI schemas
239 # schema = "/var/www/html/schemas/pl.rng"
240 rspec = RSpec(rspec_str)
243 # rspec.validate(schema)
245 # if there is a <statistics> section, the aggregates don't care about it,
247 self.drop_slicemgr_stats(rspec)
249 # attempt to use delegated credential first
250 cred = api.getDelegatedCredential(creds)
252 cred = api.getCredential()
254 # get the callers hrn
# note: the local name `type` shadows the builtin within this method
255 hrn, type = urn_to_hrn(xrn)
256 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
257 caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
258 multiclient = MultiClient()
259 for aggregate in api.aggregates:
260 # prevent infinite loop. Dont send request back to caller
261 # unless the caller is the aggregate's SM
262 if caller_hrn == aggregate and aggregate != api.hrn:
264 interface = api.aggregates[aggregate]
265 server = api.server_proxy(interface, cred)
266 # Just send entire RSpec to each aggregate
# rspec.toxml() is evaluated again on every iteration of this loop
267 multiclient.run(_Allocate, aggregate, server, xrn,
268 [cred], rspec.toxml(), options)
270 results = multiclient.get_results()
# the manifest flavour matching the request rspec's type/version is used
271 manifest_version = version_manager._get_version(
272 rspec.version.type, rspec.version.version, 'manifest')
273 result_rspec = RSpec(version=manifest_version)
277 for result in results:
# every aggregate outcome, success or not, is recorded as a statistic
278 self.add_slicemgr_stat(
279 result_rspec, "Allocate", result["aggregate"], result["elapsed"],
280 result["status"], result.get("exc_info", None))
281 if result["status"] == "success":
283 res = result['result']['value']
# geni_urn is overwritten on each success: the last one processed wins
284 geni_urn = res['geni_urn']
285 result_rspec.version.merge(
286 ReturnValue.get_value(res['geni_rspec']))
287 geni_slivers.extend(res['geni_slivers'])
290 "SM.Allocate: Failed to merge aggregate rspec")
292 'geni_urn': geni_urn,
293 'geni_rspec': result_rspec.toxml(),
294 'geni_slivers': geni_slivers
297 def Provision(self, api, xrn, creds, options):
# Forward a Provision request to every aggregate in api.aggregates and
# merge the answers into one GENI 3 manifest RSpec plus a combined list of
# slivers.
298 call_id = options.get('call_id')
299 if Callids().already_handled(call_id):
302 version_manager = VersionManager()
# Per-aggregate worker. Returns a dict with 'aggregate', 'elapsed' and
# 'status', plus 'result' on success or 'exc_info' on exception.
304 def _Provision(aggregate, server, xrn, credential, options):
307 # Need to call GetVersion at an aggregate to determine the supported
308 # rspec type/format before calling CreateSliver at an
# NOTE(review): `server_version` is assigned but not used afterwards
310 server_version = api.get_cached_server_version(server)
311 result = server.Provision(xrn, credential, options)
312 return {"aggregate": aggregate, "result": result,
313 "elapsed": time.time() - tStart, "status": "success"}
# NOTE(review): the message below names _Allocate although this is
# _Provision — looks copy-pasted, confirm before changing the string
316 'Something wrong in _Allocate with URL %s' % server.url)
317 return {"aggregate": aggregate, "elapsed": time.time() - tStart,
318 "status": "exception", "exc_info": sys.exc_info()}
320 # attempt to use delegated credential first
321 cred = api.getDelegatedCredential(creds)
323 cred = api.getCredential()
325 # get the callers hrn
# NOTE(review): xrn is passed to checkCredentials as is, whereas Allocate
# converts it with urn_to_hrn() first — confirm which form is expected
326 valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0]
327 caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
328 multiclient = MultiClient()
329 for aggregate in api.aggregates:
330 # prevent infinite loop. Dont send request back to caller
331 # unless the caller is the aggregate's SM
332 if caller_hrn == aggregate and aggregate != api.hrn:
334 interface = api.aggregates[aggregate]
335 server = api.server_proxy(interface, cred)
336 # forward the Provision call to each aggregate (no RSpec is sent here)
337 multiclient.run(_Provision, aggregate,
338 server, xrn, [cred], options)
340 results = multiclient.get_results()
# the merged manifest is always built as GENI version 3
341 manifest_version = version_manager._get_version(
342 'GENI', '3', 'manifest')
343 result_rspec = RSpec(version=manifest_version)
346 for result in results:
# every aggregate outcome, success or not, is recorded as a statistic
347 self.add_slicemgr_stat(
348 result_rspec, "Provision", result["aggregate"], result["elapsed"],
349 result["status"], result.get("exc_info", None))
350 if result["status"] == "success":
352 res = result['result']['value']
# geni_urn is overwritten on each success: the last one processed wins
353 geni_urn = res['geni_urn']
354 result_rspec.version.merge(
355 ReturnValue.get_value(res['geni_rspec']))
356 geni_slivers.extend(res['geni_slivers'])
359 "SM.Provision: Failed to merge aggregate rspec")
361 'geni_urn': geni_urn,
362 'geni_rspec': result_rspec.toxml(),
363 'geni_slivers': geni_slivers
366 def Renew(self, api, xrn, creds, expiration_time, options):
# Forward a Renew request to every aggregate in api.aggregates and fold
# the per-aggregate answers into one dict with the keys 'aggregates',
# 'code', 'value' and 'output'.
367 call_id = options.get('call_id')
368 if Callids().already_handled(call_id):
# Per-aggregate worker: tags the answer with the aggregate name, or builds
# a failure record with geni_code -1 and value False.
371 def _Renew(aggregate, server, xrn, creds, expiration_time, options):
373 result = server.Renew(xrn, creds, expiration_time, options)
# a non-dict answer is wrapped into a code/value structure with geni_code 0
# NOTE(review): isinstance(result, dict) would be the idiomatic check
374 if type(result) != dict:
375 result = {'code': {'geni_code': 0}, 'value': result}
376 result['aggregate'] = aggregate
380 'Something wrong in _Renew with URL %s' % server.url)
# here 'exc_info' holds the formatted traceback text, not an exc_info tuple
381 return {'aggregate': aggregate, 'exc_info': traceback.format_exc(),
382 'code': {'geni_code': -1},
383 'value': False, 'output': ""}
385 # get the callers hrn
386 valid_cred = api.auth.checkCredentials(creds, 'renewsliver', xrn)[0]
387 caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
389 # attempt to use delegated credential first
390 cred = api.getDelegatedCredential(creds)
# 31 * 86400 is 31 days if the unit is seconds — TODO confirm the unit
392 cred = api.getCredential(minimumExpiration=31 * 86400)
393 multiclient = MultiClient()
394 for aggregate in api.aggregates:
395 # prevent infinite loop. Dont send request back to caller
396 # unless the caller is the aggregate's SM
397 if caller_hrn == aggregate and aggregate != api.hrn:
399 interface = api.aggregates[aggregate]
400 server = api.server_proxy(interface, cred)
401 multiclient.run(_Renew, aggregate, server, xrn,
402 [cred], expiration_time, options)
404 results = multiclient.get_results()
# outputs are joined with commas; a missing 'output' counts as ""
407 geni_output = ",".join([x.get('output', "") for x in results])
# values are and-ed together; a missing 'value' counts as False
408 geni_value = reduce(lambda x, y: x and y,
409 [result.get('value', False) for result in results],
411 for agg_result in results:
# requires every result to carry a 'code' dict; geni_code defaults to 0
412 agg_geni_code = agg_result['code'].get('geni_code', 0)
414 geni_code = agg_geni_code
# the per-aggregate results stay available under 'aggregates'
416 results = {'aggregates': results, 'code': {
417 'geni_code': geni_code}, 'value': geni_value, 'output': geni_output}
421 def Delete(self, api, xrn, creds, options):
# Forward a Delete request to every aggregate in api.aggregates and
# accumulate the values they return.
# xrn is indexed as xrn[0] below, so it is handled as a sequence whose
# first entry is used to derive the hrn for the credential check.
422 call_id = options.get('call_id')
423 if Callids().already_handled(call_id):
# thin worker handed to MultiClient, one call per aggregate
426 def _Delete(server, xrn, creds, options):
427 return server.Delete(xrn, creds, options)
# note: the local name `type` shadows the builtin within this method
429 (hrn, type) = urn_to_hrn(xrn[0])
430 # get the callers hrn
431 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
432 caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
434 # attempt to use delegated credential first
435 cred = api.getDelegatedCredential(creds)
437 cred = api.getCredential()
438 multiclient = MultiClient()
439 for aggregate in api.aggregates:
440 # prevent infinite loop. Dont send request back to caller
441 # unless the caller is the aggregate's SM
442 if caller_hrn == aggregate and aggregate != api.hrn:
444 interface = api.aggregates[aggregate]
445 server = api.server_proxy(interface, cred)
# the whole xrn argument is forwarded, not only its first entry
446 multiclient.run(_Delete, server, xrn, [cred], options)
# each aggregate's unwrapped value is added onto the running `results`
449 for result in multiclient.get_results():
450 results += ReturnValue.get_value(result)
453 # first draft at a merging SliverStatus
454 def Status(self, api, slice_xrn, creds, options):
# Ask every aggregate in api.aggregates for the status of slice_xrn and
# merge the non-empty answers into one 'geni_urn' / 'geni_slivers' record.
# thin worker handed to MultiClient, one call per aggregate
455 def _Status(server, xrn, creds, options):
456 return server.Status(xrn, creds, options)
458 call_id = options.get('call_id')
459 if Callids().already_handled(call_id):
461 # attempt to use delegated credential first
462 cred = api.getDelegatedCredential(creds)
464 cred = api.getCredential()
465 multiclient = MultiClient()
# every aggregate is queried; no caller-based filtering is done in this loop
466 for aggregate in api.aggregates:
467 interface = api.aggregates[aggregate]
468 server = api.server_proxy(interface, cred)
469 multiclient.run(_Status, server, slice_xrn, [cred], options)
470 results = [ReturnValue.get_value(result)
471 for result in multiclient.get_results()]
473 # get rid of any void result - e.g. when call_id was hit, where by
474 # convention we return {}
# keeps only truthy results whose 'geni_slivers' entry is truthy as well
476 result for result in results if result and result['geni_slivers']]
478 # do not try to combine if there's no result
482 # otherwise let's merge stuff
485 for result in results:
# geni_urn is overwritten on each pass: the last result processed wins
487 geni_urn = result['geni_urn']
488 geni_slivers.extend(result['geni_slivers'])
# NOTE(review): the message below says SM.Provision although this is
# Status — looks copy-pasted, confirm before changing the string
491 "SM.Provision: Failed to merge aggregate rspec")
493 'geni_urn': geni_urn,
494 'geni_slivers': geni_slivers
497 def Describe(self, api, creds, xrns, options):
# Ask every aggregate in api.aggregates to describe xrns and merge the
# usable answers into one GENI 3 manifest RSpec plus a combined list of
# slivers.
# thin worker handed to MultiClient, one call per aggregate
498 def _Describe(server, xrn, creds, options):
499 return server.Describe(xrn, creds, options)
501 call_id = options.get('call_id')
502 if Callids().already_handled(call_id):
504 # attempt to use delegated credential first
505 cred = api.getDelegatedCredential(creds)
507 cred = api.getCredential()
508 multiclient = MultiClient()
# every aggregate is queried; no caller-based filtering is done in this loop
509 for aggregate in api.aggregates:
510 interface = api.aggregates[aggregate]
511 server = api.server_proxy(interface, cred)
512 multiclient.run(_Describe, server, xrns, [cred], options)
513 results = [ReturnValue.get_value(result)
514 for result in multiclient.get_results()]
516 # get rid of any void result - e.g. when call_id was hit, where by
517 # convention we return {}
# keeps only truthy results that carry a truthy 'geni_urn'
519 result for result in results if result and result.get('geni_urn')]
521 # do not try to combine if there's no result
525 # otherwise let's merge stuff
526 version_manager = VersionManager()
# the merged manifest is always built as GENI version 3
527 manifest_version = version_manager._get_version(
528 'GENI', '3', 'manifest')
529 result_rspec = RSpec(version=manifest_version)
532 for result in results:
# geni_urn is overwritten on each pass: the last result processed wins
534 geni_urn = result['geni_urn']
535 result_rspec.version.merge(
536 ReturnValue.get_value(result['geni_rspec']))
537 geni_slivers.extend(result['geni_slivers'])
# NOTE(review): the message below says SM.Provision although this is
# Describe — looks copy-pasted, confirm before changing the string
540 "SM.Provision: Failed to merge aggregate rspec")
542 'geni_urn': geni_urn,
543 'geni_rspec': result_rspec.toxml(),
544 'geni_slivers': geni_slivers
547 def PerformOperationalAction(self, api, xrn, creds, action, options):
# Forward a PerformOperationalAction request to every aggregate in
# api.aggregates.
# NOTE(review): unlike the sibling methods, no call_id check is made here.
548 # get the callers hrn
# the 'createsliver' operation name is used for this credential check
549 valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0]
550 caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
552 # attempt to use delegated credential first
553 cred = api.getDelegatedCredential(creds)
555 cred = api.getCredential()
556 multiclient = MultiClient()
557 for aggregate in api.aggregates:
558 # prevent infinite loop. Dont send request back to caller
559 # unless the caller is the aggregate's SM
560 if caller_hrn == aggregate and aggregate != api.hrn:
562 interface = api.aggregates[aggregate]
563 server = api.server_proxy(interface, cred)
# the server proxy method is handed to MultiClient directly, no wrapper
564 multiclient.run(server.PerformOperationalAction,
565 xrn, [cred], action, options)
# the collected results are not kept: the return value is discarded here
566 multiclient.get_results()
569 def Shutdown(self, api, xrn, creds, options=None):
# Forward a Shutdown request for the slice to every aggregate in
# api.aggregates.
# xrn is used through its .hrn and .urn attributes below.
# NOTE(review): `options` is not passed on by the Shutdown call below.
573 # get the callers hrn
574 valid_cred = api.auth.checkCredentials(creds, 'stopslice', xrn.hrn)[0]
575 caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
577 # attempt to use delegated credential first
578 cred = api.getDelegatedCredential(creds)
580 cred = api.getCredential()
581 multiclient = MultiClient()
582 for aggregate in api.aggregates:
583 # prevent infinite loop. Dont send request back to caller
584 # unless the caller is the aggregate's SM
585 if caller_hrn == aggregate and aggregate != api.hrn:
587 interface = api.aggregates[aggregate]
588 server = api.server_proxy(interface, cred)
# NOTE(review): `cred` is passed bare here, whereas the sibling methods
# pass `[cred]` — confirm which form Shutdown expects
589 multiclient.run(server.Shutdown, xrn.urn, cred)
# the collected results are not kept: the return value is discarded here
590 multiclient.get_results()