namespace module is gone, plxrn provides PL-specific translations
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import sys
5 import time,datetime
6 from StringIO import StringIO
7 from types import StringTypes
8 from copy import deepcopy
9 from copy import copy
10 from lxml import etree
11
12 from sfa.util.sfalogging import sfa_logger
13 from sfa.util.rspecHelper import merge_rspecs
14 from sfa.util.xrn import urn_to_hrn, hrn_to_urn
15 from sfa.util.rspec import *
16 from sfa.util.specdict import *
17 from sfa.util.faults import *
18 from sfa.util.record import SfaRecord
19 from sfa.util.policy import Policy
20 from sfa.util.prefixTree import prefixTree
21 from sfa.util.sfaticket import *
22 from sfa.trust.credential import Credential
23 from sfa.util.threadmanager import ThreadManager
24 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
25 import sfa.plc.peers as peers
26
def get_version():
    """
    Report the API versions implemented by this slice manager.

    Returns a freshly built dict holding the 'geni_api' and 'sfa'
    version numbers.
    """
    return {'geni_api': 1, 'sfa': 1}
32
def slice_status(api, slice_xrn, creds):
    """
    Build a status report for a slice from the records held by the PL shell.

    api       -- object providing logger, plshell and plauth
    slice_xrn -- identifier of the slice, echoed back as 'geni_urn'
    creds     -- accepted for interface compatibility; not used here

    Returns a dict with 'geni_urn', 'geni_status', 'pl_login', 'pl_expires'
    and 'geni_resources' (one dict per node attached to the slice).
    Raises Exception when GetSlices returns no record for the slice name.
    """
    # NOTE(review): hrn_to_pl_slicename is not among the names imported
    # explicitly at the top of this file -- confirm it is in scope.
    hrn, _ = urn_to_hrn(slice_xrn)
    api.logger.info(hrn)
    pl_name = hrn_to_pl_slicename(hrn)
    api.logger.info("Checking status for %s" % pl_name)

    slice_fields = ['node_ids','person_ids','name','expires']
    matches = api.plshell.GetSlices(api.plauth, [pl_name], slice_fields)
    if len(matches) == 0:
        raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, pl_name))
    record = matches[0]

    node_fields = ['hostname', 'boot_state', 'last_contact']
    nodes = api.plshell.GetNodes(api.plauth, record['node_ids'], node_fields)
    api.logger.info(record)
    api.logger.info(nodes)

    # slice-level part of the report; status is always reported as 'unknown'
    report = {
        'geni_urn': slice_xrn,
        'geni_status': 'unknown',
        'pl_login': record['name'],
        'pl_expires': record['expires'],
    }

    # one entry per node, with the geni_* fields left at placeholder values
    report['geni_resources'] = [
        {
            'pl_hostname': node['hostname'],
            'pl_boot_state': node['boot_state'],
            'pl_last_contact': node['last_contact'],
            'geni_urn': '',
            'geni_status': 'unknown',
            'geni_error': '',
        }
        for node in nodes
    ]
    return report
70
def create_slice(api, xrn, creds, rspec, users):
    """
    Forward a CreateSliver request to the known aggregates and merge
    what they return.

    api   -- object providing auth, aggregates, hrn and credential helpers
    xrn   -- identifier of the slice
    creds -- caller credentials, checked for the 'createsliver' privilege
    rspec -- RSpec document (string), sent unchanged to every aggregate
    users -- passed through to each aggregate's CreateSliver call

    Returns the result of merge_rspecs() over the aggregates' replies.
    """
    hrn, _ = urn_to_hrn(xrn)

    # RelaxNG validation is switched off for now: the schema would have to
    # aggregate the PL and VINI schemas (e.g. /var/www/html/schemas/pl.rng).
    # With schema left as None the branch below never runs.
    schema = None
    if schema:
        try:
            document = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        validator = etree.RelaxNG(etree.parse(schema))
        if not validator(document):
            last = validator.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (last.message, last.line))

    # work out who is calling
    checked = api.auth.checkCredentials(creds, 'createsliver', hrn)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        bounce_back = caller_hrn == aggregate and aggregate != api.hrn
        if not bounce_back:
            # every aggregate receives the entire RSpec
            threads.run(api.aggregates[aggregate].CreateSliver,
                        xrn, credential, rspec, users)

    return merge_rspecs(threads.get_results())
115
def renew_slice(api, xrn, creds, expiration_time):
    """
    Forward a RenewSliver request to the known aggregates.

    api             -- object providing auth, aggregates, hrn and
                       credential helpers
    xrn             -- identifier of the slice
    creds           -- caller credentials, checked for the 'renewsliver'
                       privilege
    expiration_time -- passed through to each aggregate's RenewSliver call

    Returns 1 once all aggregate calls have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # Work out who is calling.  The operation name was previously misspelled
    # as 'renewesliver'; it now follows the same lower-cased RPC naming as
    # 'createsliver' / 'deletesliver' used by the sibling functions.
    checked = api.auth.checkCredentials(creds, 'renewsliver', hrn)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.RenewSliver, xrn, credential, expiration_time)

    # wait for the calls; their individual results are not inspected
    threads.get_results()
    return 1
138
def get_ticket(api, xrn, creds, rspec, users):
    """
    Collect tickets from the aggregates named in the rspec and combine
    them into a single ticket signed by this manager.

    api   -- object providing auth, aggregates, hrn, key material and
             credential helpers
    xrn   -- identifier of the slice
    creds -- caller credentials, checked for the 'getticket' privilege
    rspec -- client RSpec (string); its <network> elements select the
             aggregates to contact
    users -- passed through to each aggregate's GetTicket call

    Returns the combined ticket serialized with save_to_string().
    """
    slice_hrn, _ = urn_to_hrn(xrn)

    # Every <network> element names an aggregate through its first attribute
    # value; each aggregate named this way is handed the complete rspec.
    per_aggregate = {}
    for network in etree.parse(StringIO(rspec)).findall('./network'):
        per_aggregate[network.values()[0]] = rspec

    # work out who is calling
    checked = api.auth.checkCredentials(creds, 'getticket', slice_hrn)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate, aggregate_rspec in per_aggregate.items():
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        if aggregate in api.aggregates:
            server = api.aggregates[aggregate]
        else:
            # not one of ours: ask each known aggregate whether it knows it
            server = None
            net_urn = hrn_to_urn(aggregate, 'authority')
            for peer in api.aggregates:
                candidates = api.aggregates[peer].get_aggregates(credential, net_urn)
                if candidates and 'hrn' in candidates[0]:
                    # first usable answer wins; talk to that address directly
                    server = xmlrpcprotocol.get_server(candidates[0]['url'],
                                                       api.key_file, api.cert_file)
                    break

        if server is not None:
            threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)

    # pull the interesting pieces out of every ticket that came back
    rspecs = []
    initscripts = []
    slivers = []
    object_gid = None
    for raw_ticket in threads.get_results():
        agg_ticket = SfaTicket(string=raw_ticket)
        attrs = agg_ticket.get_attributes()
        if not object_gid:
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    attributes = {'initscripts': initscripts, 'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # Assemble and sign the combined ticket.
    # NOTE(review): object_gid is still None here when no ticket was
    # returned, so the get_pubkey() call below would fail -- confirm whether
    # that case needs explicit handling.
    ticket = SfaTicket(subject = slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
215
216
def delete_slice(api, xrn, creds):
    """
    Forward a DeleteSliver request to the known aggregates.

    api   -- object providing auth, aggregates, hrn and credential helpers
    xrn   -- identifier of the slice
    creds -- caller credentials, checked for the 'deletesliver' privilege

    Returns 1 once all aggregate calls have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # work out who is calling
    checked = api.auth.checkCredentials(creds, 'deletesliver', hrn)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        bounce_back = caller_hrn == aggregate and aggregate != api.hrn
        if not bounce_back:
            threads.run(api.aggregates[aggregate].DeleteSliver, xrn, credential)

    # wait for the calls; their individual results are not inspected
    threads.get_results()
    return 1
238
def start_slice(api, xrn, creds):
    """
    Forward a Start request to the known aggregates.

    api   -- object providing auth, aggregates, hrn and credential helpers
    xrn   -- identifier of the slice
    creds -- caller credentials, checked for the 'startslice' privilege

    Returns 1 once all aggregate calls have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # work out who is calling
    checked = api.auth.checkCredentials(creds, 'startslice', hrn)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        bounce_back = caller_hrn == aggregate and aggregate != api.hrn
        if not bounce_back:
            threads.run(api.aggregates[aggregate].Start, xrn, credential)

    # wait for the calls; their individual results are not inspected
    threads.get_results()
    return 1
260  
def stop_slice(api, xrn, creds):
    """
    Forward a Stop request to the known aggregates.

    api   -- object providing auth, aggregates, hrn and credential helpers
    xrn   -- identifier of the slice
    creds -- caller credentials, checked for the 'stopslice' privilege

    Returns 1 once all aggregate calls have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # work out who is calling
    checked = api.auth.checkCredentials(creds, 'stopslice', hrn)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        bounce_back = caller_hrn == aggregate and aggregate != api.hrn
        if not bounce_back:
            threads.run(api.aggregates[aggregate].Stop, xrn, credential)

    # wait for the calls; their individual results are not inspected
    threads.get_results()
    return 1
282
def reset_slice(api, xrn):
    """
    Placeholder: resetting a slice is not implemented.

    Both arguments are ignored; always returns 1.
    """
    return 1
288
def shutdown(api, xrn, creds):
    """
    Placeholder: shutting down a slice is not implemented.

    All arguments are ignored; always returns 1.
    """
    return 1
294
def status(api, xrn, creds):
    """
    Placeholder: status reporting is not implemented here.

    All arguments are ignored; always returns 1.
    """
    return 1
300
def get_slices(api, creds):
    """
    List the slices known to the aggregates, using the cache when it
    already holds an answer.

    api   -- object providing cache, auth, aggregates, hrn and credential
             helpers
    creds -- caller credentials, checked for the 'listslices' privilege

    Returns the concatenation of every aggregate's ListSlices result.
    """
    # A non-empty cached answer is returned straight away.
    # NOTE(review): this happens before the credentials are checked below --
    # confirm that callers have already been authorized at this point.
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # work out who is calling
    checked = api.auth.checkCredentials(creds, 'listslices', None)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    # ask every aggregate
    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        bounce_back = caller_hrn == aggregate and aggregate != api.hrn
        if not bounce_back:
            threads.run(api.aggregates[aggregate].ListSlices, credential)

    # combine the per-aggregate lists into one
    slices = []
    for partial in threads.get_results():
        slices.extend(partial)

    # remember the answer for next time
    if api.cache:
        api.cache.add('slices', slices)

    return slices
338  
def get_rspec(api, creds, options):
    """
    Gather the RSpecs advertised by the aggregates and combine them into
    one document.

    api     -- object providing cache, auth, aggregates, hrn and credential
               helpers
    creds   -- caller credentials, checked for the 'listnodes' privilege
    options -- dict; 'geni_slice_urn' selects a slice (empty means none) and
               'origin_hrn' may name the original caller.  A copy with
               'geni_compressed' forced to False is sent to each aggregate.

    Returns the combined RSpec serialized as XML.
    Raises InvalidRSpec when an aggregate's reply is not well-formed XML.
    """
    # slice hrn from the options; parsed once and reused for the credential
    # check further down
    xrn = options.get('geni_slice_urn', '')
    hrn, _ = urn_to_hrn(xrn)

    # hrn of the original caller.  It is only consumed by the disabled
    # get_resources call below, but the lookup is kept so malformed
    # credentials still fail here as before.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # the cache only holds the slice-less listing
    if api.cache and not xrn:
        cached = api.cache.get('nodes')
        if cached:
            return cached

    # work out who is calling
    checked = api.auth.checkCredentials(creds, 'listnodes', hrn)
    caller_hrn = Credential(string=checked[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Do not bounce the request back to the caller (would loop forever)
        # unless the caller is this manager's own aggregate.
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        # ask for an uncompressed reply without touching the caller's dict
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, credential, my_opts)
        #threads.run(server.get_resources, cred, xrn, origin_hrn)

    # Combine the replies: the first SFA document becomes the base, and the
    # <network> and <request> children of later ones are appended to it.
    rspec = None
    for agg_rspec in threads.get_results():
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") != "SFA":
            # replies of any other type are ignored
            continue
        # identity test on purpose: an element must never be compared to
        # None by value or judged by its truth value
        if rspec is None:
            rspec = root
        else:
            for network in root.iterfind("./network"):
                rspec.append(deepcopy(network))
            for request in root.iterfind("./request"):
                rspec.append(deepcopy(request))

    sfa_logger().debug('get_rspec: rspec=%r'%rspec)
    # NOTE(review): rspec is still None here when no aggregate returned an
    # SFA document; serialization would then fail -- confirm whether that
    # case needs explicit handling.
    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)

    # remember the slice-less listing for next time
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
406
def main():
    """
    Ad-hoc command line driver: parse the RSpec file named by the first
    command line argument and hand its dict form to create_slice.
    """
    # NOTE(review): create_slice is declared as
    # (api, xrn, creds, rspec, users) but only three arguments are passed
    # here, and api is None -- this driver looks stale; confirm before
    # relying on it.
    parser = RSpec()
    parser.parseFile(sys.argv[1])
    create_slice(None, 'plc.princeton.tmacktestslice', parser.toDict())


if __name__ == "__main__":
    main()
415