sfadump more usable
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import sys
5 import time,datetime
6 from StringIO import StringIO
7 from types import StringTypes
8 from copy import deepcopy
9 from copy import copy
10 from lxml import etree
11
12 from sfa.util.sfalogging import sfa_logger
13 from sfa.util.rspecHelper import merge_rspecs
14 from sfa.util.namespace import urn_to_hrn, hrn_to_urn
15 from sfa.util.rspec import *
16 from sfa.util.specdict import *
17 from sfa.util.faults import *
18 from sfa.util.record import SfaRecord
19 from sfa.util.policy import Policy
20 from sfa.util.prefixTree import prefixTree
21 from sfa.util.sfaticket import *
22 from sfa.trust.credential import Credential
23 from sfa.util.threadmanager import ThreadManager
24 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
25 import sfa.plc.peers as peers
26
27 def get_version():
28     version = {}
29     version['geni_api'] = 1
30     version['sfa'] = 1
31     return version
32
33 def slice_status(api, slice_xrn, creds ):
34     result = {}
35     result['geni_urn'] = slice_xrn
36     result['geni_status'] = 'unknown'
37     result['geni_resources'] = {}
38     return result
39
40 def create_slice(api, xrn, creds, rspec, users):
41     hrn, type = urn_to_hrn(xrn)
42
43     # Validate the RSpec against PlanetLab's schema --disabled for now
44     # The schema used here needs to aggregate the PL and VINI schemas
45     # schema = "/var/www/html/schemas/pl.rng"
46     schema = None
47     if schema:
48         try:
49             tree = etree.parse(StringIO(rspec))
50         except etree.XMLSyntaxError:
51             message = str(sys.exc_info()[1])
52             raise InvalidRSpec(message)
53
54         relaxng_doc = etree.parse(schema)
55         relaxng = etree.RelaxNG(relaxng_doc)
56         
57         if not relaxng(tree):
58             error = relaxng.error_log.last_error
59             message = "%s (line %s)" % (error.message, error.line)
60             raise InvalidRSpec(message)
61
62     # get the callers hrn
63     valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
64     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
65
66     # attempt to use delegated credential first
67     credential = api.getDelegatedCredential(creds)
68     if not credential:     
69         credential = api.getCredential()
70     threads = ThreadManager()
71     for aggregate in api.aggregates:
72         # prevent infinite loop. Dont send request back to caller
73         # unless the caller is the aggregate's SM 
74         if caller_hrn == aggregate and aggregate != api.hrn:
75             continue
76             
77         # Just send entire RSpec to each aggregate
78         server = api.aggregates[aggregate]
79         threads.run(server.CreateSliver, xrn, credential, rspec, users)
80             
81     results = threads.get_results() 
82     merged_rspec = merge_rspecs(results)
83     return merged_rspec
84
85 def renew_slice(api, xrn, creds, expiration_time):
86     hrn, type = urn_to_hrn(xrn)
87
88     # get the callers hrn
89     valid_cred = api.auth.checkCredentials(creds, 'renewesliver', hrn)[0]
90     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
91
92     # attempt to use delegated credential first
93     credential = api.getDelegatedCredential(creds)
94     if not credential:
95         credential = api.getCredential()
96     threads = ThreadManager()
97     for aggregate in api.aggregates:
98         # prevent infinite loop. Dont send request back to caller
99         # unless the caller is the aggregate's SM
100         if caller_hrn == aggregate and aggregate != api.hrn:
101             continue
102
103         server = api.aggregates[aggregate]
104         threads.run(server.RenewSliver, xrn, credential, expiration_time)
105     threads.get_results()
106     return 1
107
108 def get_ticket(api, xrn, creds, rspec, users):
109     slice_hrn, type = urn_to_hrn(xrn)
110     # get the netspecs contained within the clients rspec
111     aggregate_rspecs = {}
112     tree= etree.parse(StringIO(rspec))
113     elements = tree.findall('./network')
114     for element in elements:
115         aggregate_hrn = element.values()[0]
116         aggregate_rspecs[aggregate_hrn] = rspec 
117
118     # get the callers hrn
119     valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
120     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
121
122     # attempt to use delegated credential first
123     credential = api.getDelegatedCredential(creds)
124     if not credential:
125         credential = api.getCredential() 
126     threads = ThreadManager()
127     for aggregate, aggregate_rspec in aggregate_rspecs.items():
128         # prevent infinite loop. Dont send request back to caller
129         # unless the caller is the aggregate's SM
130         if caller_hrn == aggregate and aggregate != api.hrn:
131             continue
132         server = None
133         if aggregate in api.aggregates:
134             server = api.aggregates[aggregate]
135         else:
136             net_urn = hrn_to_urn(aggregate, 'authority')     
137             # we may have a peer that knows about this aggregate
138             for agg in api.aggregates:
139                 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
140                 if not target_aggs or not 'hrn' in target_aggs[0]:
141                     continue
142                 # send the request to this address 
143                 url = target_aggs[0]['url']
144                 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
145                 # aggregate found, no need to keep looping
146                 break   
147         if server is None:
148             continue 
149         threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
150
151     results = threads.get_results()
152     
153     # gather information from each ticket 
154     rspecs = []
155     initscripts = []
156     slivers = [] 
157     object_gid = None  
158     for result in results:
159         agg_ticket = SfaTicket(string=result)
160         attrs = agg_ticket.get_attributes()
161         if not object_gid:
162             object_gid = agg_ticket.get_gid_object()
163         rspecs.append(agg_ticket.get_rspec())
164         initscripts.extend(attrs.get('initscripts', [])) 
165         slivers.extend(attrs.get('slivers', [])) 
166     
167     # merge info
168     attributes = {'initscripts': initscripts,
169                  'slivers': slivers}
170     merged_rspec = merge_rspecs(rspecs) 
171
172     # create a new ticket
173     ticket = SfaTicket(subject = slice_hrn)
174     ticket.set_gid_caller(api.auth.client_gid)
175     ticket.set_issuer(key=api.key, subject=api.hrn)
176     ticket.set_gid_object(object_gid)
177     ticket.set_pubkey(object_gid.get_pubkey())
178     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
179     ticket.set_attributes(attributes)
180     ticket.set_rspec(merged_rspec)
181     ticket.encode()
182     ticket.sign()          
183     return ticket.save_to_string(save_parents=True)
184
185
186 def delete_slice(api, xrn, creds):
187     hrn, type = urn_to_hrn(xrn)
188
189     # get the callers hrn
190     valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
191     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
192
193     # attempt to use delegated credential first
194     credential = api.getDelegatedCredential(creds)
195     if not credential:
196         credential = api.getCredential()
197     threads = ThreadManager()
198     for aggregate in api.aggregates:
199         # prevent infinite loop. Dont send request back to caller
200         # unless the caller is the aggregate's SM
201         if caller_hrn == aggregate and aggregate != api.hrn:
202             continue
203         server = api.aggregates[aggregate]
204         threads.run(server.DeleteSliver, xrn, credential)
205     threads.get_results()
206     return 1
207
208 def start_slice(api, xrn, creds):
209     hrn, type = urn_to_hrn(xrn)
210
211     # get the callers hrn
212     valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
213     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
214
215     # attempt to use delegated credential first
216     credential = api.getDelegatedCredential(creds)
217     if not credential:
218         credential = api.getCredential()
219     threads = ThreadManager()
220     for aggregate in api.aggregates:
221         # prevent infinite loop. Dont send request back to caller
222         # unless the caller is the aggregate's SM
223         if caller_hrn == aggregate and aggregate != api.hrn:
224             continue
225         server = api.aggregates[aggregate]
226         threads.run(server.Start, xrn, credential)
227     threads.get_results()    
228     return 1
229  
230 def stop_slice(api, xrn, creds):
231     hrn, type = urn_to_hrn(xrn)
232
233     # get the callers hrn
234     valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
235     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
236
237     # attempt to use delegated credential first
238     credential = api.getDelegatedCredential(creds)
239     if not credential:
240         credential = api.getCredential()
241     threads = ThreadManager()
242     for aggregate in api.aggregates:
243         # prevent infinite loop. Dont send request back to caller
244         # unless the caller is the aggregate's SM
245         if caller_hrn == aggregate and aggregate != api.hrn:
246             continue
247         server = api.aggregates[aggregate]
248         threads.run(server.Stop, xrn, credential)
249     threads.get_results()    
250     return 1
251
252 def reset_slice(api, xrn):
253     """
254     Not implemented
255     """
256     return 1
257
258 def shutdown(api, xrn, creds):
259     """
260     Not implemented   
261     """
262     return 1
263
264 def status(api, xrn, creds):
265     """
266     Not implemented 
267     """
268     return 1
269
270 def get_slices(api, creds):
271
272     # look in cache first
273     if api.cache:
274         slices = api.cache.get('slices')
275         if slices:
276             return slices    
277
278     # get the callers hrn
279     valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
280     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
281
282     # attempt to use delegated credential first
283     credential = api.getDelegatedCredential(creds)
284     if not credential:
285         credential = api.getCredential()
286     threads = ThreadManager()
287     # fetch from aggregates
288     for aggregate in api.aggregates:
289         # prevent infinite loop. Dont send request back to caller
290         # unless the caller is the aggregate's SM
291         if caller_hrn == aggregate and aggregate != api.hrn:
292             continue
293         server = api.aggregates[aggregate]
294         threads.run(server.ListSlices, credential)
295
296     # combime results
297     results = threads.get_results()
298     slices = []
299     for result in results:
300         slices.extend(result)
301     
302     # cache the result
303     if api.cache:
304         api.cache.add('slices', slices)
305
306     return slices
307  
308 def get_rspec(api, creds, options):
309     
310     # get slice's hrn from options
311     xrn = options.get('geni_slice_urn', None)
312     hrn, type = urn_to_hrn(xrn)
313
314     # get hrn of the original caller
315     origin_hrn = options.get('origin_hrn', None)
316     if not origin_hrn:
317         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
318     
319     # look in cache first 
320     if api.cache and not xrn:
321         rspec =  api.cache.get('nodes')
322         if rspec:
323             return rspec
324
325     hrn, type = urn_to_hrn(xrn)
326     rspec = None
327
328     # get the callers hrn
329     valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
330     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
331
332     # attempt to use delegated credential first
333     credential = api.getDelegatedCredential(creds)
334     if not credential:
335         credential = api.getCredential()
336     threads = ThreadManager()
337     for aggregate in api.aggregates:
338         # prevent infinite loop. Dont send request back to caller
339         # unless the caller is the aggregate's SM
340         if caller_hrn == aggregate and aggregate != api.hrn:
341             continue
342         # get the rspec from the aggregate
343         server = api.aggregates[aggregate]
344         my_opts = copy(options)
345         my_opts['geni_compressed'] = False
346         threads.run(server.ListResources, credential, my_opts)
347         #threads.run(server.get_resources, cred, xrn, origin_hrn)
348                     
349     results = threads.get_results()
350     # combine the rspecs into a single rspec 
351     for agg_rspec in results:
352         try:
353             tree = etree.parse(StringIO(agg_rspec))
354         except etree.XMLSyntaxError:
355             message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
356             raise InvalidRSpec(message)
357
358         root = tree.getroot()
359         if root.get("type") in ["SFA"]:
360             if rspec == None:
361                 rspec = root
362             else:
363                 for network in root.iterfind("./network"):
364                     rspec.append(deepcopy(network))
365                 for request in root.iterfind("./request"):
366                     rspec.append(deepcopy(request))
367     
368     sfa_logger().debug('get_rspec: rspec=%r'%rspec)
369     rspec =  etree.tostring(rspec, xml_declaration=True, pretty_print=True)
370     # cache the result
371     if api.cache and not xrn:
372         api.cache.add('nodes', rspec)
373  
374     return rspec
375
376 def main():
377     r = RSpec()
378     r.parseFile(sys.argv[1])
379     rspec = r.toDict()
380     create_slice(None,'plc.princeton.tmacktestslice',rspec)
381
382 if __name__ == "__main__":
383     main()
384