add 'geni_api_versions' field to GetVersion output
[sfa.git] / sfa / managers / slice_manager.py
1 import sys
2 import time
3 import traceback
4 from StringIO import StringIO
5 from copy import copy
6 from lxml import etree
7
8 from sfa.trust.sfaticket import SfaTicket
9 from sfa.trust.credential import Credential
10
11 from sfa.util.sfalogging import logger
12 from sfa.util.xrn import Xrn, urn_to_hrn
13 from sfa.util.version import version_core
14 from sfa.util.callids import Callids
15 from sfa.util.cache import Cache
16
17 from sfa.server.threadmanager import ThreadManager
18
19 from sfa.rspecs.rspec_converter import RSpecConverter
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec 
22
23 from sfa.client.client_helper import sfa_to_pg_users_arg
24 from sfa.client.return_value import ReturnValue
25
26 class SliceManager:
27
28     # the cache instance is a class member so it survives across incoming requests
29     cache = None
30
31     def __init__ (self, config):
32         self.cache=None
33         if config.SFA_SM_CACHING:
34             if SliceManager.cache is None:
35                 SliceManager.cache = Cache()
36             self.cache = SliceManager.cache
37         
38     def GetVersion(self, api, options):
39         # peers explicitly in aggregates.xml
40         peers =dict ([ (peername,interface.get_url()) for (peername,interface) in api.aggregates.iteritems()
41                        if peername != api.hrn])
42         version_manager = VersionManager()
43         ad_rspec_versions = []
44         request_rspec_versions = []
45         for rspec_version in version_manager.versions:
46             if rspec_version.content_type in ['*', 'ad']:
47                 ad_rspec_versions.append(rspec_version.to_dict())
48             if rspec_version.content_type in ['*', 'request']:
49                 request_rspec_versions.append(rspec_version.to_dict())
50         xrn=Xrn(api.hrn, 'authority+sa')
51         version_more = {
52             'interface':'slicemgr',
53             'sfa': 2,
54             'geni_api': 2,
55             'geni_api_versions': {'2': '%s:%s' % (api.config.SFA_SM_HOST, api.config.SFA_SM_PORT)},
56             'hrn' : xrn.get_hrn(),
57             'urn' : xrn.get_urn(),
58             'peers': peers,
59             'geni_request_rspec_versions': request_rspec_versions,
60             'geni_ad_rspec_versions': ad_rspec_versions,
61             }
62         sm_version=version_core(version_more)
63         # local aggregate if present needs to have localhost resolved
64         if api.hrn in api.aggregates:
65             local_am_url=api.aggregates[api.hrn].get_url()
66             sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
67         return sm_version
68     
69     def drop_slicemgr_stats(self, rspec):
70         try:
71             stats_elements = rspec.xml.xpath('//statistics')
72             for node in stats_elements:
73                 node.getparent().remove(node)
74         except Exception, e:
75             logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
76     
77     def add_slicemgr_stat(self, rspec, callname, aggname, elapsed, status, exc_info=None):
78         try:
79             stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
80             if stats_tags:
81                 stats_tag = stats_tags[0]
82             else:
83                 stats_tag = rspec.xml.root.add_element("statistics", call=callname)
84
85             stat_tag = stats_tag.add_element("aggregate", name=str(aggname), 
86                                              elapsed=str(elapsed), status=str(status))
87
88             if exc_info:
89                 exc_tag = stat_tag.add_element("exc_info", name=str(exc_info[1]))
90
91                 # formats the traceback as one big text blob
92                 #exc_tag.text = "\n".join(traceback.format_exception(exc_info[0], exc_info[1], exc_info[2]))
93
94                 # formats the traceback as a set of xml elements
95                 tb = traceback.extract_tb(exc_info[2])
96                 for item in tb:
97                     exc_frame = exc_tag.add_element("tb_frame", filename=str(item[0]), 
98                                                     line=str(item[1]), func=str(item[2]), code=str(item[3]))
99
100         except Exception, e:
101             logger.warn("add_slicemgr_stat failed on  %s: %s" %(aggname, str(e)))
102     
103     def ListResources(self, api, creds, options):
104         call_id = options.get('call_id') 
105         if Callids().already_handled(call_id): return ""
106
107         version_manager = VersionManager()
108
109         def _ListResources(aggregate, server, credential, options):
110             forward_options = copy(options)
111             tStart = time.time()
112             try:
113                 version = api.get_cached_server_version(server)
114                 # force ProtoGENI aggregates to give us a v2 RSpec
115                 if 'sfa' in version.keys():
116                     forward_options['rspec_version'] = version_manager.get_version('SFA 1').to_dict()
117                 else:
118                     forward_options['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict()
119                 rspec = server.ListResources(credential, forward_options)
120                 return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
121             except Exception, e:
122                 api.logger.log_exc("ListResources failed at %s" %(server.url))
123                 return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()}
124     
125         # get slice's hrn from options
126         xrn = options.get('geni_slice_urn', '')
127         (hrn, type) = urn_to_hrn(xrn)
128         if 'geni_compressed' in options:
129             del(options['geni_compressed'])
130     
131         # get the rspec's return format from options
132         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
133         version_string = "rspec_%s" % (rspec_version)
134     
135         # look in cache first
136         cached_requested = options.get('cached', True)
137         if not xrn and self.cache and cached_request:
138             rspec =  self.cache.get(version_string)
139             if rspec:
140                 api.logger.debug("SliceManager.ListResources returns cached advertisement")
141                 return rspec
142     
143         # get the callers hrn
144         valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
145         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
146     
147         # attempt to use delegated credential first
148         cred = api.getDelegatedCredential(creds)
149         if not cred:
150             cred = api.getCredential()
151         threads = ThreadManager()
152         for aggregate in api.aggregates:
153             # prevent infinite loop. Dont send request back to caller
154             # unless the caller is the aggregate's SM
155             if caller_hrn == aggregate and aggregate != api.hrn:
156                 continue
157     
158             # get the rspec from the aggregate
159             interface = api.aggregates[aggregate]
160             server = api.server_proxy(interface, cred)
161             threads.run(_ListResources, aggregate, server, [cred], options)
162     
163     
164         results = threads.get_results()
165         rspec_version = version_manager.get_version(options.get('geni_rspec_version'))
166         if xrn:    
167             result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
168         else: 
169             result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad')
170         rspec = RSpec(version=result_version)
171         for result in results:
172             self.add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], 
173                                    result["status"], result.get("exc_info",None))
174             if result["status"]=="success":
175                 try:
176                     rspec.version.merge(ReturnValue.get_value(result["rspec"]))
177                 except:
178                     api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
179     
180         # cache the result
181         if self.cache and not xrn:
182             api.logger.debug("SliceManager.ListResources caches advertisement")
183             self.cache.add(version_string, rspec.toxml())
184     
185         return rspec.toxml()
186
187
188     def CreateSliver(self, api, xrn, creds, rspec_str, users, options):
189         call_id = options.get('call_id')
190         if Callids().already_handled(call_id): return ""
191     
192         version_manager = VersionManager()
193         def _CreateSliver(aggregate, server, xrn, credential, rspec, users, options):
194             tStart = time.time()
195             try:
196                 # Need to call GetVersion at an aggregate to determine the supported
197                 # rspec type/format beofre calling CreateSliver at an Aggregate.
198                 server_version = api.get_cached_server_version(server)
199                 requested_users = users
200                 if 'sfa' not in server_version and 'geni_api' in server_version:
201                     # sfa aggregtes support both sfa and pg rspecs, no need to convert
202                     # if aggregate supports sfa rspecs. otherwise convert to pg rspec
203                     rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request'))
204                     filter = {'component_manager_id': server_version['urn']}
205                     rspec.filter(filter)
206                     rspec = rspec.toxml()
207                     requested_users = sfa_to_pg_users_arg(users)
208                 rspec = server.CreateSliver(xrn, credential, rspec, requested_users, options)
209                 return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
210             except:
211                 logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
212                 return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()}
213
214         # Validate the RSpec against PlanetLab's schema --disabled for now
215         # The schema used here needs to aggregate the PL and VINI schemas
216         # schema = "/var/www/html/schemas/pl.rng"
217         rspec = RSpec(rspec_str)
218     #    schema = None
219     #    if schema:
220     #        rspec.validate(schema)
221     
222         # if there is a <statistics> section, the aggregates don't care about it,
223         # so delete it.
224         self.drop_slicemgr_stats(rspec)
225     
226         # attempt to use delegated credential first
227         cred = api.getDelegatedCredential(creds)
228         if not cred:
229             cred = api.getCredential()
230     
231         # get the callers hrn
232         hrn, type = urn_to_hrn(xrn)
233         valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
234         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
235         threads = ThreadManager()
236         for aggregate in api.aggregates:
237             # prevent infinite loop. Dont send request back to caller
238             # unless the caller is the aggregate's SM 
239             if caller_hrn == aggregate and aggregate != api.hrn:
240                 continue
241             interface = api.aggregates[aggregate]
242             server = api.server_proxy(interface, cred)
243             # Just send entire RSpec to each aggregate
244             threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, options)
245                 
246         results = threads.get_results()
247         manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
248         result_rspec = RSpec(version=manifest_version)
249         for result in results:
250             self.add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], 
251                                    result["status"], result.get("exc_info",None))
252             if result["status"]=="success":
253                 try:
254                     result_rspec.version.merge(ReturnValue.get_value(result["rspec"]))
255                 except:
256                     api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
257         return result_rspec.toxml()
258     
259     def RenewSliver(self, api, xrn, creds, expiration_time, options):
260         call_id = options.get('call_id')
261         if Callids().already_handled(call_id): return True
262
263         def _RenewSliver(server, xrn, creds, expiration_time, options):
264             return server.RenewSliver(xrn, creds, expiration_time, options)
265     
266         (hrn, type) = urn_to_hrn(xrn)
267         # get the callers hrn
268         valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
269         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
270     
271         # attempt to use delegated credential first
272         cred = api.getDelegatedCredential(creds)
273         if not cred:
274             cred = api.getCredential()
275         threads = ThreadManager()
276         for aggregate in api.aggregates:
277             # prevent infinite loop. Dont send request back to caller
278             # unless the caller is the aggregate's SM
279             if caller_hrn == aggregate and aggregate != api.hrn:
280                 continue
281             interface = api.aggregates[aggregate]
282             server = api.server_proxy(interface, cred)
283             threads.run(_RenewSliver, server, xrn, [cred], expiration_time, options)
284         # 'and' the results
285         results = [ReturnValue.get_value(result) for result in threads.get_results()]
286         return reduce (lambda x,y: x and y, results , True)
287     
288     def DeleteSliver(self, api, xrn, creds, options):
289         call_id = options.get('call_id') 
290         if Callids().already_handled(call_id): return ""
291
292         def _DeleteSliver(server, xrn, creds, options):
293             return server.DeleteSliver(xrn, creds, options)
294
295         (hrn, type) = urn_to_hrn(xrn)
296         # get the callers hrn
297         valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
298         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
299     
300         # attempt to use delegated credential first
301         cred = api.getDelegatedCredential(creds)
302         if not cred:
303             cred = api.getCredential()
304         threads = ThreadManager()
305         for aggregate in api.aggregates:
306             # prevent infinite loop. Dont send request back to caller
307             # unless the caller is the aggregate's SM
308             if caller_hrn == aggregate and aggregate != api.hrn:
309                 continue
310             interface = api.aggregates[aggregate]
311             server = api.server_proxy(interface, cred)
312             threads.run(_DeleteSliver, server, xrn, [cred], options)
313         threads.get_results()
314         return 1
315     
316     
317     # first draft at a merging SliverStatus
318     def SliverStatus(self, api, slice_xrn, creds, options):
319         def _SliverStatus(server, xrn, creds, options):
320             return server.SliverStatus(xrn, creds, options)
321
322         call_id = options.get('call_id') 
323         if Callids().already_handled(call_id): return {}
324         # attempt to use delegated credential first
325         cred = api.getDelegatedCredential(creds)
326         if not cred:
327             cred = api.getCredential()
328         threads = ThreadManager()
329         for aggregate in api.aggregates:
330             interface = api.aggregates[aggregate]
331             server = api.server_proxy(interface, cred)
332             threads.run (_SliverStatus, server, slice_xrn, [cred], options)
333         results = [ReturnValue.get_value(result) for result in threads.get_results()]
334     
335         # get rid of any void result - e.g. when call_id was hit, where by convention we return {}
336         results = [ result for result in results if result and result['geni_resources']]
337     
338         # do not try to combine if there's no result
339         if not results : return {}
340     
341         # otherwise let's merge stuff
342         overall = {}
343     
344         # mmh, it is expected that all results carry the same urn
345         overall['geni_urn'] = results[0]['geni_urn']
346         overall['pl_login'] = results[0]['pl_login']
347         # append all geni_resources
348         overall['geni_resources'] = \
349             reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
350         overall['status'] = 'unknown'
351         if overall['geni_resources']:
352             overall['status'] = 'ready'
353     
354         return overall
355     
356     def ListSlices(self, api, creds, options):
357         call_id = options.get('call_id') 
358         if Callids().already_handled(call_id): return []
359     
360         def _ListSlices(server, creds, options):
361             return server.ListSlices(creds, options)
362
363         # look in cache first
364         # xxx is this really frequent enough that it is worth being cached ?
365         if self.cache:
366             slices = self.cache.get('slices')
367             if slices:
368                 api.logger.debug("SliceManager.ListSlices returns from cache")
369                 return slices
370     
371         # get the callers hrn
372         valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
373         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
374     
375         # attempt to use delegated credential first
376         cred= api.getDelegatedCredential(creds)
377         if not cred:
378             cred = api.getCredential()
379         threads = ThreadManager()
380         # fetch from aggregates
381         for aggregate in api.aggregates:
382             # prevent infinite loop. Dont send request back to caller
383             # unless the caller is the aggregate's SM
384             if caller_hrn == aggregate and aggregate != api.hrn:
385                 continue
386             interface = api.aggregates[aggregate]
387             server = api.server_proxy(interface, cred)
388             threads.run(_ListSlices, server, [cred], options)
389     
390         # combime results
391         results = [ReturnValue.get_value(result) for result in threads.get_results()]
392         slices = []
393         for result in results:
394             slices.extend(result)
395     
396         # cache the result
397         if self.cache:
398             api.logger.debug("SliceManager.ListSlices caches value")
399             self.cache.add('slices', slices)
400     
401         return slices
402     
403     
404     def GetTicket(self, api, xrn, creds, rspec, users, options):
405         slice_hrn, type = urn_to_hrn(xrn)
406         # get the netspecs contained within the clients rspec
407         aggregate_rspecs = {}
408         tree= etree.parse(StringIO(rspec))
409         elements = tree.findall('./network')
410         for element in elements:
411             aggregate_hrn = element.values()[0]
412             aggregate_rspecs[aggregate_hrn] = rspec 
413     
414         # get the callers hrn
415         valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
416         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
417     
418         # attempt to use delegated credential first
419         cred = api.getDelegatedCredential(creds)
420         if not cred:
421             cred = api.getCredential() 
422         threads = ThreadManager()
423         for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
424             # xxx sounds like using call_id here would be safer
425             # prevent infinite loop. Dont send request back to caller
426             # unless the caller is the aggregate's SM
427             if caller_hrn == aggregate and aggregate != api.hrn:
428                 continue
429             
430             interface = api.aggregates[aggregate]
431             server = api.server_proxy(interface, cred)
432             threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users, options)
433     
434         results = threads.get_results()
435         
436         # gather information from each ticket 
437         rspec = None
438         initscripts = []
439         slivers = [] 
440         object_gid = None  
441         for result in results:
442             agg_ticket = SfaTicket(string=result)
443             attrs = agg_ticket.get_attributes()
444             if not object_gid:
445                 object_gid = agg_ticket.get_gid_object()
446             if not rspec:
447                 rspec = RSpec(agg_ticket.get_rspec())
448             else:
449                 rspec.version.merge(agg_ticket.get_rspec())
450             initscripts.extend(attrs.get('initscripts', [])) 
451             slivers.extend(attrs.get('slivers', [])) 
452         
453         # merge info
454         attributes = {'initscripts': initscripts,
455                      'slivers': slivers}
456         
457         # create a new ticket
458         ticket = SfaTicket(subject = slice_hrn)
459         ticket.set_gid_caller(api.auth.client_gid)
460         ticket.set_issuer(key=api.key, subject=api.hrn)
461         ticket.set_gid_object(object_gid)
462         ticket.set_pubkey(object_gid.get_pubkey())
463         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
464         ticket.set_attributes(attributes)
465         ticket.set_rspec(rspec.toxml())
466         ticket.encode()
467         ticket.sign()          
468         return ticket.save_to_string(save_parents=True)
469     
470     def start_slice(self, api, xrn, creds):
471         hrn, type = urn_to_hrn(xrn)
472     
473         # get the callers hrn
474         valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
475         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
476     
477         # attempt to use delegated credential first
478         cred = api.getDelegatedCredential(creds)
479         if not cred:
480             cred = api.getCredential()
481         threads = ThreadManager()
482         for aggregate in api.aggregates:
483             # prevent infinite loop. Dont send request back to caller
484             # unless the caller is the aggregate's SM
485             if caller_hrn == aggregate and aggregate != api.hrn:
486                 continue
487             interface = api.aggregates[aggregate]
488             server = api.server_proxy(interface, cred)    
489             threads.run(server.Start, xrn, cred)
490         threads.get_results()    
491         return 1
492      
493     def stop_slice(self, api, xrn, creds):
494         hrn, type = urn_to_hrn(xrn)
495     
496         # get the callers hrn
497         valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
498         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
499     
500         # attempt to use delegated credential first
501         cred = api.getDelegatedCredential(creds)
502         if not cred:
503             cred = api.getCredential()
504         threads = ThreadManager()
505         for aggregate in api.aggregates:
506             # prevent infinite loop. Dont send request back to caller
507             # unless the caller is the aggregate's SM
508             if caller_hrn == aggregate and aggregate != api.hrn:
509                 continue
510             interface = api.aggregates[aggregate]
511             server = api.server_proxy(interface, cred)
512             threads.run(server.Stop, xrn, cred)
513         threads.get_results()    
514         return 1
515     
516     def reset_slice(self, api, xrn):
517         """
518         Not implemented
519         """
520         return 1
521     
522     def shutdown(self, api, xrn, creds):
523         """
524         Not implemented   
525         """
526         return 1
527     
528     def status(self, api, xrn, creds):
529         """
530         Not implemented 
531         """
532         return 1
533