05cd043a947bd99446a427d54d699bd1d7b1f03b
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspecHelper import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
26 from copy import copy
27
def get_version():
    """
    Report the API versions implemented by this slice manager.

    Returns a dict with the keys 'geni_api' and 'sfa', both set to 1.
    """
    return {'geni_api': 1, 'sfa': 1}
33
def slice_status(api, slice_xrn, creds):
    """
    Return a placeholder status record for a slice.

    Neither api nor creds is consulted: the status is always reported as
    'unknown' with an empty resource dict, keyed by the given slice_xrn.
    """
    return {
        'geni_urn': slice_xrn,
        'geni_status': 'unknown',
        'geni_resources': {},
    }
40
def _validate_rspec(rspec, schema):
    """
    Validate an rspec string against the RelaxNG schema file at `schema`.

    Raises InvalidRSpec when the rspec is not well-formed XML or does not
    conform to the schema.
    """
    try:
        document = etree.parse(StringIO(rspec))
    except etree.XMLSyntaxError:
        raise InvalidRSpec(str(sys.exc_info()[1]))

    validator = etree.RelaxNG(etree.parse(schema))
    if not validator(document):
        failure = validator.error_log.last_error
        raise InvalidRSpec("%s (line %s)" % (failure.message, failure.line))


def create_slice(api, xrn, creds, rspec, users):
    """
    Ask every known aggregate to create slivers for the slice `xrn`.

    The complete rspec is sent unmodified to each aggregate in
    api.aggregates (except the caller itself, see below) and the rspecs
    they return are merged into one.

    Returns the merged rspec produced by merge_rspecs().
    Raises whatever api.auth.checkCredentials raises when none of the
    credentials permits 'createsliver' on the slice.
    """
    hrn, _hrn_type = urn_to_hrn(xrn)

    # RSpec validation against PlanetLab's schema is disabled for now: the
    # schema used here needs to aggregate the PL and VINI schemas first.
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        _validate_rspec(rspec, schema)

    # work out who is calling us
    checked_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
    caller_hrn = Credential(string=checked_cred).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    pool = ThreadManager()
    for agg_hrn in api.aggregates:
        # Never bounce the request back to the aggregate it came from
        # (that would loop forever), unless that aggregate is the one
        # paired with this slice manager.
        if caller_hrn != agg_hrn or agg_hrn == api.hrn:
            pool.run(api.aggregates[agg_hrn].CreateSliver,
                     xrn, credential, rspec, users)

    return merge_rspecs(pool.get_results())
85
def renew_slice(api, xrn, creds, expiration_time):
    """
    Ask every known aggregate to renew the slivers of slice `xrn`.

    The request is forwarded, with the new expiration_time, to each
    aggregate in api.aggregates except the caller itself (see below).

    Returns 1 once all aggregate calls have been collected.
    Raises whatever api.auth.checkCredentials raises when none of the
    credentials permits the operation on the slice.
    """
    # the credential check needs the slice hrn, so derive it from the xrn
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    # NOTE(review): the operation name was spelled 'renewesliver' here;
    # corrected to 'renewsliver' -- confirm against the rights table.
    valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        server = api.aggregates[aggregate]
        threads.run(server.RenewSliver, xrn, credential, expiration_time)

    # wait for the aggregate calls; their return values are not used
    threads.get_results()
    return 1
106
def get_ticket(api, xrn, creds, rspec, users):
    """
    Collect tickets for slice `xrn` from the aggregates named in the rspec
    and combine them into one ticket issued by this slice manager.

    Each <network> element directly under the rspec root names an
    aggregate (its first attribute value is used as the aggregate hrn).
    Every such aggregate is sent the complete rspec. Aggregates that are
    not in api.aggregates are looked up through the known aggregates'
    get_aggregates() call; aggregates that cannot be located are skipped.

    Returns the new ticket serialized as a string (parents included).
    Raises whatever api.auth.checkCredentials raises when none of the
    credentials permits 'getticket' on the slice.
    """
    slice_hrn, _ = urn_to_hrn(xrn)

    # get the netspecs contained within the clients rspec
    aggregate_rspecs = {}
    tree = etree.parse(StringIO(rspec))
    for element in tree.findall('./network'):
        aggregate_hrn = element.values()[0]
        aggregate_rspecs[aggregate_hrn] = rspec

    # get the callers hrn; the check is made against the slice's hrn
    valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate, aggregate_rspec in aggregate_rspecs.items():
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        server = None
        if aggregate in api.aggregates:
            server = api.aggregates[aggregate]
        else:
            net_urn = hrn_to_urn(aggregate, 'authority')
            # we may have a peer that knows about this aggregate
            for agg in api.aggregates:
                target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
                if not target_aggs or 'hrn' not in target_aggs[0]:
                    continue
                # send the request to this address
                url = target_aggs[0]['url']
                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
                # aggregate found, no need to keep looping
                break

        if server is None:
            # nobody knows how to reach this aggregate
            continue
        threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)

    results = threads.get_results()

    # gather information from each ticket
    rspecs = []
    initscripts = []
    slivers = []
    object_gid = None
    for result in results:
        agg_ticket = SfaTicket(string=result)
        attrs = agg_ticket.get_attributes()
        # the object gid of the first ticket is reused for the new ticket
        if not object_gid:
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # merge info
    attributes = {'initscripts': initscripts,
                  'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # create a new ticket
    # NOTE(review): when no aggregate returned a ticket, object_gid is still
    # None here and object_gid.get_pubkey() below fails with AttributeError.
    ticket = SfaTicket(subject=slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
183
184
def delete_slice(api, xrn, creds):
    """
    Ask every known aggregate to delete the slivers of slice `xrn`.

    The request is forwarded to each aggregate in api.aggregates except
    the caller itself (see below).

    Returns 1 once all aggregate calls have been collected.
    Raises whatever api.auth.checkCredentials raises when none of the
    credentials permits 'deletesliver' on the slice.
    """
    # the credential check needs the slice hrn, so derive it from the xrn
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.DeleteSliver, xrn, credential)

    # wait for the aggregate calls; their return values are not used
    threads.get_results()
    return 1
204
def start_slice(api, xrn, creds):
    """
    Ask every known aggregate to start slice `xrn`.

    The request is forwarded to each aggregate in api.aggregates except
    the caller itself (see below).

    Returns 1 once all aggregate calls have been collected.
    Raises whatever api.auth.checkCredentials raises when none of the
    credentials permits 'startslice' on the slice.
    """
    # the credential check needs the slice hrn, so derive it from the xrn
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.Start, xrn, credential)

    # wait for the aggregate calls; their return values are not used
    threads.get_results()
    return 1
224  
def stop_slice(api, xrn, creds):
    """
    Ask every known aggregate to stop slice `xrn`.

    The request is forwarded to each aggregate in api.aggregates except
    the caller itself (see below).

    Returns 1 once all aggregate calls have been collected.
    Raises whatever api.auth.checkCredentials raises when none of the
    credentials permits 'stopslice' on the slice.
    """
    # the credential check needs the slice hrn, so derive it from the xrn
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.Stop, xrn, credential)

    # wait for the aggregate calls; their return values are not used
    threads.get_results()
    return 1
244
def reset_slice(api, xrn):
    """
    Placeholder: resetting a slice is not implemented.

    Both arguments are ignored and 1 is always returned.
    """
    return 1
250
def shutdown(api, xrn, creds):
    """
    Placeholder: shutting down a slice is not implemented.

    All arguments are ignored and 1 is always returned.
    """
    return 1
256
def status(api, xrn, creds):
    """
    Placeholder: status reporting is not implemented.

    All arguments are ignored and 1 is always returned.
    """
    return 1
262
def get_slices(api, creds):
    """
    List the slices known to all aggregates.

    A non-empty 'slices' entry in api.cache is returned directly when a
    cache is configured. Otherwise ListSlices is called on each aggregate
    in api.aggregates (except the caller itself, see below), the returned
    lists are concatenated, and the result is stored in the cache.

    Returns the combined list of slices.
    Raises whatever api.auth.checkCredentials raises when none of the
    credentials permits 'listslices'.
    """
    # look in cache first
    if api.cache:
        slices = api.cache.get('slices')
        if slices:
            return slices

    # get the callers hrn; listing slices is not tied to one object, so
    # there is no target hrn to check the credential against
    valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    # fetch from aggregates
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.ListSlices, credential)

    # combine results
    slices = []
    for result in threads.get_results():
        slices.extend(result)

    # cache the result
    if api.cache:
        api.cache.add('slices', slices)

    return slices
300  
def get_rspec(api, creds, options):
    """
    Build one rspec out of the resource listings of all aggregates.

    options may carry 'geni_slice_urn' (restrict the listing to a slice)
    and 'origin_hrn'. When no slice urn is given, a non-empty 'nodes'
    entry in api.cache is returned directly and a freshly built rspec is
    stored back under that key.

    ListResources is called on each aggregate in api.aggregates (except
    the caller itself, see below) with a copy of options in which
    'geni_compressed' is forced to False. The first returned document
    whose root has type="SFA" becomes the base; the <network> and
    <request> children of later SFA documents are appended to it.

    Returns the combined rspec serialized as XML.
    Raises InvalidRSpec when an aggregate returns malformed XML, and
    whatever api.auth.checkCredentials raises when none of the
    credentials permits 'listnodes'.
    """
    # get slice's hrn from options
    xrn = options.get('geni_slice_urn', None)
    hrn, _ = urn_to_hrn(xrn)

    # get hrn of the original caller
    # (currently only needed by the disabled get_resources call below)
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # look in cache first
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    rspec = None

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        # get the rspec from the aggregate; work on a copy of the options
        # so the caller's dict is left untouched
        server = api.aggregates[aggregate]
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, credential, my_opts)
        #threads.run(server.get_resources, cred, xrn, origin_hrn)

    results = threads.get_results()

    # combine the rspecs into a single rspec
    for agg_rspec in results:
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        # documents of any other type are silently ignored
        if root.get("type") in ["SFA"]:
            if rspec is None:
                rspec = root
            else:
                for network in root.iterfind("./network"):
                    rspec.append(deepcopy(network))
                for request in root.iterfind("./request"):
                    rspec.append(deepcopy(request))

    # NOTE(review): rspec is still None here when no aggregate returned an
    # SFA document; serializing None then fails -- decide what the caller
    # should receive in that case.
    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)

    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
367
def main():
    """
    Ad-hoc command line driver: parse the rspec file named by the first
    command line argument and pass it, as a dict, to create_slice() for a
    hard-coded test slice.
    """
    parsed = RSpec()
    parsed.parseFile(sys.argv[1])
    # NOTE(review): create_slice() as defined in this file takes
    # (api, xrn, creds, rspec, users); this three-argument call with a None
    # api looks stale -- confirm whether this driver is still meant to work.
    create_slice(None, 'plc.princeton.tmacktestslice', parsed.toDict())


if __name__ == "__main__":
    main()
376