d7d4e5794b11e2d1f8ca01b8616061c5d3e86f71
[sfa.git] / sfa / managers / slice_manager_pl.py
1 #
2 import sys
3 import time,datetime
4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
7 from copy import copy
8 from lxml import etree
9
10 from sfa.util.sfalogging import logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.rspecs.rspec_converter import RSpecConverter
19 from sfa.rspecs.version_manager import VersionManager
20 from sfa.rspecs.rspec import RSpec 
21 from sfa.util.policy import Policy
22 from sfa.util.prefixTree import prefixTree
23 from sfa.util.sfaticket import *
24 from sfa.trust.credential import Credential
25 from sfa.util.threadmanager import ThreadManager
26 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
27 import sfa.plc.peers as peers
28 from sfa.util.version import version_core
29 from sfa.util.callids import Callids
30
31
32 def _call_id_supported(api, server):
33     """
34     Returns true if server support the optional call_id arg, false otherwise.
35     """
36     server_version = api.get_cached_server_version(server)
37
38     if 'sfa' in server_version:
39         code_tag = server_version['code_tag']
40         code_tag_parts = code_tag.split("-")
41
42         version_parts = code_tag_parts[0].split(".")
43         major, minor = version_parts[0:2]
44         rev = code_tag_parts[1]
45         if int(major) > 1:
46             if int(minor) > 0 or int(rev) > 20:
47                 return True
48     return False
49
50 # we have specialized xmlrpclib.ServerProxy to remember the input url
51 # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
52 def get_serverproxy_url (server):
53     try:
54         return server.url
55     except:
56         logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
57         return server._ServerProxy__host + server._ServerProxy__handler
58
59 def GetVersion(api):
60     # peers explicitly in aggregates.xml
61     peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
62                    if peername != api.hrn])
63     version_manager = VersionManager()
64     ad_rspec_versions = []
65     request_rspec_versions = []
66     for rspec_version in version_manager.versions:
67         if rspec_version in ['*', 'ad']:
68             request_rspec_versions.append(rspec_version.to_dict())
69         if rspec_version in ['*', 'request']:
70             request_rspec_version.append(rspec_version.to_dict())
71     default_rspec_version = version_manager.get_version("sfa 1").to_dict()
72     xrn=Xrn(api.hrn)
73     version_more = {'interface':'slicemgr',
74                     'hrn' : xrn.get_hrn(),
75                     'urn' : xrn.get_urn(),
76                     'peers': peers,
77                     'request_rspec_versions': request_rspec_versions,
78                     'ad_rspec_versions': ad_rspec_versions,
79                     'default_ad_rspec': default_rspec_version
80                     }
81     sm_version=version_core(version_more)
82     # local aggregate if present needs to have localhost resolved
83     if api.hrn in api.aggregates:
84         local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
85         sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
86     return sm_version
87
88 def drop_slicemgr_stats(rspec):
89     try:
90         stats_elements = rspec.xml.xpath('//statistics')
91         for node in stats_elements:
92             node.getparent().remove(node)
93     except Exception, e:
94         api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
95
96 def add_slicemgr_stat(rspec, callname, aggname, elapsed, status):
97     try:
98         stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
99         if stats_tags:
100             stats_tag = stats_tags[0]
101         else:
102             stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname)
103
104         etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
105     except Exception, e:
106         api.logger.warn("add_slicemgr_stat failed on  %s: %s" %(aggname, str(e)))
107
108 def ListResources(api, creds, options, call_id):
109     version_manager = VersionManager()
110     def _ListResources(aggregate, server, credential, opts, call_id):
111
112         my_opts = copy(opts)
113         args = [credential, my_opts]
114         tStart = time.time()
115         try:
116             if _call_id_supported(api, server):
117                 args.append(call_id)
118             version = api.get_cached_server_version(server)
119             # force ProtoGENI aggregates to give us a v2 RSpec
120             if 'sfa' not in version.keys():
121                 my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict()
122             rspec = server.ListResources(*args)
123             return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
124         except Exception, e:
125             api.logger.warn("ListResources failed at %s: %s" %(server.url, str(e)))
126             return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
127
128     if Callids().already_handled(call_id): return ""
129
130     # get slice's hrn from options
131     xrn = options.get('geni_slice_urn', '')
132     (hrn, type) = urn_to_hrn(xrn)
133     if 'geni_compressed' in options:
134         del(options['geni_compressed'])
135
136     # get the rspec's return format from options
137     rspec_version = version_manager.get_version(options.get('rspec_version'))
138     version_string = "rspec_%s" % (rspec_version.to_string())
139
140     # look in cache first
141     if caching and api.cache and not xrn:
142         rspec =  api.cache.get(version_string)
143         if rspec:
144             return rspec
145
146     # get the callers hrn
147     valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
148     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
149
150     # attempt to use delegated credential first
151     credential = api.getDelegatedCredential(creds)
152     if not credential:
153         credential = api.getCredential()
154     credentials = [credential]
155     threads = ThreadManager()
156     for aggregate in api.aggregates:
157         # prevent infinite loop. Dont send request back to caller
158         # unless the caller is the aggregate's SM
159         if caller_hrn == aggregate and aggregate != api.hrn:
160             continue
161
162         # get the rspec from the aggregate
163         server = api.aggregates[aggregate]
164         threads.run(_ListResources, aggregate, server, credentials, options, call_id)
165
166     results = threads.get_results()
167     rspec_version = version_manager.get_version(options.get('rspec_version'))
168     manifest_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')    
169     rspec = RSpec(version=manifest_version)
170     for result in results:
171         add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
172         if result["status"]=="success":
173             try:
174                 rspec.version.merge(result["rspec"])
175             except:
176                 api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
177
178     # cache the result
179     if caching and api.cache and not xrn:
180         api.cache.add(version_string, rspec.toxml())
181
182     return rspec.toxml()
183
184
185 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
186
187     version_manager = VersionManager()
188     def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
189         tStart = time.time()
190         try:
191             # Need to call GetVersion at an aggregate to determine the supported
192             # rspec type/format beofre calling CreateSliver at an Aggregate.
193             server_version = api.get_cached_server_version(server)
194             if 'sfa' not in server_version and 'geni_api' in server_version:
195                 # sfa aggregtes support both sfa and pg rspecs, no need to convert
196                 # if aggregate supports sfa rspecs. otherwise convert to pg rspec
197                 rspec = RSpecConverter.to_pg_rspec(rspec)
198             args = [xrn, credential, rspec, users]
199             if _call_id_supported(api, server):
200                 args.append(call_id)
201             rspec = server.CreateSliver(*args)
202             return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
203         except: 
204             logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
205             return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
206
207     if Callids().already_handled(call_id): return ""
208     # Validate the RSpec against PlanetLab's schema --disabled for now
209     # The schema used here needs to aggregate the PL and VINI schemas
210     # schema = "/var/www/html/schemas/pl.rng"
211     rspec = RSpec(rspec_str)
212     schema = None
213     if schema:
214         rspec.validate(schema)
215
216     # if there is a <statistics> section, the aggregates don't care about it,
217     # so delete it.
218     drop_slicemgr_stats(rspec)
219
220     # attempt to use delegated credential first
221     credential = api.getDelegatedCredential(creds)
222     if not credential:
223         credential = api.getCredential()
224
225     # get the callers hrn
226     hrn, type = urn_to_hrn(xrn)
227     valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
228     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
229     threads = ThreadManager()
230     for aggregate in api.aggregates:
231         # prevent infinite loop. Dont send request back to caller
232         # unless the caller is the aggregate's SM 
233         if caller_hrn == aggregate and aggregate != api.hrn:
234             continue
235         server = api.aggregates[aggregate]
236         # Just send entire RSpec to each aggregate
237         threads.run(_CreateSliver, aggregate, server, xrn, credential, rspec.toxml(), users, call_id)
238             
239     results = threads.get_results()
240     manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
241     result_rspec = RSpec(version=manifest_version)
242     for result in results:
243         add_slicemgr_stat(rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
244         if result["status"]=="success":
245             try:
246                 result_rspec.version.merge(result["rspec"])
247             except:
248                 api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
249     return rspec.toxml()
250
251 def RenewSliver(api, xrn, creds, expiration_time, call_id):
252     def _RenewSliver(server, xrn, creds, expiration_time, call_id):
253         server_version = api.get_cached_server_version(server)
254         args =  [xrn, creds, expiration_time, call_id]
255         if _call_id_supported(api, server):
256             args.append(call_id)
257         return server.RenewSliver(*args)
258
259     if Callids().already_handled(call_id): return True
260
261     (hrn, type) = urn_to_hrn(xrn)
262     # get the callers hrn
263     valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
264     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
265
266     # attempt to use delegated credential first
267     credential = api.getDelegatedCredential(creds)
268     if not credential:
269         credential = api.getCredential()
270     threads = ThreadManager()
271     for aggregate in api.aggregates:
272         # prevent infinite loop. Dont send request back to caller
273         # unless the caller is the aggregate's SM
274         if caller_hrn == aggregate and aggregate != api.hrn:
275             continue
276         server = api.aggregates[aggregate]
277         threads.run(_RenewSliver, server, xrn, [credential], expiration_time, call_id)
278     # 'and' the results
279     return reduce (lambda x,y: x and y, threads.get_results() , True)
280
281 def DeleteSliver(api, xrn, creds, call_id):
282     def _DeleteSliver(server, xrn, creds, call_id):
283         server_version = api.get_cached_server_version(server)
284         args =  [xrn, creds]
285         if _call_id_supported(api, server):
286             args.append(call_id)
287         return server.DeleteSliver(*args)
288
289     if Callids().already_handled(call_id): return ""
290     (hrn, type) = urn_to_hrn(xrn)
291     # get the callers hrn
292     valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
293     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
294
295     # attempt to use delegated credential first
296     credential = api.getDelegatedCredential(creds)
297     if not credential:
298         credential = api.getCredential()
299     threads = ThreadManager()
300     for aggregate in api.aggregates:
301         # prevent infinite loop. Dont send request back to caller
302         # unless the caller is the aggregate's SM
303         if caller_hrn == aggregate and aggregate != api.hrn:
304             continue
305         server = api.aggregates[aggregate]
306         threads.run(_DeleteSliver, server, xrn, credential, call_id)
307     threads.get_results()
308     return 1
309
310
311 # first draft at a merging SliverStatus
312 def SliverStatus(api, slice_xrn, creds, call_id):
313     def _SliverStatus(server, xrn, creds, call_id):
314         server_version = api.get_cached_server_version(server)
315         args =  [xrn, creds]
316         if _call_id_supported(api, server):
317             args.append(call_id)
318         return server.SliverStatus(*args)
319     
320     if Callids().already_handled(call_id): return {}
321     # attempt to use delegated credential first
322     credential = api.getDelegatedCredential(creds)
323     if not credential:
324         credential = api.getCredential()
325     threads = ThreadManager()
326     for aggregate in api.aggregates:
327         server = api.aggregates[aggregate]
328         threads.run (_SliverStatus, server, slice_xrn, credential, call_id)
329     results = threads.get_results()
330
331     # get rid of any void result - e.g. when call_id was hit where by convention we return {}
332     results = [ result for result in results if result and result['geni_resources']]
333
334     # do not try to combine if there's no result
335     if not results : return {}
336
337     # otherwise let's merge stuff
338     overall = {}
339
340     # mmh, it is expected that all results carry the same urn
341     overall['geni_urn'] = results[0]['geni_urn']
342     overall['pl_login'] = results[0]['pl_login']
343     # append all geni_resources
344     overall['geni_resources'] = \
345         reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
346     overall['status'] = 'unknown'
347     if overall['geni_resources']:
348         overall['status'] = 'ready'
349
350     return overall
351
352 caching=True
353 #caching=False
354 def ListSlices(api, creds, call_id):
355     def _ListSlices(server, creds, call_id):
356         server_version = api.get_cached_server_version(server)
357         args =  [creds]
358         if _call_id_supported(api, server):
359             args.append(call_id)
360         return server.ListSlices(*args)
361
362     if Callids().already_handled(call_id): return []
363
364     # look in cache first
365     if caching and api.cache:
366         slices = api.cache.get('slices')
367         if slices:
368             return slices
369
370     # get the callers hrn
371     valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
372     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
373
374     # attempt to use delegated credential first
375     credential = api.getDelegatedCredential(creds)
376     if not credential:
377         credential = api.getCredential()
378     threads = ThreadManager()
379     # fetch from aggregates
380     for aggregate in api.aggregates:
381         # prevent infinite loop. Dont send request back to caller
382         # unless the caller is the aggregate's SM
383         if caller_hrn == aggregate and aggregate != api.hrn:
384             continue
385         server = api.aggregates[aggregate]
386         threads.run(_ListSlices, server, credential, call_id)
387
388     # combime results
389     results = threads.get_results()
390     slices = []
391     for result in results:
392         slices.extend(result)
393
394     # cache the result
395     if caching and api.cache:
396         api.cache.add('slices', slices)
397
398     return slices
399
400
401 def get_ticket(api, xrn, creds, rspec, users):
402     slice_hrn, type = urn_to_hrn(xrn)
403     # get the netspecs contained within the clients rspec
404     aggregate_rspecs = {}
405     tree= etree.parse(StringIO(rspec))
406     elements = tree.findall('./network')
407     for element in elements:
408         aggregate_hrn = element.values()[0]
409         aggregate_rspecs[aggregate_hrn] = rspec 
410
411     # get the callers hrn
412     valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
413     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
414
415     # attempt to use delegated credential first
416     credential = api.getDelegatedCredential(creds)
417     if not credential:
418         credential = api.getCredential() 
419     threads = ThreadManager()
420     for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
421         # prevent infinite loop. Dont send request back to caller
422         # unless the caller is the aggregate's SM
423         if caller_hrn == aggregate and aggregate != api.hrn:
424             continue
425         server = None
426         if aggregate in api.aggregates:
427             server = api.aggregates[aggregate]
428         else:
429             net_urn = hrn_to_urn(aggregate, 'authority')     
430             # we may have a peer that knows about this aggregate
431             for agg in api.aggregates:
432                 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
433                 if not target_aggs or not 'hrn' in target_aggs[0]:
434                     continue
435                 # send the request to this address 
436                 url = target_aggs[0]['url']
437                 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file, timeout=30)
438                 # aggregate found, no need to keep looping
439                 break   
440         if server is None:
441             continue 
442         threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
443
444     results = threads.get_results()
445     
446     # gather information from each ticket 
447     rspecs = []
448     initscripts = []
449     slivers = [] 
450     object_gid = None  
451     for result in results:
452         agg_ticket = SfaTicket(string=result)
453         attrs = agg_ticket.get_attributes()
454         if not object_gid:
455             object_gid = agg_ticket.get_gid_object()
456         rspecs.append(agg_ticket.get_rspec())
457         initscripts.extend(attrs.get('initscripts', [])) 
458         slivers.extend(attrs.get('slivers', [])) 
459     
460     # merge info
461     attributes = {'initscripts': initscripts,
462                  'slivers': slivers}
463     merged_rspec = merge_rspecs(rspecs) 
464
465     # create a new ticket
466     ticket = SfaTicket(subject = slice_hrn)
467     ticket.set_gid_caller(api.auth.client_gid)
468     ticket.set_issuer(key=api.key, subject=api.hrn)
469     ticket.set_gid_object(object_gid)
470     ticket.set_pubkey(object_gid.get_pubkey())
471     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
472     ticket.set_attributes(attributes)
473     ticket.set_rspec(merged_rspec)
474     ticket.encode()
475     ticket.sign()          
476     return ticket.save_to_string(save_parents=True)
477
478 def start_slice(api, xrn, creds):
479     hrn, type = urn_to_hrn(xrn)
480
481     # get the callers hrn
482     valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
483     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
484
485     # attempt to use delegated credential first
486     credential = api.getDelegatedCredential(creds)
487     if not credential:
488         credential = api.getCredential()
489     threads = ThreadManager()
490     for aggregate in api.aggregates:
491         # prevent infinite loop. Dont send request back to caller
492         # unless the caller is the aggregate's SM
493         if caller_hrn == aggregate and aggregate != api.hrn:
494             continue
495         server = api.aggregates[aggregate]
496         threads.run(server.Start, xrn, credential)
497     threads.get_results()    
498     return 1
499  
500 def stop_slice(api, xrn, creds):
501     hrn, type = urn_to_hrn(xrn)
502
503     # get the callers hrn
504     valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
505     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
506
507     # attempt to use delegated credential first
508     credential = api.getDelegatedCredential(creds)
509     if not credential:
510         credential = api.getCredential()
511     threads = ThreadManager()
512     for aggregate in api.aggregates:
513         # prevent infinite loop. Dont send request back to caller
514         # unless the caller is the aggregate's SM
515         if caller_hrn == aggregate and aggregate != api.hrn:
516             continue
517         server = api.aggregates[aggregate]
518         threads.run(server.Stop, xrn, credential)
519     threads.get_results()    
520     return 1
521
522 def reset_slice(api, xrn):
523     """
524     Not implemented
525     """
526     return 1
527
528 def shutdown(api, xrn, creds):
529     """
530     Not implemented   
531     """
532     return 1
533
534 def status(api, xrn, creds):
535     """
536     Not implemented 
537     """
538     return 1
539
540 def main():
541     r = RSpec()
542     r.parseFile(sys.argv[1])
543     rspec = r.toDict()
544     CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice')
545
546 if __name__ == "__main__":
547     main()
548