fix merge problem
[sfa.git] / sfa / managers / slice_manager.py
1 import sys
2 import traceback
3 import time
4 from StringIO import StringIO
5 from copy import copy
6 from lxml import etree
7
8 from sfa.trust.sfaticket import SfaTicket
9 from sfa.trust.credential import Credential
10
11 from sfa.util.sfalogging import logger
12 from sfa.util.xrn import Xrn, urn_to_hrn
13 from sfa.util.version import version_core
14 from sfa.util.callids import Callids
15
16 from sfa.server.threadmanager import ThreadManager
17
18 from sfa.rspecs.rspec_converter import RSpecConverter
19 from sfa.rspecs.version_manager import VersionManager
20 from sfa.rspecs.rspec import RSpec 
21 from sfa.client.client_helper import sfa_to_pg_users_arg
22
23 class SliceManager:
24     def __init__ (self):
25     #    self.caching=False
26         self.caching=True
27         
28     
29     def _call_id_supported(self, api, server):
30         """
31         Returns true if server support the optional call_id arg, false otherwise.
32         """
33         server_version = api.get_cached_server_version(server)
34     
35         if 'sfa' in server_version:
36             code_tag = server_version['code_tag']
37             code_tag_parts = code_tag.split("-")
38     
39             version_parts = code_tag_parts[0].split(".")
40             major, minor = version_parts[0:2]
41             rev = code_tag_parts[1]
42             if int(major) > 1:
43                 if int(minor) > 0 or int(rev) > 20:
44                     return True
45         return False
46     
47     # we have specialized xmlrpclib.ServerProxy to remember the input url
48     # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
49     def get_serverproxy_url (self, server):
50         try:
51             return server.get_url()
52         except:
53             logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
54             return server._ServerProxy__host + server._ServerProxy__handler
55     
56     def GetVersion(self, api):
57         # peers explicitly in aggregates.xml
58         peers =dict ([ (peername,self.get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
59                        if peername != api.hrn])
60         version_manager = VersionManager()
61         ad_rspec_versions = []
62         request_rspec_versions = []
63         for rspec_version in version_manager.versions:
64             if rspec_version.content_type in ['*', 'ad']:
65                 ad_rspec_versions.append(rspec_version.to_dict())
66             if rspec_version.content_type in ['*', 'request']:
67                 request_rspec_versions.append(rspec_version.to_dict())
68         default_rspec_version = version_manager.get_version("sfa 1").to_dict()
69         xrn=Xrn(api.hrn, 'authority+sa')
70         version_more = {'interface':'slicemgr',
71                         'hrn' : xrn.get_hrn(),
72                         'urn' : xrn.get_urn(),
73                         'peers': peers,
74                         'request_rspec_versions': request_rspec_versions,
75                         'ad_rspec_versions': ad_rspec_versions,
76                         'default_ad_rspec': default_rspec_version
77                     }
78         sm_version=version_core(version_more)
79         # local aggregate if present needs to have localhost resolved
80         if api.hrn in api.aggregates:
81             local_am_url=self.get_serverproxy_url(api.aggregates[api.hrn])
82             sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
83         return sm_version
84
85 def drop_slicemgr_stats(rspec):
86     try:
87         stats_elements = rspec.xml.xpath('//statistics')
88         for node in stats_elements:
89             node.getparent().remove(node)
90     except Exception, e:
91         logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
92
93 def add_slicemgr_stat(rspec, callname, aggname, elapsed, status, exc_info=None):
94     try:
95         stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
96         if stats_tags:
97             stats_tag = stats_tags[0]
98         else:
99             stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname)
100
101         stat_tag = etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
102
103         if exc_info:
104             exc_tag = etree.SubElement(stat_tag, "exc_info", name=str(exc_info[1]))
105
106             # this would encode it as a text block
107             #exc_tag.text = "\n".join(traceback.format_exception(exc_info[0], exc_info[1], exc_info[2]))
108
109             # this encodes the traceback as a set of xml tags
110             tb = traceback.extract_tb(exc_info[2])
111             for item in tb:
112                 exc_frame = etree.SubElement(exc_tag, "tb_frame", filename=str(item[0]), line=str(item[1]), func=str(item[2]), code=str(item[3]))
113
114     except Exception, e:
115         logger.warn("add_slicemgr_stat failed on  %s: %s" %(aggname, str(e)))
116
117 def ListResources(api, creds, options, call_id):
118     version_manager = VersionManager()
119     def _ListResources(aggregate, server, credential, opts, call_id):
120
121         my_opts = copy(opts)
122         args = [credential, my_opts]
123         tStart = time.time()
124         try:
125             stats_elements = rspec.xml.xpath('//statistics')
126             for node in stats_elements:
127                 node.getparent().remove(node)
128         except Exception, e:
129             api.logger.log_exc("ListResources failed at %s" %(server.url))
130             return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()}
131
132     if Callids().already_handled(call_id): return ""
133
134     # get slice's hrn from options
135     xrn = options.get('geni_slice_urn', '')
136     (hrn, type) = urn_to_hrn(xrn)
137     if 'geni_compressed' in options:
138         del(options['geni_compressed'])
139
140     # get the rspec's return format from options
141     rspec_version = version_manager.get_version(options.get('rspec_version'))
142     version_string = "rspec_%s" % (rspec_version.to_string())
143
144     # look in cache first
145     if caching and api.cache and not xrn:
146         rspec =  api.cache.get(version_string)
147         if rspec:
148             return rspec
149
150     # get the callers hrn
151     valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
152     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
153
154     # attempt to use delegated credential first
155     cred = api.getDelegatedCredential(creds)
156     if not cred:
157         cred = api.getCredential()
158     threads = ThreadManager()
159     for aggregate in api.aggregates:
160         # prevent infinite loop. Dont send request back to caller
161         # unless the caller is the aggregate's SM
162         if caller_hrn == aggregate and aggregate != api.hrn:
163             continue
164
165         # get the rspec from the aggregate
166         interface = api.aggregates[aggregate]
167         server = api.get_server(interface, cred)
168         threads.run(_ListResources, aggregate, server, [cred], options, call_id)
169
170
171     results = threads.get_results()
172     rspec_version = version_manager.get_version(options.get('rspec_version'))
173     if xrn:    
174         result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
175     else:
176         result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad')
177     rspec = RSpec(version=result_version)
178     for result in results:
179         add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"], result.get("exc_info",None))
180         if result["status"]=="success":
181             try:
182                 rspec.version.merge(result["rspec"])
183             except:
184                 api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
185
186     # cache the result
187     if caching and api.cache and not xrn:
188         api.cache.add(version_string, rspec.toxml())
189
190     return rspec.toxml()
191
192
193 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
194
195     version_manager = VersionManager()
196     def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
197         tStart = time.time()
198         try:
199             stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
200             if stats_tags:
201                 stats_tag = stats_tags[0]
202             else:
203                 stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname)
204     
205             etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
206         except Exception, e:
207             logger.warn("add_slicemgr_stat failed on  %s: %s" %(aggname, str(e)))
208     
209     def ListResources(self, api, creds, options, call_id):
210         version_manager = VersionManager()
211         def _ListResources(aggregate, server, credential, opts, call_id):
212     
213             my_opts = copy(opts)
214             args = [credential, my_opts]
215             tStart = time.time()
216             try:
217                 if self._call_id_supported(api, server):
218                     args.append(call_id)
219                 version = api.get_cached_server_version(server)
220                 # force ProtoGENI aggregates to give us a v2 RSpec
221                 if 'sfa' not in version.keys():
222                     my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict()
223                 rspec = server.ListResources(*args)
224                 return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
225             except Exception, e:
226                 api.logger.log_exc("ListResources failed at %s" %(server.url))
227                 return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
228     
229         if Callids().already_handled(call_id): return ""
230     
231         # get slice's hrn from options
232         xrn = options.get('geni_slice_urn', '')
233         (hrn, type) = urn_to_hrn(xrn)
234         if 'geni_compressed' in options:
235             del(options['geni_compressed'])
236     
237         # get the rspec's return format from options
238         rspec_version = version_manager.get_version(options.get('rspec_version'))
239         version_string = "rspec_%s" % (rspec_version.to_string())
240     
241         # look in cache first
242         if self.caching and api.cache and not xrn:
243             rspec =  api.cache.get(version_string)
244             if rspec:
245                 return rspec
246     
247         # get the callers hrn
248         valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
249         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
250     
251         # attempt to use delegated credential first
252         cred = api.getDelegatedCredential(creds)
253         if not cred:
254             cred = api.getCredential()
255         threads = ThreadManager()
256         for aggregate in api.aggregates:
257             # prevent infinite loop. Dont send request back to caller
258             # unless the caller is the aggregate's SM
259             if caller_hrn == aggregate and aggregate != api.hrn:
260                 continue
261     
262             # get the rspec from the aggregate
263             interface = api.aggregates[aggregate]
264             server = api.server_proxy(interface, cred)
265             threads.run(_ListResources, aggregate, server, [cred], options, call_id)
266     
267     
268         results = threads.get_results()
269         rspec_version = version_manager.get_version(options.get('rspec_version'))
270         if xrn:    
271             result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
272         else: 
273             result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad')
274         rspec = RSpec(version=result_version)
275         for result in results:
276             self.add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
277             if result["status"]=="success":
278                 try:
279                     rspec.version.merge(result["rspec"])
280                 except:
281                     api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
282     
283         # cache the result
284         if self.caching and api.cache and not xrn:
285             api.cache.add(version_string, rspec.toxml())
286     
287         return rspec.toxml()
288     
289     
290     def CreateSliver(self, api, xrn, creds, rspec_str, users, call_id):
291     
292         version_manager = VersionManager()
293         def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
294             tStart = time.time()
295             try:
296                 # Need to call GetVersion at an aggregate to determine the supported
297                 # rspec type/format beofre calling CreateSliver at an Aggregate.
298                 server_version = api.get_cached_server_version(server)
299                 requested_users = users
300                 if 'sfa' not in server_version and 'geni_api' in server_version:
301                     # sfa aggregtes support both sfa and pg rspecs, no need to convert
302                     # if aggregate supports sfa rspecs. otherwise convert to pg rspec
303                     rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request'))
304                     filter = {'component_manager_id': server_version['urn']}
305                     rspec.filter(filter)
306                     rspec = rspec.toxml()
307                     requested_users = sfa_to_pg_users_arg(users)
308                 args = [xrn, credential, rspec, requested_users]
309                 if self._call_id_supported(api, server):
310                     args.append(call_id)
311                 rspec = server.CreateSliver(*args)
312                 return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
313             except: 
314                 logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
315                 return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
316     
317         if Callids().already_handled(call_id): return ""
318         # Validate the RSpec against PlanetLab's schema --disabled for now
319         # The schema used here needs to aggregate the PL and VINI schemas
320         # schema = "/var/www/html/schemas/pl.rng"
321         rspec = RSpec(rspec_str)
322     #    schema = None
323     #    if schema:
324     #        rspec.validate(schema)
325     
326         # if there is a <statistics> section, the aggregates don't care about it,
327         # so delete it.
328         self.drop_slicemgr_stats(rspec)
329     
330         # attempt to use delegated credential first
331         cred = api.getDelegatedCredential(creds)
332         if not cred:
333             cred = api.getCredential()
334     
335         # get the callers hrn
336         hrn, type = urn_to_hrn(xrn)
337         valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
338         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
339         threads = ThreadManager()
340         for aggregate in api.aggregates:
341             # prevent infinite loop. Dont send request back to caller
342             # unless the caller is the aggregate's SM 
343             if caller_hrn == aggregate and aggregate != api.hrn:
344                 continue
345             interface = api.aggregates[aggregate]
346             server = api.server_proxy(interface, cred)
347             # Just send entire RSpec to each aggregate
348             threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id)
349                 
350         results = threads.get_results()
351         manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
352         result_rspec = RSpec(version=manifest_version)
353         for result in results:
354             self.add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
355             if result["status"]=="success":
356                 try:
357                     result_rspec.version.merge(result["rspec"])
358                 except:
359                     api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
360         return result_rspec.toxml()
361     
362     def RenewSliver(self, api, xrn, creds, expiration_time, call_id):
363         def _RenewSliver(server, xrn, creds, expiration_time, call_id):
364             server_version = api.get_cached_server_version(server)
365             args =  [xrn, creds, expiration_time, call_id]
366             if self._call_id_supported(api, server):
367                 args.append(call_id)
368 <<<<<<< HEAD:sfa/managers/slice_manager.py
369             rspec = server.CreateSliver(*args)
370             return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
371         except: 
372             logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
373             return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()}
374
375     if Callids().already_handled(call_id): return ""
376     # Validate the RSpec against PlanetLab's schema --disabled for now
377     # The schema used here needs to aggregate the PL and VINI schemas
378     # schema = "/var/www/html/schemas/pl.rng"
379     rspec = RSpec(rspec_str)
380 #    schema = None
381 #    if schema:
382 #        rspec.validate(schema)
383
384     # if there is a <statistics> section, the aggregates don't care about it,
385     # so delete it.
386     drop_slicemgr_stats(rspec)
387
388     # attempt to use delegated credential first
389     cred = api.getDelegatedCredential(creds)
390     if not cred:
391         cred = api.getCredential()
392
393     # get the callers hrn
394     hrn, type = urn_to_hrn(xrn)
395     valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
396     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
397     threads = ThreadManager()
398     for aggregate in api.aggregates:
399         # prevent infinite loop. Dont send request back to caller
400         # unless the caller is the aggregate's SM 
401         if caller_hrn == aggregate and aggregate != api.hrn:
402             continue
403         interface = api.aggregates[aggregate]
404         server = api.get_server(interface, cred)
405         # Just send entire RSpec to each aggregate
406         threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id)
407 =======
408             return server.RenewSliver(*args)
409     
410         if Callids().already_handled(call_id): return True
411     
412         (hrn, type) = urn_to_hrn(xrn)
413         # get the callers hrn
414         valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
415         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
416     
417         # attempt to use delegated credential first
418         cred = api.getDelegatedCredential(creds)
419         if not cred:
420             cred = api.getCredential()
421         threads = ThreadManager()
422         for aggregate in api.aggregates:
423             # prevent infinite loop. Dont send request back to caller
424             # unless the caller is the aggregate's SM
425             if caller_hrn == aggregate and aggregate != api.hrn:
426                 continue
427             interface = api.aggregates[aggregate]
428             server = api.server_proxy(interface, cred)
429             threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id)
430         # 'and' the results
431         return reduce (lambda x,y: x and y, threads.get_results() , True)
432     
433     def DeleteSliver(self, api, xrn, creds, call_id):
434         def _DeleteSliver(server, xrn, creds, call_id):
435             server_version = api.get_cached_server_version(server)
436             args =  [xrn, creds]
437             if self._call_id_supported(api, server):
438                 args.append(call_id)
439             return server.DeleteSliver(*args)
440     
441         if Callids().already_handled(call_id): return ""
442         (hrn, type) = urn_to_hrn(xrn)
443         # get the callers hrn
444         valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
445         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
446     
447         # attempt to use delegated credential first
448         cred = api.getDelegatedCredential(creds)
449         if not cred:
450             cred = api.getCredential()
451         threads = ThreadManager()
452         for aggregate in api.aggregates:
453             # prevent infinite loop. Dont send request back to caller
454             # unless the caller is the aggregate's SM
455             if caller_hrn == aggregate and aggregate != api.hrn:
456                 continue
457             interface = api.aggregates[aggregate]
458             server = api.server_proxy(interface, cred)
459             threads.run(_DeleteSliver, server, xrn, [cred], call_id)
460         threads.get_results()
461         return 1
462     
463     
464     # first draft at a merging SliverStatus
465     def SliverStatus(self, api, slice_xrn, creds, call_id):
466         def _SliverStatus(server, xrn, creds, call_id):
467             server_version = api.get_cached_server_version(server)
468             args =  [xrn, creds]
469             if self._call_id_supported(api, server):
470                 args.append(call_id)
471             return server.SliverStatus(*args)
472         
473         if Callids().already_handled(call_id): return {}
474         # attempt to use delegated credential first
475         cred = api.getDelegatedCredential(creds)
476         if not cred:
477             cred = api.getCredential()
478         threads = ThreadManager()
479         for aggregate in api.aggregates:
480             interface = api.aggregates[aggregate]
481             server = api.server_proxy(interface, cred)
482             threads.run (_SliverStatus, server, slice_xrn, [cred], call_id)
483         results = threads.get_results()
484     
485         # get rid of any void result - e.g. when call_id was hit where by convention we return {}
486         results = [ result for result in results if result and result['geni_resources']]
487     
488         # do not try to combine if there's no result
489         if not results : return {}
490     
491         # otherwise let's merge stuff
492         overall = {}
493     
494         # mmh, it is expected that all results carry the same urn
495         overall['geni_urn'] = results[0]['geni_urn']
496         overall['pl_login'] = results[0]['pl_login']
497         # append all geni_resources
498         overall['geni_resources'] = \
499             reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
500         overall['status'] = 'unknown'
501         if overall['geni_resources']:
502             overall['status'] = 'ready'
503     
504         return overall
505     
506     def ListSlices(self, api, creds, call_id):
507         def _ListSlices(server, creds, call_id):
508             server_version = api.get_cached_server_version(server)
509             args =  [creds]
510             if self._call_id_supported(api, server):
511                 args.append(call_id)
512             return server.ListSlices(*args)
513     
514         if Callids().already_handled(call_id): return []
515     
516         # look in cache first
517         if self.caching and api.cache:
518             slices = api.cache.get('slices')
519             if slices:
520                 return slices
521     
522         # get the callers hrn
523         valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
524         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
525     
526         # attempt to use delegated credential first
527         cred= api.getDelegatedCredential(creds)
528         if not cred:
529             cred = api.getCredential()
530         threads = ThreadManager()
531         # fetch from aggregates
532         for aggregate in api.aggregates:
533             # prevent infinite loop. Dont send request back to caller
534             # unless the caller is the aggregate's SM
535             if caller_hrn == aggregate and aggregate != api.hrn:
536                 continue
537             interface = api.aggregates[aggregate]
538             server = api.server_proxy(interface, cred)
539             threads.run(_ListSlices, server, [cred], call_id)
540     
541         # combime results
542         results = threads.get_results()
543         slices = []
544         for result in results:
545             slices.extend(result)
546     
547         # cache the result
548         if self.caching and api.cache:
549             api.cache.add('slices', slices)
550     
551         return slices
552     
553     
554     def get_ticket(self, api, xrn, creds, rspec, users):
555         slice_hrn, type = urn_to_hrn(xrn)
556         # get the netspecs contained within the clients rspec
557         aggregate_rspecs = {}
558         tree= etree.parse(StringIO(rspec))
559         elements = tree.findall('./network')
560         for element in elements:
561             aggregate_hrn = element.values()[0]
562             aggregate_rspecs[aggregate_hrn] = rspec 
563     
564         # get the callers hrn
565         valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
566         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
567     
568         # attempt to use delegated credential first
569         cred = api.getDelegatedCredential(creds)
570         if not cred:
571             cred = api.getCredential() 
572         threads = ThreadManager()
573         for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
574             # prevent infinite loop. Dont send request back to caller
575             # unless the caller is the aggregate's SM
576             if caller_hrn == aggregate and aggregate != api.hrn:
577                 continue
578 >>>>>>> a3996bfa45298c8d0abfd58916221abba737441c:sfa/managers/slice_manager.py
579             
580 <<<<<<< HEAD:sfa/managers/slice_manager.py
581     results = threads.get_results()
582     manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
583     result_rspec = RSpec(version=manifest_version)
584     for result in results:
585         add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"], result.get("exc_info",None))
586         if result["status"]=="success":
587             try:
588                 result_rspec.version.merge(result["rspec"])
589             except:
590                 api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
591     return result_rspec.toxml()
592
593 def RenewSliver(api, xrn, creds, expiration_time, call_id):
594     def _RenewSliver(server, xrn, creds, expiration_time, call_id):
595         server_version = api.get_cached_server_version(server)
596         args =  [xrn, creds, expiration_time, call_id]
597         if _call_id_supported(api, server):
598             args.append(call_id)
599         return server.RenewSliver(*args)
600
601     if Callids().already_handled(call_id): return True
602
603     (hrn, type) = urn_to_hrn(xrn)
604     # get the callers hrn
605     valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
606     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
607
608     # attempt to use delegated credential first
609     cred = api.getDelegatedCredential(creds)
610     if not cred:
611         cred = api.getCredential()
612     threads = ThreadManager()
613     for aggregate in api.aggregates:
614         # prevent infinite loop. Dont send request back to caller
615         # unless the caller is the aggregate's SM
616         if caller_hrn == aggregate and aggregate != api.hrn:
617             continue
618         interface = api.aggregates[aggregate]
619         server = api.get_server(interface, cred)
620         threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id)
621     # 'and' the results
622     return reduce (lambda x,y: x and y, threads.get_results() , True)
623
624 def DeleteSliver(api, xrn, creds, call_id):
625     def _DeleteSliver(server, xrn, creds, call_id):
626         server_version = api.get_cached_server_version(server)
627         args =  [xrn, creds]
628         if _call_id_supported(api, server):
629             args.append(call_id)
630         return server.DeleteSliver(*args)
631
632     if Callids().already_handled(call_id): return ""
633     (hrn, type) = urn_to_hrn(xrn)
634     # get the callers hrn
635     valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
636     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
637
638     # attempt to use delegated credential first
639     cred = api.getDelegatedCredential(creds)
640     if not cred:
641         cred = api.getCredential()
642     threads = ThreadManager()
643     for aggregate in api.aggregates:
644         # prevent infinite loop. Dont send request back to caller
645         # unless the caller is the aggregate's SM
646         if caller_hrn == aggregate and aggregate != api.hrn:
647             continue
648         interface = api.aggregates[aggregate]
649         server = api.get_server(interface, cred)
650         threads.run(_DeleteSliver, server, xrn, [cred], call_id)
651     threads.get_results()
652     return 1
653
654
655 # first draft at a merging SliverStatus
656 def SliverStatus(api, slice_xrn, creds, call_id):
657     def _SliverStatus(server, xrn, creds, call_id):
658         server_version = api.get_cached_server_version(server)
659         args =  [xrn, creds]
660         if _call_id_supported(api, server):
661             args.append(call_id)
662         return server.SliverStatus(*args)
663 =======
664             interface = api.aggregates[aggregate]
665             server = api.server_proxy(interface, cred)
666             threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users)
667 >>>>>>> a3996bfa45298c8d0abfd58916221abba737441c:sfa/managers/slice_manager.py
668     
669 <<<<<<< HEAD:sfa/managers/slice_manager.py
670     if Callids().already_handled(call_id): return {}
671     # attempt to use delegated credential first
672     cred = api.getDelegatedCredential(creds)
673     if not cred:
674         cred = api.getCredential()
675     threads = ThreadManager()
676     for aggregate in api.aggregates:
677         interface = api.aggregates[aggregate]
678         server = api.get_server(interface, cred)
679         threads.run (_SliverStatus, server, slice_xrn, [cred], call_id)
680     results = threads.get_results()
681
682     # get rid of any void result - e.g. when call_id was hit where by convention we return {}
683     results = [ result for result in results if result and result['geni_resources']]
684
685     # do not try to combine if there's no result
686     if not results : return {}
687
688     # otherwise let's merge stuff
689     overall = {}
690
691     # mmh, it is expected that all results carry the same urn
692     overall['geni_urn'] = results[0]['geni_urn']
693     overall['pl_login'] = results[0]['pl_login']
694     # append all geni_resources
695     overall['geni_resources'] = \
696         reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
697     overall['status'] = 'unknown'
698     if overall['geni_resources']:
699         overall['status'] = 'ready'
700
701     return overall
702
703 caching=True
704 #caching=False
705 def ListSlices(api, creds, call_id):
706     def _ListSlices(server, creds, call_id):
707         server_version = api.get_cached_server_version(server)
708         args =  [creds]
709         if _call_id_supported(api, server):
710             args.append(call_id)
711         return server.ListSlices(*args)
712
713     if Callids().already_handled(call_id): return []
714
715     # look in cache first
716     if caching and api.cache:
717         slices = api.cache.get('slices')
718         if slices:
719             return slices
720
721     # get the callers hrn
722     valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
723     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
724
725     # attempt to use delegated credential first
726     cred= api.getDelegatedCredential(creds)
727     if not cred:
728         cred = api.getCredential()
729     threads = ThreadManager()
730     # fetch from aggregates
731     for aggregate in api.aggregates:
732         # prevent infinite loop. Dont send request back to caller
733         # unless the caller is the aggregate's SM
734         if caller_hrn == aggregate and aggregate != api.hrn:
735             continue
736         interface = api.aggregates[aggregate]
737         server = api.get_server(interface, cred)
738         threads.run(_ListSlices, server, [cred], call_id)
739
740     # combime results
741     results = threads.get_results()
742     slices = []
743     for result in results:
744         slices.extend(result)
745
746     # cache the result
747     if caching and api.cache:
748         api.cache.add('slices', slices)
749
750     return slices
751
752
753 def get_ticket(api, xrn, creds, rspec, users):
754     slice_hrn, type = urn_to_hrn(xrn)
755     # get the netspecs contained within the clients rspec
756     aggregate_rspecs = {}
757     tree= etree.parse(StringIO(rspec))
758     elements = tree.findall('./network')
759     for element in elements:
760         aggregate_hrn = element.values()[0]
761         aggregate_rspecs[aggregate_hrn] = rspec 
762
763     # get the callers hrn
764     valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
765     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
766
767     # attempt to use delegated credential first
768     cred = api.getDelegatedCredential(creds)
769     if not cred:
770         cred = api.getCredential() 
771     threads = ThreadManager()
772     for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
773         # prevent infinite loop. Dont send request back to caller
774         # unless the caller is the aggregate's SM
775         if caller_hrn == aggregate and aggregate != api.hrn:
776             continue
777 =======
778         results = threads.get_results()
779 >>>>>>> a3996bfa45298c8d0abfd58916221abba737441c:sfa/managers/slice_manager.py
780         
781         # gather information from each ticket 
782         rspec = None
783         initscripts = []
784         slivers = [] 
785         object_gid = None  
786         for result in results:
787             agg_ticket = SfaTicket(string=result)
788             attrs = agg_ticket.get_attributes()
789             if not object_gid:
790                 object_gid = agg_ticket.get_gid_object()
791             if not rspec:
792                 rspec = RSpec(agg_ticket.get_rspec())
793             else:
794                 rspec.version.merge(agg_ticket.get_rspec())
795             initscripts.extend(attrs.get('initscripts', [])) 
796             slivers.extend(attrs.get('slivers', [])) 
797         
798         # merge info
799         attributes = {'initscripts': initscripts,
800                      'slivers': slivers}
801         
802         # create a new ticket
803         ticket = SfaTicket(subject = slice_hrn)
804         ticket.set_gid_caller(api.auth.client_gid)
805         ticket.set_issuer(key=api.key, subject=api.hrn)
806         ticket.set_gid_object(object_gid)
807         ticket.set_pubkey(object_gid.get_pubkey())
808         #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
809         ticket.set_attributes(attributes)
810         ticket.set_rspec(rspec.toxml())
811         ticket.encode()
812         ticket.sign()          
813         return ticket.save_to_string(save_parents=True)
814     
815     def start_slice(self, api, xrn, creds):
816         hrn, type = urn_to_hrn(xrn)
817     
818         # get the callers hrn
819         valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
820         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
821     
822         # attempt to use delegated credential first
823         cred = api.getDelegatedCredential(creds)
824         if not cred:
825             cred = api.getCredential()
826         threads = ThreadManager()
827         for aggregate in api.aggregates:
828             # prevent infinite loop. Dont send request back to caller
829             # unless the caller is the aggregate's SM
830             if caller_hrn == aggregate and aggregate != api.hrn:
831                 continue
832             interface = api.aggregates[aggregate]
833             server = api.server_proxy(interface, cred)    
834             threads.run(server.Start, xrn, cred)
835         threads.get_results()    
836         return 1
837      
838     def stop_slice(self, api, xrn, creds):
839         hrn, type = urn_to_hrn(xrn)
840     
841         # get the callers hrn
842         valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
843         caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
844     
845         # attempt to use delegated credential first
846         cred = api.getDelegatedCredential(creds)
847         if not cred:
848             cred = api.getCredential()
849         threads = ThreadManager()
850         for aggregate in api.aggregates:
851             # prevent infinite loop. Dont send request back to caller
852             # unless the caller is the aggregate's SM
853             if caller_hrn == aggregate and aggregate != api.hrn:
854                 continue
855             interface = api.aggregates[aggregate]
856             server = api.server_proxy(interface, cred)
857             threads.run(server.Stop, xrn, cred)
858         threads.get_results()    
859         return 1
860     
861     def reset_slice(self, api, xrn):
862         """
863         Not implemented
864         """
865         return 1
866     
867     def shutdown(self, api, xrn, creds):
868         """
869         Not implemented   
870         """
871         return 1
872     
873     def status(self, api, xrn, creds):
874         """
875         Not implemented 
876         """
877         return 1
878