creds may be a single cred or list of creds
[sfa.git] / sfa / managers / slice_manager_pl.py
1
2 import sys
3 import time,datetime
4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
7 from copy import copy
8 from lxml import etree
9
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 import sfa.plc.peers as peers
25 from sfa.util.version import version_core
26
def GetVersion(api):
    """
    Return the version record of this slice manager.

    The record is whatever version_core() builds from the interface
    name ('slicemgr'), this manager's hrn and urn, and a mapping of
    every other known aggregate to the host stored on its proxy.
    """
    # collect the remote aggregates only; our own entry is left out
    peer_hosts = {}
    for peername, proxy in api.aggregates.items():
        if peername == api.hrn:
            continue
        peer_hosts[peername] = proxy._ServerProxy__host

    local_xrn = Xrn(api.hrn)
    version_info = {
        'interface': 'slicemgr',
        'hrn': local_xrn.get_hrn(),
        'urn': local_xrn.get_urn(),
        'peers': peer_hosts,
    }
    return version_core(version_info)
36
def slice_status(api, slice_xrn, creds ):
    """
    Report the PlanetLab state of the slice named by slice_xrn.

    Returns a dict with the keys 'geni_urn', 'geni_status', 'pl_login',
    'pl_expires' and 'geni_resources'; the latter holds one dict per
    node attached to the slice. creds is accepted but not consulted.

    Raises Exception when GetSlices returns nothing for the derived
    PlanetLab slice name.
    """
    hrn, _ = urn_to_hrn(slice_xrn)
    # find out where this slice is currently running
    api.logger.info(hrn)
    slicename = hrn_to_pl_slicename(hrn)
    api.logger.info("Checking status for %s" % slicename)

    slice_fields = ['node_ids','person_ids','name','expires']
    found = api.plshell.GetSlices(api.plauth, [slicename], slice_fields)
    if len(found) == 0:
        raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
    pl_slice = found[0]

    node_fields = ['hostname', 'boot_state', 'last_contact']
    nodes = api.plshell.GetNodes(api.plauth, pl_slice['node_ids'], node_fields)
    api.logger.info(pl_slice)
    api.logger.info(nodes)

    result = {
        'geni_urn': slice_xrn,
        'geni_status': 'unknown',
        'pl_login': pl_slice['name'],
        'pl_expires': datetime.datetime.fromtimestamp(pl_slice['expires']).ctime(),
    }

    resources = []
    for node in nodes:
        entry = {
            'pl_hostname': node['hostname'],
            'pl_boot_state': node['boot_state'],
            'pl_last_contact': node['last_contact'],
        }
        # a node that was contacted gets a readable date, others keep None
        if node['last_contact'] is not None:
            entry['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
        entry['geni_urn'] = ''
        entry['geni_status'] = 'unknown'
        entry['geni_error'] = ''
        resources.append(entry)

    result['geni_resources'] = resources
    return result
76
def create_slice(api, xrn, creds, rspec, users):
    """
    Ask every known aggregate to create slivers for the slice xrn.

    The complete rspec is sent unchanged to each aggregate through
    CreateSliver; the value returned is merge_rspecs() applied to the
    collected answers.

    Raises InvalidRSpec from the schema validation step, which is
    currently switched off (schema is None).
    """
    hrn, _ = urn_to_hrn(xrn)

    # Validation against PlanetLab's schema is disabled for now: the
    # schema would have to aggregate the PL and VINI schemas, e.g.
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        relaxng = etree.RelaxNG(etree.parse(schema))
        if not relaxng(tree):
            error = relaxng.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (error.message, error.line))

    # work out who is calling us
    valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
    caller_gid = Credential(string=valid_cred).get_gid_caller()
    caller_hrn = caller_gid.get_hrn()

    # a delegated credential is preferred, our own one is the fallback
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # never bounce the request back to the caller (that would loop
        # forever), except when the caller is this manager's own aggregate
        if aggregate == api.hrn or caller_hrn != aggregate:
            server = api.aggregates[aggregate]
            threads.run(server.CreateSliver, xrn, credential, rspec, users)

    return merge_rspecs(threads.get_results())
121
def renew_slice(api, xrn, creds, expiration_time):
    """
    Forward a RenewSliver request for slice xrn to every known aggregate.

    The credential is handed to the aggregates wrapped in a
    one-element list. Always returns 1 once all calls have finished.
    """
    hrn, _ = urn_to_hrn(xrn)

    # work out who is calling us
    valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
    caller_gid = Credential(string=valid_cred).get_gid_caller()
    caller_hrn = caller_gid.get_hrn()

    # a delegated credential is preferred, our own one is the fallback
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # never bounce the request back to the caller (that would loop
        # forever), except when the caller is this manager's own aggregate
        if aggregate == api.hrn or caller_hrn != aggregate:
            server = api.aggregates[aggregate]
            threads.run(server.RenewSliver, xrn, [credential], expiration_time)

    # wait for every aggregate before reporting success
    threads.get_results()
    return 1
144
def get_ticket(api, xrn, creds, rspec, users):
    """
    Collect tickets for slice xrn from the aggregates named in rspec
    and combine them into one ticket signed by this slice manager.

    Every <network> element of rspec names an aggregate through its
    first attribute value; each of those aggregates receives the whole
    rspec in a GetTicket call. Aggregates that are not known locally
    are looked up through the known peers (get_aggregates); aggregates
    that cannot be located are skipped.

    Returns the new ticket serialized with save_to_string().

    Raises Exception when no ticket providing an object gid came back,
    since a combined ticket cannot be built without one.
    """
    slice_hrn, _ = urn_to_hrn(xrn)

    # get the netspecs contained within the clients rspec
    aggregate_rspecs = {}
    tree = etree.parse(StringIO(rspec))
    for element in tree.findall('./network'):
        aggregate_hrn = element.values()[0]
        aggregate_rspecs[aggregate_hrn] = rspec

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate, aggregate_rspec in aggregate_rspecs.items():
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        server = None
        if aggregate in api.aggregates:
            server = api.aggregates[aggregate]
        else:
            net_urn = hrn_to_urn(aggregate, 'authority')
            # we may have a peer that knows about this aggregate
            for agg in api.aggregates:
                target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
                if not target_aggs or 'hrn' not in target_aggs[0]:
                    continue
                # send the request to this address
                url = target_aggs[0]['url']
                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
                # aggregate found, no need to keep looping
                break

        if server is None:
            continue
        threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)

    results = threads.get_results()

    # gather information from each ticket
    rspecs = []
    initscripts = []
    slivers = []
    object_gid = None
    for result in results:
        agg_ticket = SfaTicket(string=result)
        attrs = agg_ticket.get_attributes()
        # the first ticket that carries an object gid provides it
        if not object_gid:
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # Without an object gid the ticket below cannot get its public key;
    # fail with a clear message rather than an AttributeError on None.
    if object_gid is None:
        raise Exception("No ticket with an object gid was returned for slice %s" % slice_hrn)

    # merge info
    attributes = {'initscripts': initscripts,
                  'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # create a new ticket
    ticket = SfaTicket(subject = slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
221
222
def delete_slice(api, xrn, creds):
    """
    Forward a DeleteSliver request for slice xrn to every known aggregate.

    Always returns 1 once all calls have finished.
    """
    hrn, _ = urn_to_hrn(xrn)

    # work out who is calling us
    valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
    caller_gid = Credential(string=valid_cred).get_gid_caller()
    caller_hrn = caller_gid.get_hrn()

    # a delegated credential is preferred, our own one is the fallback
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # never bounce the request back to the caller (that would loop
        # forever), except when the caller is this manager's own aggregate
        if aggregate == api.hrn or caller_hrn != aggregate:
            server = api.aggregates[aggregate]
            threads.run(server.DeleteSliver, xrn, credential)

    # wait for every aggregate before reporting success
    threads.get_results()
    return 1
244
def start_slice(api, xrn, creds):
    """
    Forward a Start request for slice xrn to every known aggregate.

    Always returns 1 once all calls have finished.
    """
    hrn, _ = urn_to_hrn(xrn)

    # work out who is calling us
    valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
    caller_gid = Credential(string=valid_cred).get_gid_caller()
    caller_hrn = caller_gid.get_hrn()

    # a delegated credential is preferred, our own one is the fallback
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # never bounce the request back to the caller (that would loop
        # forever), except when the caller is this manager's own aggregate
        if aggregate == api.hrn or caller_hrn != aggregate:
            server = api.aggregates[aggregate]
            threads.run(server.Start, xrn, credential)

    # wait for every aggregate before reporting success
    threads.get_results()
    return 1
266  
def stop_slice(api, xrn, creds):
    """
    Forward a Stop request for slice xrn to every known aggregate.

    Always returns 1 once all calls have finished.
    """
    hrn, _ = urn_to_hrn(xrn)

    # work out who is calling us
    valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
    caller_gid = Credential(string=valid_cred).get_gid_caller()
    caller_hrn = caller_gid.get_hrn()

    # a delegated credential is preferred, our own one is the fallback
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # never bounce the request back to the caller (that would loop
        # forever), except when the caller is this manager's own aggregate
        if aggregate == api.hrn or caller_hrn != aggregate:
            server = api.aggregates[aggregate]
            threads.run(server.Stop, xrn, credential)

    # wait for every aggregate before reporting success
    threads.get_results()
    return 1
288
def reset_slice(api, xrn):
    """
    Not implemented: nothing is reset and neither argument is used.

    Always returns 1.
    """
    return 1
294
def shutdown(api, xrn, creds):
    """
    Not implemented: nothing is shut down and no argument is used.

    Always returns 1.
    """
    return 1
300
def status(api, xrn, creds):
    """
    Not implemented: no status is looked up and no argument is used.

    Always returns 1.
    """
    return 1
306
def get_slices(api, creds):
    """
    Return the slices reported by all known aggregates as one list.

    When api.cache holds a non-empty 'slices' entry it is returned
    right away, before the credentials are looked at. Otherwise every
    aggregate is asked through ListSlices, the answers are concatenated
    and the combined list is stored in the cache (if there is one).
    """
    # look in cache first
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # work out who is calling us
    valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
    caller_gid = Credential(string=valid_cred).get_gid_caller()
    caller_hrn = caller_gid.get_hrn()

    # a delegated credential is preferred, our own one is the fallback
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    # fetch from aggregates
    threads = ThreadManager()
    for aggregate in api.aggregates:
        # never bounce the request back to the caller (that would loop
        # forever), except when the caller is this manager's own aggregate
        if aggregate == api.hrn or caller_hrn != aggregate:
            server = api.aggregates[aggregate]
            threads.run(server.ListSlices, credential)

    # combine results
    slices = []
    for partial in threads.get_results():
        slices.extend(partial)

    # cache the result
    if api.cache:
        api.cache.add('slices', slices)

    return slices
344  
def get_rspec(api, creds, options):
    """
    Return one rspec (an XML string) combining the resources listed by
    every known aggregate.

    options may carry 'geni_slice_urn' to restrict the listing to one
    slice, and 'origin_hrn'. Without a slice urn, a 'nodes' entry in
    api.cache is returned right away if present, and the freshly built
    rspec is stored there afterwards.

    Only answers whose root element has type "SFA" are used: the first
    such root becomes the result and the <network> and <request>
    children of the later ones are appended to it.

    Raises InvalidRSpec when an aggregate answer is not well-formed
    XML, or when no aggregate returned an SFA rspec at all.
    """
    # get slice's hrn from options
    xrn = options.get('geni_slice_urn', '')
    hrn, _ = urn_to_hrn(xrn)

    # get hrn of the original caller; creds may be a single cred or a
    # list of creds.
    # NOTE(review): origin_hrn is only referenced by the disabled
    # get_resources call further down; kept so that a malformed
    # credential still fails here as before.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        if isinstance(creds, list):
            origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
        else:
            origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()

    # look in cache first
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    rspec = None

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        # get the rspec from the aggregate; work on a copy of the
        # options so the caller's dict is left untouched
        server = api.aggregates[aggregate]
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, credential, my_opts)
        #threads.run(server.get_resources, cred, xrn, origin_hrn)

    results = threads.get_results()

    # combine the rspecs into a single rspec
    for agg_rspec in results:
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") == "SFA":
            if rspec is None:
                rspec = root
            else:
                for network in root.iterfind("./network"):
                    rspec.append(deepcopy(network))
                for request in root.iterfind("./request"):
                    rspec.append(deepcopy(request))

    sfa_logger().debug('get_rspec: rspec=%r'%rspec)

    # etree.tostring() cannot serialize None; report the real problem
    # instead, and make sure nothing bogus ends up in the cache
    if rspec is None:
        raise InvalidRSpec("no SFA rspec was returned by any aggregate")

    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)

    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
415
def main():
    """
    Ad-hoc command line entry point: parse the rspec file named by the
    first command line argument and pass it to create_slice().

    Exits with status 1 and a usage line when no file name is given.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s <rspec-file>\n" % sys.argv[0])
        sys.exit(1)

    r = RSpec()
    r.parseFile(sys.argv[1])
    rspec = r.toDict()
    # create_slice() takes (api, xrn, creds, rspec, users); empty creds
    # and users are supplied so the call matches that signature.
    # NOTE(review): api is still None here, so create_slice() cannot get
    # past its credential check -- a real api object has to be provided
    # before this entry point is usable.
    create_slice(None, 'plc.princeton.tmacktestslice', [], rspec, [])


if __name__ == "__main__":
    main()
424