92f414cf89f0f5d40e4cc85b33f74fc06d2e020b
[sfa.git] / sfa / managers / slice_manager.py
1 # pylint: disable=c0111, c0103
2
3 import sys
4 import time
5 import traceback
6 from copy import copy
7
8 from sfa.trust.credential import Credential
9
10 from sfa.util.sfalogging import logger
11 from sfa.util.xrn import Xrn, urn_to_hrn
12 from sfa.util.version import version_core
13 from sfa.util.callids import Callids
14 from sfa.util.cache import Cache
15
16 from sfa.client.multiclient import MultiClient
17
18 from sfa.rspecs.version_manager import VersionManager
19 from sfa.rspecs.rspec import RSpec
20
21 from sfa.client.return_value import ReturnValue
22
23
24 class SliceManager:
25
26     # the cache instance is a class member so it survives across incoming
27     # requests
28     cache = None
29
30     def __init__(self, config):
31         self.cache = None
32         if config.SFA_SM_CACHING:
33             if SliceManager.cache is None:
34                 SliceManager.cache = Cache()
35             self.cache = SliceManager.cache
36
37     def GetVersion(self, api, options):
38         # peers explicitly in aggregates.xml
39         peers = {peername: interface.get_url()
40                  for (peername, interface) in api.aggregates.iteritems()
41                  if peername != api.hrn}
42         version_manager = VersionManager()
43         ad_rspec_versions = []
44         request_rspec_versions = []
45         cred_types = [{'geni_type': 'geni_sfa',
46                        'geni_version': str(i)} for i in range(4)[-2:]]
47         for rspec_version in version_manager.versions:
48             if rspec_version.content_type in ['*', 'ad']:
49                 ad_rspec_versions.append(rspec_version.to_dict())
50             if rspec_version.content_type in ['*', 'request']:
51                 request_rspec_versions.append(rspec_version.to_dict())
52         xrn = Xrn(api.hrn, 'authority+sm')
53         version_more = {
54             'interface': 'slicemgr',
55             'sfa': 2,
56             'geni_api': 3,
57             'geni_api_versions':
58                 {'3': 'https://%s:%s'
59                       % (api.config.SFA_SM_HOST, api.config.SFA_SM_PORT)},
60             'hrn': xrn.get_hrn(),
61             'urn': xrn.get_urn(),
62             'peers': peers,
63             # Accept operations that act on as subset of slivers in a given
64             # state.
65             'geni_single_allocation': 0,
66             # Multiple slivers can exist and be incrementally added, including
67             # those which connect or overlap in some way.
68             'geni_allocate': 'geni_many',
69             'geni_credential_types': cred_types,
70         }
71         sm_version = version_core(version_more)
72         # local aggregate if present needs to have localhost resolved
73         if api.hrn in api.aggregates:
74             local_am_url = api.aggregates[api.hrn].get_url()
75             sm_version['peers'][api.hrn] = local_am_url.replace(
76                 'localhost', sm_version['hostname'])
77         return sm_version
78
79     def drop_slicemgr_stats(self, rspec):
80         try:
81             stats_elements = rspec.xml.xpath('//statistics')
82             for node in stats_elements:
83                 node.getparent().remove(node)
84         except Exception as e:
85             logger.warning("drop_slicemgr_stats failed: %s " % (str(e)))
86
87     def add_slicemgr_stat(self, rspec, callname, aggname,
88                           elapsed, status, exc_info=None):
89         try:
90             stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
91             if stats_tags:
92                 stats_tag = stats_tags[0]
93             else:
94                 stats_tag = rspec.xml.root.add_element(
95                     "statistics", call=callname)
96
97             stat_tag = stats_tag.add_element(
98                 "aggregate", name=str(aggname),
99                 elapsed=str(elapsed), status=str(status))
100
101             if exc_info:
102                 exc_tag = stat_tag.add_element(
103                     "exc_info", name=str(exc_info[1]))
104
105                 # formats the traceback as a set of xml elements
106                 tb = traceback.extract_tb(exc_info[2])
107                 for item in tb:
108                     exc_frame = exc_tag.add_element(
109                         "tb_frame", filename=str(item[0]),
110                         line=str(item[1]), func=str(item[2]), code=str(item[3]))
111
112         except Exception as e:
113             logger.warning("add_slicemgr_stat failed on  %s: %s" %
114                            (aggname, str(e)))
115
116     def ListResources(self, api, creds, options):
117         call_id = options.get('call_id')
118         if Callids().already_handled(call_id):
119             return ""
120
121         version_manager = VersionManager()
122
123         def _ListResources(aggregate, server, credential, options):
124             forward_options = copy(options)
125             tStart = time.time()
126             try:
127                 version = api.get_cached_server_version(server)
128                 # force ProtoGENI aggregates to give us a v2 RSpec
129                 forward_options['geni_rspec_version'] = options.get(
130                     'geni_rspec_version')
131                 result = server.ListResources(credential, forward_options)
132                 return {"aggregate": aggregate, "result": result,
133                         "elapsed": time.time() - tStart, "status": "success"}
134             except Exception as e:
135                 api.logger.log_exc("ListResources failed at %s" % (server.url))
136                 return {"aggregate": aggregate, "elapsed": time.time() - tStart,
137                         "status": "exception", "exc_info": sys.exc_info()}
138
139         # get slice's hrn from options
140         xrn = options.get('geni_slice_urn', '')
141         (hrn, type) = urn_to_hrn(xrn)
142         if 'geni_compressed' in options:
143             del(options['geni_compressed'])
144
145         # get the rspec's return format from options
146         rspec_version = version_manager.get_version(
147             options.get('geni_rspec_version'))
148         version_string = "rspec_%s" % (rspec_version)
149
150         # look in cache first
151         cached_requested = options.get('cached', True)
152         if not xrn and self.cache and cached_requested:
153             rspec = self.cache.get(version_string)
154             if rspec:
155                 api.logger.debug(
156                     "SliceManager.ListResources returns cached advertisement")
157                 return rspec
158
159         # get the callers hrn
160         valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
161         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
162
163         # attempt to use delegated credential first
164         cred = api.getDelegatedCredential(creds)
165         if not cred:
166             cred = api.getCredential()
167         multiclient = MultiClient()
168         for aggregate in api.aggregates:
169             # prevent infinite loop. Dont send request back to caller
170             # unless the caller is the aggregate's SM
171             if caller_hrn == aggregate and aggregate != api.hrn:
172                 continue
173
174             # get the rspec from the aggregate
175             interface = api.aggregates[aggregate]
176             server = api.server_proxy(interface, cred)
177             multiclient.run(_ListResources, aggregate, server, [cred], options)
178
179         results = multiclient.get_results()
180         rspec_version = version_manager.get_version(
181             options.get('geni_rspec_version'))
182         if xrn:
183             result_version = version_manager._get_version(
184                 rspec_version.type, rspec_version.version, 'manifest')
185         else:
186             result_version = version_manager._get_version(
187                 rspec_version.type, rspec_version.version, 'ad')
188         rspec = RSpec(version=result_version)
189         for result in results:
190             self.add_slicemgr_stat(
191                 rspec, "ListResources", result["aggregate"], result["elapsed"],
192                 result["status"], result.get("exc_info", None))
193             if result["status"] == "success":
194                 res = result['result']['value']
195                 try:
196                     rspec.version.merge(ReturnValue.get_value(res))
197                 except Exception:
198                     api.logger.log_exc(
199                         "SM.ListResources: Failed to merge aggregate rspec")
200
201         # cache the result
202         if self.cache and not xrn:
203             api.logger.debug("SliceManager.ListResources caches advertisement")
204             self.cache.add(version_string, rspec.toxml())
205
206         return rspec.toxml()
207
208     def Allocate(self, api, xrn, creds, rspec_str, expiration, options):
209         call_id = options.get('call_id')
210         if Callids().already_handled(call_id):
211             return ""
212
213         version_manager = VersionManager()
214
215         def _Allocate(aggregate, server, xrn, credential, rspec, options):
216             tStart = time.time()
217             try:
218                 # Need to call GetVersion at an aggregate to determine the supported
219                 # rspec type/format beofre calling CreateSliver at an Aggregate.
220                 #server_version = api.get_cached_server_version(server)
221                 # if 'sfa' not in server_version and 'geni_api' in server_version:
222                     # sfa aggregtes support both sfa and pg rspecs, no need to convert
223                     # if aggregate supports sfa rspecs. otherwise convert to pg rspec
224                     #rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request'))
225                     #filter = {'component_manager_id': server_version['urn']}
226                     # rspec.filter(filter)
227                     #rspec = rspec.toxml()
228                 result = server.Allocate(xrn, credential, rspec, options)
229                 return {"aggregate": aggregate, "result": result,
230                         "elapsed": time.time() - tStart, "status": "success"}
231             except:
232                 logger.log_exc(
233                     'Something wrong in _Allocate with URL %s' % server.url)
234                 return {"aggregate": aggregate, "elapsed": time.time() - tStart,
235                         "status": "exception", "exc_info": sys.exc_info()}
236
237         # Validate the RSpec against PlanetLab's schema --disabled for now
238         # The schema used here needs to aggregate the PL and VINI schemas
239         # schema = "/var/www/html/schemas/pl.rng"
240         rspec = RSpec(rspec_str)
241     #    schema = None
242     #    if schema:
243     #        rspec.validate(schema)
244
245         # if there is a <statistics> section, the aggregates don't care about it,
246         # so delete it.
247         self.drop_slicemgr_stats(rspec)
248
249         # attempt to use delegated credential first
250         cred = api.getDelegatedCredential(creds)
251         if not cred:
252             cred = api.getCredential()
253
254         # get the callers hrn
255         hrn, type = urn_to_hrn(xrn)
256         valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
257         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
258         multiclient = MultiClient()
259         for aggregate in api.aggregates:
260             # prevent infinite loop. Dont send request back to caller
261             # unless the caller is the aggregate's SM
262             if caller_hrn == aggregate and aggregate != api.hrn:
263                 continue
264             interface = api.aggregates[aggregate]
265             server = api.server_proxy(interface, cred)
266             # Just send entire RSpec to each aggregate
267             multiclient.run(_Allocate, aggregate, server, xrn,
268                             [cred], rspec.toxml(), options)
269
270         results = multiclient.get_results()
271         manifest_version = version_manager._get_version(
272             rspec.version.type, rspec.version.version, 'manifest')
273         result_rspec = RSpec(version=manifest_version)
274         geni_urn = None
275         geni_slivers = []
276
277         for result in results:
278             self.add_slicemgr_stat(
279                 result_rspec, "Allocate", result["aggregate"], result["elapsed"],
280                 result["status"], result.get("exc_info", None))
281             if result["status"] == "success":
282                 try:
283                     res = result['result']['value']
284                     geni_urn = res['geni_urn']
285                     result_rspec.version.merge(
286                         ReturnValue.get_value(res['geni_rspec']))
287                     geni_slivers.extend(res['geni_slivers'])
288                 except Exception:
289                     api.logger.log_exc(
290                         "SM.Allocate: Failed to merge aggregate rspec")
291         return {
292             'geni_urn': geni_urn,
293             'geni_rspec': result_rspec.toxml(),
294             'geni_slivers': geni_slivers
295         }
296
297     def Provision(self, api, xrn, creds, options):
298         call_id = options.get('call_id')
299         if Callids().already_handled(call_id):
300             return ""
301
302         version_manager = VersionManager()
303
304         def _Provision(aggregate, server, xrn, credential, options):
305             tStart = time.time()
306             try:
307                 # Need to call GetVersion at an aggregate to determine the supported
308                 # rspec type/format before calling CreateSliver at an
309                 # Aggregate.
310                 server_version = api.get_cached_server_version(server)
311                 result = server.Provision(xrn, credential, options)
312                 return {"aggregate": aggregate, "result": result,
313                         "elapsed": time.time() - tStart, "status": "success"}
314             except Exception:
315                 logger.log_exc(
316                     'Something wrong in _Allocate with URL %s' % server.url)
317                 return {"aggregate": aggregate, "elapsed": time.time() - tStart,
318                         "status": "exception", "exc_info": sys.exc_info()}
319
320         # attempt to use delegated credential first
321         cred = api.getDelegatedCredential(creds)
322         if not cred:
323             cred = api.getCredential()
324
325         # get the callers hrn
326         valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0]
327         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
328         multiclient = MultiClient()
329         for aggregate in api.aggregates:
330             # prevent infinite loop. Dont send request back to caller
331             # unless the caller is the aggregate's SM
332             if caller_hrn == aggregate and aggregate != api.hrn:
333                 continue
334             interface = api.aggregates[aggregate]
335             server = api.server_proxy(interface, cred)
336             # Just send entire RSpec to each aggregate
337             multiclient.run(_Provision, aggregate,
338                             server, xrn, [cred], options)
339
340         results = multiclient.get_results()
341         manifest_version = version_manager._get_version(
342             'GENI', '3', 'manifest')
343         result_rspec = RSpec(version=manifest_version)
344         geni_slivers = []
345         geni_urn = None
346         for result in results:
347             self.add_slicemgr_stat(
348                 result_rspec, "Provision", result["aggregate"], result["elapsed"],
349                 result["status"], result.get("exc_info", None))
350             if result["status"] == "success":
351                 try:
352                     res = result['result']['value']
353                     geni_urn = res['geni_urn']
354                     result_rspec.version.merge(
355                         ReturnValue.get_value(res['geni_rspec']))
356                     geni_slivers.extend(res['geni_slivers'])
357                 except:
358                     api.logger.log_exc(
359                         "SM.Provision: Failed to merge aggregate rspec")
360         return {
361             'geni_urn': geni_urn,
362             'geni_rspec': result_rspec.toxml(),
363             'geni_slivers': geni_slivers
364         }
365
366     def Renew(self, api, xrn, creds, expiration_time, options):
367         call_id = options.get('call_id')
368         if Callids().already_handled(call_id):
369             return True
370
371         def _Renew(aggregate, server, xrn, creds, expiration_time, options):
372             try:
373                 result = server.Renew(xrn, creds, expiration_time, options)
374                 if type(result) != dict:
375                     result = {'code': {'geni_code': 0}, 'value': result}
376                 result['aggregate'] = aggregate
377                 return result
378             except:
379                 logger.log_exc(
380                     'Something wrong in _Renew with URL %s' % server.url)
381                 return {'aggregate': aggregate, 'exc_info': traceback.format_exc(),
382                         'code': {'geni_code': -1},
383                         'value': False, 'output': ""}
384
385         # get the callers hrn
386         valid_cred = api.auth.checkCredentials(creds, 'renewsliver', xrn)[0]
387         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
388
389         # attempt to use delegated credential first
390         cred = api.getDelegatedCredential(creds)
391         if not cred:
392             cred = api.getCredential(minimumExpiration=31 * 86400)
393         multiclient = MultiClient()
394         for aggregate in api.aggregates:
395             # prevent infinite loop. Dont send request back to caller
396             # unless the caller is the aggregate's SM
397             if caller_hrn == aggregate and aggregate != api.hrn:
398                 continue
399             interface = api.aggregates[aggregate]
400             server = api.server_proxy(interface, cred)
401             multiclient.run(_Renew, aggregate, server, xrn,
402                             [cred], expiration_time, options)
403
404         results = multiclient.get_results()
405
406         geni_code = 0
407         geni_output = ",".join([x.get('output', "") for x in results])
408         geni_value = reduce(lambda x, y: x and y,
409                             [result.get('value', False) for result in results],
410                             True)
411         for agg_result in results:
412             agg_geni_code = agg_result['code'].get('geni_code', 0)
413             if agg_geni_code:
414                 geni_code = agg_geni_code
415
416         results = {'aggregates': results, 'code': {
417             'geni_code': geni_code}, 'value': geni_value, 'output': geni_output}
418
419         return results
420
421     def Delete(self, api, xrn, creds, options):
422         call_id = options.get('call_id')
423         if Callids().already_handled(call_id):
424             return ""
425
426         def _Delete(server, xrn, creds, options):
427             return server.Delete(xrn, creds, options)
428
429         (hrn, type) = urn_to_hrn(xrn[0])
430         # get the callers hrn
431         valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
432         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
433
434         # attempt to use delegated credential first
435         cred = api.getDelegatedCredential(creds)
436         if not cred:
437             cred = api.getCredential()
438         multiclient = MultiClient()
439         for aggregate in api.aggregates:
440             # prevent infinite loop. Dont send request back to caller
441             # unless the caller is the aggregate's SM
442             if caller_hrn == aggregate and aggregate != api.hrn:
443                 continue
444             interface = api.aggregates[aggregate]
445             server = api.server_proxy(interface, cred)
446             multiclient.run(_Delete, server, xrn, [cred], options)
447
448         results = []
449         for result in multiclient.get_results():
450             results += ReturnValue.get_value(result)
451         return results
452
453     # first draft at a merging SliverStatus
454     def Status(self, api, slice_xrn, creds, options):
455         def _Status(server, xrn, creds, options):
456             return server.Status(xrn, creds, options)
457
458         call_id = options.get('call_id')
459         if Callids().already_handled(call_id):
460             return {}
461         # attempt to use delegated credential first
462         cred = api.getDelegatedCredential(creds)
463         if not cred:
464             cred = api.getCredential()
465         multiclient = MultiClient()
466         for aggregate in api.aggregates:
467             interface = api.aggregates[aggregate]
468             server = api.server_proxy(interface, cred)
469             multiclient.run(_Status, server, slice_xrn, [cred], options)
470         results = [ReturnValue.get_value(result)
471                    for result in multiclient.get_results()]
472
473         # get rid of any void result - e.g. when call_id was hit, where by
474         # convention we return {}
475         results = [
476             result for result in results if result and result['geni_slivers']]
477
478         # do not try to combine if there's no result
479         if not results:
480             return {}
481
482         # otherwise let's merge stuff
483         geni_slivers = []
484         geni_urn = None
485         for result in results:
486             try:
487                 geni_urn = result['geni_urn']
488                 geni_slivers.extend(result['geni_slivers'])
489             except Exception:
490                 api.logger.log_exc(
491                     "SM.Provision: Failed to merge aggregate rspec")
492         return {
493             'geni_urn': geni_urn,
494             'geni_slivers': geni_slivers
495         }
496
497     def Describe(self, api, creds, xrns, options):
498         def _Describe(server, xrn, creds, options):
499             return server.Describe(xrn, creds, options)
500
501         call_id = options.get('call_id')
502         if Callids().already_handled(call_id):
503             return {}
504         # attempt to use delegated credential first
505         cred = api.getDelegatedCredential(creds)
506         if not cred:
507             cred = api.getCredential()
508         multiclient = MultiClient()
509         for aggregate in api.aggregates:
510             interface = api.aggregates[aggregate]
511             server = api.server_proxy(interface, cred)
512             multiclient.run(_Describe, server, xrns, [cred], options)
513         results = [ReturnValue.get_value(result)
514                    for result in multiclient.get_results()]
515
516         # get rid of any void result - e.g. when call_id was hit, where by
517         # convention we return {}
518         results = [
519             result for result in results if result and result.get('geni_urn')]
520
521         # do not try to combine if there's no result
522         if not results:
523             return {}
524
525         # otherwise let's merge stuff
526         version_manager = VersionManager()
527         manifest_version = version_manager._get_version(
528             'GENI', '3', 'manifest')
529         result_rspec = RSpec(version=manifest_version)
530         geni_slivers = []
531         geni_urn = None
532         for result in results:
533             try:
534                 geni_urn = result['geni_urn']
535                 result_rspec.version.merge(
536                     ReturnValue.get_value(result['geni_rspec']))
537                 geni_slivers.extend(result['geni_slivers'])
538             except Exception:
539                 api.logger.log_exc(
540                     "SM.Provision: Failed to merge aggregate rspec")
541         return {
542             'geni_urn': geni_urn,
543             'geni_rspec': result_rspec.toxml(),
544             'geni_slivers': geni_slivers
545         }
546
547     def PerformOperationalAction(self, api, xrn, creds, action, options):
548         # get the callers hrn
549         valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0]
550         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
551
552         # attempt to use delegated credential first
553         cred = api.getDelegatedCredential(creds)
554         if not cred:
555             cred = api.getCredential()
556         multiclient = MultiClient()
557         for aggregate in api.aggregates:
558             # prevent infinite loop. Dont send request back to caller
559             # unless the caller is the aggregate's SM
560             if caller_hrn == aggregate and aggregate != api.hrn:
561                 continue
562             interface = api.aggregates[aggregate]
563             server = api.server_proxy(interface, cred)
564             multiclient.run(server.PerformOperationalAction,
565                             xrn, [cred], action, options)
566         multiclient.get_results()
567         return 1
568
569     def Shutdown(self, api, xrn, creds, options=None):
570         if options is None:
571             options = {}
572         xrn = Xrn(xrn)
573         # get the callers hrn
574         valid_cred = api.auth.checkCredentials(creds, 'stopslice', xrn.hrn)[0]
575         caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn()
576
577         # attempt to use delegated credential first
578         cred = api.getDelegatedCredential(creds)
579         if not cred:
580             cred = api.getCredential()
581         multiclient = MultiClient()
582         for aggregate in api.aggregates:
583             # prevent infinite loop. Dont send request back to caller
584             # unless the caller is the aggregate's SM
585             if caller_hrn == aggregate and aggregate != api.hrn:
586                 continue
587             interface = api.aggregates[aggregate]
588             server = api.server_proxy(interface, cred)
589             multiclient.run(server.Shutdown, xrn.urn, cred)
590         multiclient.get_results()
591         return 1