Small edit in order to test the git repo. NT.
[sfa.git] / sfa / managers / slice_manager_pl.py
1
2 import sys
3 import time,datetime
4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
7 from copy import copy
8 from lxml import etree
9
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.rspecs.pg_rspec import PGRSpec
19 from sfa.rspecs.sfa_rspec import SfaRSpec
20 from sfa.rspecs.rspec_converter import RSpecConverter
21 from sfa.rspecs.rspec_parser import parse_rspec    
22 from sfa.rspecs.rspec_version import RSpecVersion
23 from sfa.rspecs.sfa_rspec import sfa_rspec_version
24 from sfa.rspecs.pg_rspec import pg_rspec_ad_version, pg_rspec_request_version   
25 from sfa.util.policy import Policy
26 from sfa.util.prefixTree import prefixTree
27 from sfa.util.sfaticket import *
28 from sfa.trust.credential import Credential
29 from sfa.util.threadmanager import ThreadManager
30 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
31 import sfa.plc.peers as peers
32 from sfa.util.version import version_core
33 from sfa.util.callids import Callids
34
35 # we have specialized xmlrpclib.ServerProxy to remember the input url
36 # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
37 def get_serverproxy_url (server):
38     try:
39         return server.url
40     except:
41         sfa_logger().warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
42         return server._ServerProxy__host + server._ServerProxy__handler 
43
44 def GetVersion(api):
45     # peers explicitly in aggregates.xml
46     peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems() 
47                    if peername != api.hrn])
48     xrn=Xrn (api.hrn)
49     request_rspec_versions = [dict(pg_rspec_request_version), dict(sfa_rspec_version)]
50     ad_rspec_versions = [dict(pg_rspec_ad_version), dict(sfa_rspec_version)]
51     version_more = {'interface':'slicemgr',
52                     'hrn' : xrn.get_hrn(),
53                     'urn' : xrn.get_urn(),
54                     'peers': peers,
55                     'request_rspec_versions': request_rspec_versions,
56                     'ad_rspec_versions': ad_rspec_versions,
57                     'default_ad_rspec': dict(sfa_rspec_version)
58                     }
59     sm_version=version_core(version_more)
60     # local aggregate if present needs to have localhost resolved
61     if api.hrn in api.aggregates:
62         local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
63         sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
64     return sm_version
65
66 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
67
68     def _CreateSliver(aggregate, xrn, credential, rspec, users, call_id):
69             # Need to call GetVersion at an aggregate to determine the supported 
70             # rspec type/format beofre calling CreateSliver at an Aggregate. 
71             # The Aggregate's verion info is cached 
72             server = api.aggregates[aggregate]
73             # get cached aggregate version
74             aggregate_version_key = 'version_'+ aggregate
75             aggregate_version = api.cache.get(aggregate_version_key)
76             if not aggregate_version:
77                 # get current aggregate version anc cache it for 24 hours
78                 aggregate_version = server.GetVersion()
79                 api.cache.add(aggregate_version_key, aggregate_version, 60 * 60 * 24)
80                 
81             if 'sfa' not in aggregate_version and 'geni_api' in aggregate_version:
82                 # sfa aggregtes support both sfa and pg rspecs, no need to convert
83                 # if aggregate supports sfa rspecs. othewise convert to pg rspec
84                 rspec = RSpecConverter.to_pg_rspec(rspec)
85
86             return server.CreateSliver(xrn, credential, rspec, users, call_id)
87                 
88
89     if Callids().already_handled(call_id): return ""
90
91     # Validate the RSpec against PlanetLab's schema --disabled for now
92     # The schema used here needs to aggregate the PL and VINI schemas
93     # schema = "/var/www/html/schemas/pl.rng"
94     rspec = parse_rspec(rspec_str)
95     schema = None
96     if schema:
97         rspec.validate(schema)
98
99     # attempt to use delegated credential first
100     credential = api.getDelegatedCredential(creds)
101     if not credential:
102         credential = api.getCredential()
103
104     # get the callers hrn
105     hrn, type = urn_to_hrn(xrn)
106     valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
107     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
108     threads = ThreadManager()
109     for aggregate in api.aggregates:
110         # prevent infinite loop. Dont send request back to caller
111         # unless the caller is the aggregate's SM 
112         if caller_hrn == aggregate and aggregate != api.hrn:
113             continue
114             
115         # Just send entire RSpec to each aggregate
116         threads.run(_CreateSliver, aggregate, xrn, credential, rspec.toxml(), users, call_id)
117             
118     results = threads.get_results()
119     rspec = SfaRSpec()
120     for result in results:
121         rspec.merge(result)     
122     return rspec.toxml()
123
124 def RenewSliver(api, xrn, creds, expiration_time, call_id):
125     if Callids().already_handled(call_id): return True
126
127     (hrn, type) = urn_to_hrn(xrn)
128     # get the callers hrn
129     valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
130     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
131
132     # attempt to use delegated credential first
133     credential = api.getDelegatedCredential(creds)
134     if not credential:
135         credential = api.getCredential()
136     threads = ThreadManager()
137     for aggregate in api.aggregates:
138         # prevent infinite loop. Dont send request back to caller
139         # unless the caller is the aggregate's SM
140         if caller_hrn == aggregate and aggregate != api.hrn:
141             continue
142
143         server = api.aggregates[aggregate]
144         threads.run(server.RenewSliver, xrn, [credential], expiration_time, call_id)
145     # 'and' the results
146     return reduce (lambda x,y: x and y, threads.get_results() , True)
147
148 def get_ticket(api, xrn, creds, rspec, users):
149     slice_hrn, type = urn_to_hrn(xrn)
150     # get the netspecs contained within the clients rspec
151     aggregate_rspecs = {}
152     tree= etree.parse(StringIO(rspec))
153     elements = tree.findall('./network')
154     for element in elements:
155         aggregate_hrn = element.values()[0]
156         aggregate_rspecs[aggregate_hrn] = rspec 
157
158     # get the callers hrn
159     valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
160     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
161
162     # attempt to use delegated credential first
163     credential = api.getDelegatedCredential(creds)
164     if not credential:
165         credential = api.getCredential() 
166     threads = ThreadManager()
167     for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
168         # prevent infinite loop. Dont send request back to caller
169         # unless the caller is the aggregate's SM
170         if caller_hrn == aggregate and aggregate != api.hrn:
171             continue
172         server = None
173         if aggregate in api.aggregates:
174             server = api.aggregates[aggregate]
175         else:
176             net_urn = hrn_to_urn(aggregate, 'authority')     
177             # we may have a peer that knows about this aggregate
178             for agg in api.aggregates:
179                 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
180                 if not target_aggs or not 'hrn' in target_aggs[0]:
181                     continue
182                 # send the request to this address 
183                 url = target_aggs[0]['url']
184                 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
185                 # aggregate found, no need to keep looping
186                 break   
187         if server is None:
188             continue 
189         threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
190
191     results = threads.get_results()
192     
193     # gather information from each ticket 
194     rspecs = []
195     initscripts = []
196     slivers = [] 
197     object_gid = None  
198     for result in results:
199         agg_ticket = SfaTicket(string=result)
200         attrs = agg_ticket.get_attributes()
201         if not object_gid:
202             object_gid = agg_ticket.get_gid_object()
203         rspecs.append(agg_ticket.get_rspec())
204         initscripts.extend(attrs.get('initscripts', [])) 
205         slivers.extend(attrs.get('slivers', [])) 
206     
207     # merge info
208     attributes = {'initscripts': initscripts,
209                  'slivers': slivers}
210     merged_rspec = merge_rspecs(rspecs) 
211
212     # create a new ticket
213     ticket = SfaTicket(subject = slice_hrn)
214     ticket.set_gid_caller(api.auth.client_gid)
215     ticket.set_issuer(key=api.key, subject=api.hrn)
216     ticket.set_gid_object(object_gid)
217     ticket.set_pubkey(object_gid.get_pubkey())
218     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
219     ticket.set_attributes(attributes)
220     ticket.set_rspec(merged_rspec)
221     ticket.encode()
222     ticket.sign()          
223     return ticket.save_to_string(save_parents=True)
224
225
226 def DeleteSliver(api, xrn, creds, call_id):
227     if Callids().already_handled(call_id): return ""
228     (hrn, type) = urn_to_hrn(xrn)
229     # get the callers hrn
230     valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
231     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
232
233     # attempt to use delegated credential first
234     credential = api.getDelegatedCredential(creds)
235     if not credential:
236         credential = api.getCredential()
237     threads = ThreadManager()
238     for aggregate in api.aggregates:
239         # prevent infinite loop. Dont send request back to caller
240         # unless the caller is the aggregate's SM
241         if caller_hrn == aggregate and aggregate != api.hrn:
242             continue
243         server = api.aggregates[aggregate]
244         threads.run(server.DeleteSliver, xrn, credential, call_id)
245     threads.get_results()
246     return 1
247
248 def start_slice(api, xrn, creds):
249     hrn, type = urn_to_hrn(xrn)
250
251     # get the callers hrn
252     valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
253     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
254
255     # attempt to use delegated credential first
256     credential = api.getDelegatedCredential(creds)
257     if not credential:
258         credential = api.getCredential()
259     threads = ThreadManager()
260     for aggregate in api.aggregates:
261         # prevent infinite loop. Dont send request back to caller
262         # unless the caller is the aggregate's SM
263         if caller_hrn == aggregate and aggregate != api.hrn:
264             continue
265         server = api.aggregates[aggregate]
266         threads.run(server.Start, xrn, credential)
267     threads.get_results()    
268     return 1
269  
270 def stop_slice(api, xrn, creds):
271     hrn, type = urn_to_hrn(xrn)
272
273     # get the callers hrn
274     valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
275     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
276
277     # attempt to use delegated credential first
278     credential = api.getDelegatedCredential(creds)
279     if not credential:
280         credential = api.getCredential()
281     threads = ThreadManager()
282     for aggregate in api.aggregates:
283         # prevent infinite loop. Dont send request back to caller
284         # unless the caller is the aggregate's SM
285         if caller_hrn == aggregate and aggregate != api.hrn:
286             continue
287         server = api.aggregates[aggregate]
288         threads.run(server.Stop, xrn, credential)
289     threads.get_results()    
290     return 1
291
292 def reset_slice(api, xrn):
293     """
294     Not implemented
295     """
296     return 1
297
298 def shutdown(api, xrn, creds):
299     """
300     Not implemented   
301     """
302     return 1
303
304 def status(api, xrn, creds):
305     """
306     Not implemented 
307     """
308     return 1
309
310 # Thierry : caching at the slicemgr level makes sense to some extent
311 caching=True
312 #caching=False
313 def ListSlices(api, creds, call_id):
314
315     if Callids().already_handled(call_id): return []
316
317     # look in cache first
318     if caching and api.cache:
319         slices = api.cache.get('slices')
320         if slices:
321             return slices    
322
323     # get the callers hrn
324     valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
325     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
326
327     # attempt to use delegated credential first
328     credential = api.getDelegatedCredential(creds)
329     if not credential:
330         credential = api.getCredential()
331     threads = ThreadManager()
332     # fetch from aggregates
333     for aggregate in api.aggregates:
334         # prevent infinite loop. Dont send request back to caller
335         # unless the caller is the aggregate's SM
336         if caller_hrn == aggregate and aggregate != api.hrn:
337             continue
338         server = api.aggregates[aggregate]
339         threads.run(server.ListSlices, credential, call_id)
340
341     # combime results
342     results = threads.get_results()
343     slices = []
344     for result in results:
345         slices.extend(result)
346     
347     # cache the result
348     if caching and api.cache:
349         api.cache.add('slices', slices)
350
351     return slices
352
353
354 def ListResources(api, creds, options, call_id):
355
356     if Callids().already_handled(call_id): return ""
357
358     # get slice's hrn from options
359     xrn = options.get('geni_slice_urn', '')
360     (hrn, type) = urn_to_hrn(xrn)
361
362     # get the rspec's return format from options
363     rspec_version = RSpecVersion(options.get('rspec_version'))
364     version_string = "rspec_%s" % (rspec_version.get_version_name())
365
366     # look in cache first
367     if caching and api.cache and not xrn:
368         rspec =  api.cache.get(version_string)
369         if rspec:
370             return rspec
371
372     # get the callers hrn
373     valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
374     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
375
376     # attempt to use delegated credential first
377     credential = api.getDelegatedCredential(creds)
378     if not credential:
379         credential = api.getCredential()
380     threads = ThreadManager()
381     for aggregate in api.aggregates:
382         # prevent infinite loop. Dont send request back to caller
383         # unless the caller is the aggregate's SM
384         if caller_hrn == aggregate and aggregate != api.hrn:
385             continue
386         # get the rspec from the aggregate
387         server = api.aggregates[aggregate]
388         my_opts = copy(options)
389         my_opts['geni_compressed'] = False
390         threads.run(server.ListResources, credential, my_opts, call_id)
391                     
392     results = threads.get_results()
393     rspec_version = RSpecVersion(my_opts.get('rspec_version'))
394     if rspec_version['type'] == pg_rspec_ad_version['type']:
395         rspec = PGRSpec()
396     else:
397         rspec = SfaRSpec()
398
399     for result in results:
400         try:
401             rspec.merge(result)
402         except:
403             api.logger.info("SM.ListResources: Failed to merge aggregate rspec")
404
405     # cache the result
406     if caching and api.cache and not xrn:
407         api.cache.add(version_string, rspec.toxml())
408  
409     return rspec.toxml()
410
411 # first draft at a merging SliverStatus
412 def SliverStatus(api, slice_xrn, creds, call_id):
413     if Callids().already_handled(call_id): return {}
414     # attempt to use delegated credential first
415     credential = api.getDelegatedCredential(creds)
416     if not credential:
417         credential = api.getCredential()
418     threads = ThreadManager()
419     for aggregate in api.aggregates:
420         server = api.aggregates[aggregate]
421         threads.run (server.SliverStatus, slice_xrn, credential, call_id)
422     results = threads.get_results()
423
424     # get rid of any void result - e.g. when call_id was hit where by convention we return {}
425     results = [ result for result in results if result and result['geni_resources']]
426
427     # do not try to combine if there's no result
428     if not results : return {}
429
430     # otherwise let's merge stuff
431     overall = {}
432
433     # mmh, it is expected that all results carry the same urn
434     overall['geni_urn'] = results[0]['geni_urn']
435     overall['pl_login'] = results[0]['pl_login']
436     # append all geni_resources
437     overall['geni_resources'] = \
438         reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
439     overall['status'] = 'unknown'
440     if overall['geni_resources']:
441         overall['status'] = 'ready'
442
443     return overall
444
445 def main():
446     r = RSpec()
447     r.parseFile(sys.argv[1])
448     rspec = r.toDict()
449     CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice')
450
451 if __name__ == "__main__":
452     main()
453