Aggregate_manager_slab.py : changed extime in __get_registry_objects (to fix?)
[sfa.git] / sfa / managers / slice_manager_slab.py
1
2 import sys
3 import time,datetime
4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
7 from copy import copy
8 from lxml import etree
9
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.rspecs.pg_rspec import PGRSpec
19 from sfa.rspecs.sfa_rspec import SfaRSpec
20 from sfa.rspecs.rspec_converter import RSpecConverter
21 from sfa.rspecs.rspec_parser import parse_rspec    
22 from sfa.rspecs.rspec_version import RSpecVersion
23 from sfa.rspecs.sfa_rspec import sfa_rspec_version
24 from sfa.rspecs.pg_rspec import pg_rspec_ad_version, pg_rspec_request_version   
25 from sfa.util.policy import Policy
26 from sfa.util.prefixTree import prefixTree
27 from sfa.util.sfaticket import *
28 from sfa.trust.credential import Credential
29 from sfa.util.threadmanager import ThreadManager
30 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
31 import sfa.plc.peers as peers
32 from sfa.util.version import version_core
33 from sfa.util.callids import Callids
34
35 # we have specialized xmlrpclib.ServerProxy to remember the input url
36 # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
37 def get_serverproxy_url (server):
38     try:
39         return server.url
40     except:
41         sfa_logger().warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
42         return server._ServerProxy__host + server._ServerProxy__handler 
43
44 def GetVersion(api):
45     # peers explicitly in aggregates.xml
46     peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() 
47                    if peername != api.hrn])
48     xrn=Xrn (api.hrn)
49     request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)]
50     ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)]
51     version_more = {'interface':'slicemgr',
52                     'hrn' : xrn.get_hrn(),
53                     'urn' : xrn.get_urn(),
54                     'peers': peers,
55                     'request_rspec_versions': request_rspec_versions,
56                     'ad_rspec_versions': ad_rspec_versions,
57                     'default_ad_rspec': dict(sfa_rspec_version)
58                     }
59     sm_version=version_core(version_more)
60     # local aggregate if present needs to have localhost resolved
61     if api.hrn in api.aggregates:
62         local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
63         sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
64     return sm_version
65  
66 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
67
68     def _CreateSliver(aggregate, xrn, credential, rspec, users, call_id):
69             # Need to call ParseVersion at an aggregate to determine the supported 
70             # rspec type/format beofre calling CreateSliver at an Aggregate. 
71             # The Aggregate's verion info is cached 
72             print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER _CreateSliver "
73             
74             server = api.aggregates[aggregate]
75             # get cached aggregate version
76             aggregate_version_key = 'version_'+ aggregate
77             aggregate_version = api.cache.get(aggregate_version_key)
78             print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER _CreateSliver aggregate_version WTF ? %s"%(aggregate_version )  
79             if aggregate_version is None:
80                 # get current aggregate version anc cache it for 24 hours 
81                 print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER It s browwwwwn server"  
82                 aggregate_version = server.GetVersion()
83                 print>>sys.stderr, " \r\n \t\t =======SLICE MANAGER _CreateSliver GET aggregate_version %s"%(aggregate_version )                
84                 api.cache.add(aggregate_version_key, aggregate_version, 60 * 60 * 24)
85                 
86             if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version:
87                 # sfa aggregtes support both sfa and pg rspecs, no need to convert
88                 # if aggregate supports sfa rspecs. othewise convert to pg rspec
89                 rspec = RSpecConverter.to_pg_rspec(rspec)
90
91             return server.CreateSliver(xrn, credential, rspec, users, call_id)
92                 
93
94     if Callids().already_handled(call_id): return ""
95
96     # Validate the RSpec against PlanetLab's schema --disabled for now
97     # The schema used here needs to aggregate the PL and VINI schemas
98     # schema = "/var/www/html/schemas/pl.rng"
99     rspec = parse_rspec(rspec_str)
100     schema = None
101     if schema:
102         rspec.validate(schema)
103
104     # attempt to use delegated credential first
105     credential = api.getDelegatedCredential(creds)
106     if not credential:
107         credential = api.getCredential()
108
109     # get the callers hrn
110     hrn, type = urn_to_hrn(xrn)
111     valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
112     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
113     threads = ThreadManager()
114     for aggregate in api.aggregates:
115         # prevent infinite loop. Dont send request back to caller
116         # unless the caller is the aggregate's SM 
117         if caller_hrn == aggregate and aggregate != api.hrn:
118             continue
119             
120         # Just send entire RSpec to each aggregate
121         threads.run(_CreateSliver, aggregate, xrn, credential, rspec.toxml(), users, call_id)
122             
123     results = threads.get_results()
124     rspec = SfaRSpec()
125     for result in results:
126         rspec.merge(result)     
127     return rspec.toxml()
128
129 def RenewSliver(api, xrn, creds, expiration_time, call_id):
130     if Callids().already_handled(call_id): return True
131
132     (hrn, type) = urn_to_hrn(xrn)
133     # get the callers hrn
134     valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
135     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
136
137     # attempt to use delegated credential first
138     credential = api.getDelegatedCredential(creds)
139     if not credential:
140         credential = api.getCredential()
141     threads = ThreadManager()
142     for aggregate in api.aggregates:
143         # prevent infinite loop. Dont send request back to caller
144         # unless the caller is the aggregate's SM
145         if caller_hrn == aggregate and aggregate != api.hrn:
146             continue
147
148         server = api.aggregates[aggregate]
149         threads.run(server.RenewSliver, xrn, [credential], expiration_time, call_id)
150     # 'and' the results
151     return reduce (lambda x,y: x and y, threads.get_results() , True)
152
153 def get_ticket(api, xrn, creds, rspec, users):
154     slice_hrn, type = urn_to_hrn(xrn)
155     # get the netspecs contained within the clients rspec
156     aggregate_rspecs = {}
157     tree= etree.parse(StringIO(rspec))
158     elements = tree.findall('./network')
159     for element in elements:
160         aggregate_hrn = element.values()[0]
161         aggregate_rspecs[aggregate_hrn] = rspec 
162
163     # get the callers hrn
164     valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
165     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
166
167     # attempt to use delegated credential first
168     credential = api.getDelegatedCredential(creds)
169     if not credential:
170         credential = api.getCredential() 
171     threads = ThreadManager()
172     for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
173         # prevent infinite loop. Dont send request back to caller
174         # unless the caller is the aggregate's SM
175         if caller_hrn == aggregate and aggregate != api.hrn:
176             continue
177         server = None
178         if aggregate in api.aggregates:
179             server = api.aggregates[aggregate]
180         else:
181             net_urn = hrn_to_urn(aggregate, 'authority')     
182             # we may have a peer that knows about this aggregate
183             for agg in api.aggregates:
184                 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
185                 if not target_aggs or not 'hrn' in target_aggs[0]:
186                     continue
187                 # send the request to this address 
188                 url = target_aggs[0]['url']
189                 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
190                 # aggregate found, no need to keep looping
191                 break   
192         if server is None:
193             continue 
194         threads.run(server.ParseTicket, xrn, credential, aggregate_rspec, users)
195
196     results = threads.get_results()
197     
198     # gather information from each ticket 
199     rspecs = []
200     initscripts = []
201     slivers = [] 
202     object_gid = None  
203     for result in results:
204         agg_ticket = SfaTicket(string=result)
205         attrs = agg_ticket.get_attributes()
206         if not object_gid:
207             object_gid = agg_ticket.get_gid_object()
208         rspecs.append(agg_ticket.get_rspec())
209         initscripts.extend(attrs.get('initscripts', [])) 
210         slivers.extend(attrs.get('slivers', [])) 
211     
212     # merge info
213     attributes = {'initscripts': initscripts,
214                  'slivers': slivers}
215     merged_rspec = merge_rspecs(rspecs) 
216
217     # create a new ticket
218     ticket = SfaTicket(subject = slice_hrn)
219     ticket.set_gid_caller(api.auth.client_gid)
220     ticket.set_issuer(key=api.key, subject=api.hrn)
221     ticket.set_gid_object(object_gid)
222     ticket.set_pubkey(object_gid.get_pubkey())
223     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
224     ticket.set_attributes(attributes)
225     ticket.set_rspec(merged_rspec)
226     ticket.encode()
227     ticket.sign()          
228     return ticket.save_to_string(save_parents=True)
229
230
231 def DeleteSliver(api, xrn, creds, call_id):
232     if Callids().already_handled(call_id): return ""
233     (hrn, type) = urn_to_hrn(xrn)
234     # get the callers hrn
235     valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
236     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
237
238     # attempt to use delegated credential first
239     credential = api.getDelegatedCredential(creds)
240     if not credential:
241         credential = api.getCredential()
242     threads = ThreadManager()
243     for aggregate in api.aggregates:
244         # prevent infinite loop. Dont send request back to caller
245         # unless the caller is the aggregate's SM
246         if caller_hrn == aggregate and aggregate != api.hrn:
247             continue
248         server = api.aggregates[aggregate]
249         threads.run(server.DeleteSliver, xrn, credential, call_id)
250     threads.get_results()
251     return 1
252
253 def start_slice(api, xrn, creds):
254     hrn, type = urn_to_hrn(xrn)
255
256     # get the callers hrn
257     valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
258     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
259
260     # attempt to use delegated credential first
261     credential = api.getDelegatedCredential(creds)
262     if not credential:
263         credential = api.getCredential()
264     threads = ThreadManager()
265     for aggregate in api.aggregates:
266         # prevent infinite loop. Dont send request back to caller
267         # unless the caller is the aggregate's SM
268         if caller_hrn == aggregate and aggregate != api.hrn:
269             continue
270         server = api.aggregates[aggregate]
271         threads.run(server.Start, xrn, credential)
272     threads.get_results()    
273     return 1
274  
275 def stop_slice(api, xrn, creds):
276     hrn, type = urn_to_hrn(xrn)
277
278     # get the callers hrn
279     valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
280     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
281
282     # attempt to use delegated credential first
283     credential = api.getDelegatedCredential(creds)
284     if not credential:
285         credential = api.getCredential()
286     threads = ThreadManager()
287     for aggregate in api.aggregates:
288         # prevent infinite loop. Dont send request back to caller
289         # unless the caller is the aggregate's SM
290         if caller_hrn == aggregate and aggregate != api.hrn:
291             continue
292         server = api.aggregates[aggregate]
293         threads.run(server.Stop, xrn, credential)
294     threads.get_results()    
295     return 1
296
297 def reset_slice(api, xrn):
298     """
299     Not implemented
300     """
301     return 1
302
303 def shutdown(api, xrn, creds):
304     """
305     Not implemented   
306     """
307     return 1
308
309 def status(api, xrn, creds):
310     """
311     Not implemented 
312     """
313     return 1
314
315 # Thierry : caching at the slicemgr level makes sense to some extent
316 #caching=True
317 caching=False
318 def ListSlices(api, creds, call_id):
319
320     if Callids().already_handled(call_id): return []
321
322     # look in cache first
323     if caching and api.cache:
324         slices = api.cache.get('slices')
325         if slices:
326             return slices    
327
328     # get the callers hrn
329     valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
330     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
331
332     # attempt to use delegated credential first
333     credential = api.getDelegatedCredential(creds)
334     if not credential:
335         credential = api.getCredential()
336     threads = ThreadManager()
337     # fetch from aggregates
338     for aggregate in api.aggregates:
339         # prevent infinite loop. Dont send request back to caller
340         # unless the caller is the aggregate's SM
341         if caller_hrn == aggregate and aggregate != api.hrn:
342             continue
343         server = api.aggregates[aggregate]
344         threads.run(server.ListSlices, credential, call_id)
345
346     # combime results
347     results = threads.get_results()
348     slices = []
349     for result in results:
350         slices.extend(result)
351     
352     # cache the result
353     if caching and api.cache:
354         api.cache.add('slices', slices)
355
356     return slices
357
358
359 def ListResources(api, creds, options, call_id):
360
361     if Callids().already_handled(call_id): return ""
362
363     # get slice's hrn from options
364     xrn = options.get('geni_slice_urn', '')
365     (hrn, type) = urn_to_hrn(xrn)
366     print >>sys.stderr, " SM_ListResources xrn " , xrn
367     #print >>sys.stderr, " SM ListResources api.__dict__ " , api.__dict__.keys()
368     #print >>sys.stderr, " SM ListResources dir(api)" , dir(api)
369     print >>sys.stderr, "  \r\n avant RspecVersion \r\n \r\n"
370     # get the rspec's return format from options
371     rspec_version = RSpecVersion(options.get('rspec_version'))
372     print >>sys.stderr, " \r\n \r\n ListResources RSpecVersion ", rspec_version
373     version_string = "rspec_%s" % (rspec_version.get_version_name())
374
375     #panos adding the info option to the caching key (can be improved)
376     if options.get('info'):
377         version_string = version_string + "_"+options.get('info')
378    
379     print>>sys.stderr,"version string = ",version_string
380
381     # look in cache first
382     if caching and api.cache and not xrn:
383         print>>sys.stderr," \r\n  caching %s and api.cache %s and not xrn %s"%(caching , api.cache,xrn) 
384         rspec =  api.cache.get(version_string)
385         if rspec:
386             return rspec
387
388     # get the callers hrn
389     print >>sys.stderr, " SM ListResources get the callers hrn "
390     valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
391     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
392     print >>sys.stderr, " \r\n SM ListResources get the callers caller_hrn hrn  %s "%(caller_hrn)
393     # attempt to use delegated credential first
394     credential = api.getDelegatedCredential(creds)
395     print >>sys.stderr, " \r\n SM ListResources get the callers credential  %s "%(credential) 
396     if not credential:
397         credential = api.getCredential()
398     threads = ThreadManager()
399     print >>sys.stderr, " \r\n SM ListResources get the callers api.aggregates  %s "%(api.aggregates) 
400     for aggregate in api.aggregates:
401         # prevent infinite loop. Dont send request back to caller
402         # unless the caller is the aggregate's SM
403         if caller_hrn == aggregate and aggregate != api.hrn:
404             continue
405         # get the rspec from the aggregate
406         server = api.aggregates[aggregate]
407         print >>sys.stderr, " Slice Mgr ListResources, server" ,server
408         my_opts = copy(options)
409         my_opts['geni_compressed'] = False
410         threads.run(server.ListResources, credential, my_opts, call_id)
411         print >>sys.stderr, "\r\n  !!!!!!!!!!!!!!!! \r\n"       
412     results = threads.get_results()
413     #results.append(open('/root/protogeni.rspec', 'r').read())
414     rspec_version = RSpecVersion(my_opts.get('rspec_version'))
415     if rspec_version['type'].lower() == 'protogeni':
416         rspec = PGRSpec()
417     else:
418         rspec = SfaRSpec()
419
420     for result in results:
421         print >>sys.stderr, "\r\n  slice_manager  result"   , result
422         try:
423             print >>sys.stderr, "avant merge"  , rspec         
424             rspec.merge(result)        
425             print >>sys.stderr, "AFTERMERGE" , rspec
426         except:
427             raise
428             api.logger.info("SM.ListResources: Failed to merge aggregate rspec")
429
430     # cache the result
431     if caching and api.cache and not xrn:
432         api.cache.add(version_string, rspec.toxml())
433
434     print >>sys.stderr, "\r\n  slice_manager  \r\n"   , rspec
435     return rspec.toxml()
436
437 # first draft at a merging SliverStatus
438 def SliverStatus(api, slice_xrn, creds, call_id):
439     if Callids().already_handled(call_id): return {}
440     # attempt to use delegated credential first
441     credential = api.getDelegatedCredential(creds)
442     if not credential:
443         credential = api.getCredential()
444     threads = ThreadManager()
445     for aggregate in api.aggregates:
446         server = api.aggregates[aggregate]
447         threads.run (server.SliverStatus, slice_xrn, credential, call_id)
448     results = threads.get_results()
449
450     # get rid of any void result - e.g. when call_id was hit where by convention we return {}
451     results = [ result for result in results if result and result['geni_resources']]
452
453     # do not try to combine if there's no result
454     if not results : return {}
455
456     # otherwise let's merge stuff
457     overall = {}
458
459     # mmh, it is expected that all results carry the same urn
460     overall['geni_urn'] = results[0]['geni_urn']
461
462     # consolidate geni_status - simple model using max on a total order
463     states = [ 'ready', 'configuring', 'failed', 'unknown' ]
464     # hash name to index
465     shash = dict ( zip ( states, range(len(states)) ) )
466     def combine_status (x,y):
467         return shash [ max (shash(x),shash(y)) ]
468     overall['geni_status'] = reduce (combine_status, [ result['geni_status'] for result in results], 'ready' )
469
470     # {'ready':0,'configuring':1,'failed':2,'unknown':3}
471     # append all geni_resources
472     overall['geni_resources'] = \
473         reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
474
475     return overall
476
477 def main():
478     r = RSpec()
479     r.parseFile(sys.argv[1])
480     rspec = r.toDict()
481     CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice')
482
483 if __name__ == "__main__":
484     main()
485