# GetVersion exposes the urn together with the hrn.
# Source: [sfa.git] / sfa / managers / slice_manager_pl.py
1
2 import sys
3 import time,datetime
4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
7 from copy import copy
8 from lxml import etree
9
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.rspec import *
14 from sfa.util.specdict import *
15 from sfa.util.faults import *
16 from sfa.util.record import SfaRecord
17 from sfa.util.policy import Policy
18 from sfa.util.prefixTree import prefixTree
19 from sfa.util.sfaticket import *
20 from sfa.trust.credential import Credential
21 from sfa.util.threadmanager import ThreadManager
22 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
23 import sfa.plc.peers as peers
24 from sfa.util.version import version_core
25
def GetVersion(api):
    """
    Describe this slice manager interface.

    Collects, for every entry of api.aggregates whose name differs from
    api.hrn, the host stored on its server proxy, and passes that mapping
    together with the local hrn and urn to version_core().
    """
    # peer name -> host of the corresponding server proxy (ourselves excluded)
    peer_hosts = dict(
        (name, proxy._ServerProxy__host)
        for (name, proxy) in api.aggregates.items()
        if name != api.hrn)
    local = Xrn(api.hrn)
    description = {
        'interface': 'slicemgr',
        'hrn': local.get_hrn(),
        'urn': local.get_urn(),
        'peers': peer_hosts,
    }
    return version_core(description)
35
def slice_status(api, slice_xrn, creds ):
    """
    Build a status report for the slice designated by slice_xrn.

    The slice is looked up through api.plshell.GetSlices and its nodes
    through api.plshell.GetNodes. The returned dict holds the slice level
    fields (geni_urn, geni_status, pl_login, pl_expires) and, under
    geni_resources, one dict per node. Every geni_status is reported as
    'unknown'. Raises Exception when GetSlices returns no slice.

    NOTE(review): hrn_to_pl_slicename is not among the explicit imports of
    this file; presumably one of the star imports supplies it -- confirm.
    """
    hrn, _type = urn_to_hrn(slice_xrn)
    log = api.logger

    # find out where this slice is currently running
    log.info(hrn)
    slicename = hrn_to_pl_slicename(hrn)
    log.info("Checking status for %s" % slicename)

    slice_fields = ['node_ids','person_ids','name','expires']
    slices = api.plshell.GetSlices(api.plauth, [slicename], slice_fields)
    if len(slices) == 0:
        raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
    slice = slices[0]

    node_fields = ['hostname', 'boot_state', 'last_contact']
    nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'], node_fields)
    log.info(slice)
    log.info(nodes)

    # one entry per node; the geni_* fields carry placeholder values
    resources = [
        {'pl_hostname': node['hostname'],
         'pl_boot_state': node['boot_state'],
         'pl_last_contact': node['last_contact'],
         'geni_urn': '',
         'geni_status': 'unknown',
         'geni_error': ''}
        for node in nodes]

    return {'geni_urn': slice_xrn,
            'geni_status': 'unknown',
            'pl_login': slice['name'],
            'pl_expires': slice['expires'],
            'geni_resources': resources}
73
def create_slice(api, xrn, creds, rspec, users):
    """
    Forward a CreateSliver request to the aggregates in api.aggregates.

    The whole rspec is sent unchanged to every aggregate, except the one
    named after the caller (unless that name is api.hrn). The results
    gathered from the thread manager are combined with merge_rspecs() and
    returned.

    Raises InvalidRSpec from the schema validation step, which is
    currently switched off (schema is None).
    """
    hrn, _type = urn_to_hrn(xrn)

    # Validation of the RSpec against PlanetLab's schema is disabled for
    # now: the schema used here needs to aggregate the PL and VINI schemas.
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            document = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        validator = etree.RelaxNG(etree.parse(schema))
        if not validator(document):
            last = validator.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (last.message, last.line))

    # hrn of the caller, taken from the first validated credential
    valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for agg_name in api.aggregates:
        # Prevent an infinite loop: never send the request back to the
        # caller, unless the caller is the aggregate's own SM.
        if caller_hrn != agg_name or agg_name == api.hrn:
            threads.run(api.aggregates[agg_name].CreateSliver,
                        xrn, credential, rspec, users)

    return merge_rspecs(threads.get_results())
118
def renew_slice(api, xrn, creds, expiration_time):
    """
    Forward a RenewSliver request to the aggregates in api.aggregates.

    xrn designates the slice, creds are the caller's credentials and
    expiration_time is passed through untouched to each aggregate's
    RenewSliver call. The aggregate named after the caller is skipped
    (unless that name is api.hrn). Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # hrn of the caller, taken from the first validated credential.
    # The operation name used to be misspelled 'renewesliver'; it is now
    # spelled like the sibling operations ('createsliver', 'deletesliver').
    valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        server = api.aggregates[aggregate]
        threads.run(server.RenewSliver, xrn, credential, expiration_time)

    # results are collected but not inspected
    threads.get_results()
    return 1
141
def get_ticket(api, xrn, creds, rspec, users):
    """
    Request a ticket from each aggregate named in rspec and merge them.

    Each <network> element found in rspec designates an aggregate through
    the first of its attribute values; every such aggregate receives the
    complete rspec in a GetTicket call. Aggregates missing from
    api.aggregates are searched for through the known aggregates'
    get_aggregates() call. The tickets that come back are folded into one
    new SfaTicket (merged rspec, concatenated 'initscripts' and 'slivers'
    attributes), which is signed and returned as a string.
    """
    slice_hrn, _type = urn_to_hrn(xrn)

    # get the netspecs contained within the clients rspec
    aggregate_rspecs = {}
    for network in etree.parse(StringIO(rspec)).findall('./network'):
        aggregate_rspecs[network.values()[0]] = rspec

    # hrn of the caller, taken from the first validated credential
    valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for agg_hrn, agg_rspec in aggregate_rspecs.items():
        # Prevent an infinite loop: never send the request back to the
        # caller, unless the caller is the aggregate's own SM.
        if caller_hrn == agg_hrn and agg_hrn != api.hrn:
            continue

        if agg_hrn in api.aggregates:
            server = api.aggregates[agg_hrn]
        else:
            server = None
            net_urn = hrn_to_urn(agg_hrn, 'authority')
            # we may have a peer that knows about this aggregate
            for peer in api.aggregates:
                candidates = api.aggregates[peer].get_aggregates(credential, net_urn)
                if candidates and 'hrn' in candidates[0]:
                    # send the request to this address; aggregate found,
                    # so there is no need to keep looping
                    server = xmlrpcprotocol.get_server(
                        candidates[0]['url'], api.key_file, api.cert_file)
                    break

        if server is not None:
            threads.run(server.GetTicket, xrn, credential, agg_rspec, users)

    # gather information from each ticket
    rspecs, initscripts, slivers = [], [], []
    object_gid = None
    for raw_ticket in threads.get_results():
        agg_ticket = SfaTicket(string=raw_ticket)
        attrs = agg_ticket.get_attributes()
        if not object_gid:
            # the first ticket that carries an object gid provides it
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # merge info
    merged_attributes = {'initscripts': initscripts,
                         'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # create a new ticket
    merged_ticket = SfaTicket(subject = slice_hrn)
    merged_ticket.set_gid_caller(api.auth.client_gid)
    merged_ticket.set_issuer(key=api.key, subject=api.hrn)
    merged_ticket.set_gid_object(object_gid)
    merged_ticket.set_pubkey(object_gid.get_pubkey())
    #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
    merged_ticket.set_attributes(merged_attributes)
    merged_ticket.set_rspec(merged_rspec)
    merged_ticket.encode()
    merged_ticket.sign()
    return merged_ticket.save_to_string(save_parents=True)
218
219
def delete_slice(api, xrn, creds):
    """
    Forward a DeleteSliver request for xrn to the aggregates in
    api.aggregates, skipping the aggregate named after the caller (unless
    that name is api.hrn). Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # hrn of the caller, taken from the first validated credential
    valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for agg_name in api.aggregates:
        # Prevent an infinite loop: never send the request back to the
        # caller, unless the caller is the aggregate's own SM.
        if caller_hrn != agg_name or agg_name == api.hrn:
            threads.run(api.aggregates[agg_name].DeleteSliver, xrn, credential)

    # results are collected but not inspected
    threads.get_results()
    return 1
241
def start_slice(api, xrn, creds):
    """
    Forward a Start request for xrn to the aggregates in api.aggregates,
    skipping the aggregate named after the caller (unless that name is
    api.hrn). Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # hrn of the caller, taken from the first validated credential
    valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for agg_name in api.aggregates:
        # Prevent an infinite loop: never send the request back to the
        # caller, unless the caller is the aggregate's own SM.
        if caller_hrn != agg_name or agg_name == api.hrn:
            threads.run(api.aggregates[agg_name].Start, xrn, credential)

    # results are collected but not inspected
    threads.get_results()
    return 1
263  
def stop_slice(api, xrn, creds):
    """
    Forward a Stop request for xrn to the aggregates in api.aggregates,
    skipping the aggregate named after the caller (unless that name is
    api.hrn). Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # hrn of the caller, taken from the first validated credential
    valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for agg_name in api.aggregates:
        # Prevent an infinite loop: never send the request back to the
        # caller, unless the caller is the aggregate's own SM.
        if caller_hrn != agg_name or agg_name == api.hrn:
            threads.run(api.aggregates[agg_name].Stop, xrn, credential)

    # results are collected but not inspected
    threads.get_results()
    return 1
285
def reset_slice(api, xrn):
    """
    Placeholder: resetting a slice is not implemented.

    Neither argument is used; the function simply returns 1.
    """
    return 1
291
def shutdown(api, xrn, creds):
    """
    Placeholder: shutting down a slice is not implemented.

    None of the arguments is used; the function simply returns 1.
    """
    return 1
297
def status(api, xrn, creds):
    """
    Placeholder: status reporting is not implemented here.

    None of the arguments is used; the function simply returns 1.
    """
    return 1
303
def get_slices(api, creds):
    """
    Return the slices known to the aggregates in api.aggregates.

    A non-empty 'slices' entry in api.cache is returned as is. Otherwise
    ListSlices is issued to each aggregate (the one named after the caller
    is skipped unless that name is api.hrn), the returned lists are
    concatenated, stored in the cache when there is one, and returned.
    """
    # look in cache first
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # hrn of the caller, taken from the first validated credential
    valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # prefer a delegated credential, fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    # fetch from aggregates
    threads = ThreadManager()
    for agg_name in api.aggregates:
        # Prevent an infinite loop: never send the request back to the
        # caller, unless the caller is the aggregate's own SM.
        if caller_hrn != agg_name or agg_name == api.hrn:
            threads.run(api.aggregates[agg_name].ListSlices, credential)

    # combine results
    slices = []
    for partial in threads.get_results():
        slices.extend(partial)

    # cache the result
    if api.cache:
        api.cache.add('slices', slices)

    return slices
341  
def get_rspec(api, creds, options):
    """
    Return the rspec obtained by merging the aggregates' ListResources.

    options may carry 'geni_slice_urn' (the slice to report on; when it is
    missing or empty the result is read from and written to the 'nodes'
    entry of api.cache) and 'origin_hrn'. Each aggregate is queried with a
    copy of options in which 'geni_compressed' is forced to False.

    Only answers whose root element has type "SFA" are merged: the first
    one becomes the base document, and the <network> and <request>
    children of the following ones are appended to it. The merged
    document is returned serialized by etree.tostring().

    Raises InvalidRSpec when an aggregate's answer is not parseable XML.

    NOTE(review): when no aggregate contributes an "SFA" document the
    merged element stays None and is handed to etree.tostring() as is --
    confirm whether that case needs explicit handling.
    """
    # get slice's hrn from options (parsed once; the former second,
    # identical urn_to_hrn() call has been dropped)
    xrn = options.get('geni_slice_urn', '')
    hrn, _type = urn_to_hrn(xrn)

    # get hrn of the original caller
    # NOTE(review): origin_hrn is only referenced by the disabled
    # get_resources call below; the lookup is kept so that the credential
    # in creds[0] is still parsed exactly as before.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # look in cache first
    if api.cache and not xrn:
        cached = api.cache.get('nodes')
        if cached:
            return cached

    # hrn of the caller, taken from the first validated credential
    valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        # get the rspec from the aggregate
        server = api.aggregates[aggregate]
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, credential, my_opts)
        #threads.run(server.get_resources, cred, xrn, origin_hrn)

    # combine the rspecs into a single rspec
    rspec = None
    for agg_rspec in threads.get_results():
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") not in ["SFA"]:
            continue
        # identity test: truth-testing an lxml element would look at its
        # children rather than at whether a base document exists
        if rspec is None:
            rspec = root
        else:
            for network in root.iterfind("./network"):
                rspec.append(deepcopy(network))
            for request in root.iterfind("./request"):
                rspec.append(deepcopy(request))

    sfa_logger().debug('get_rspec: rspec=%r'%rspec)
    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)

    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
409
def main():
    """
    Ad-hoc command line entry point: parse the RSpec file named by
    sys.argv[1] and hand its dictionary form to create_slice().

    NOTE(review): the call still passes None as api, while create_slice()
    dereferences api.auth and api.aggregates, so this looks like a stale
    debugging aid -- confirm whether it is still needed.
    """
    r = RSpec()
    r.parseFile(sys.argv[1])
    rspec = r.toDict()
    # create_slice() is defined as (api, xrn, creds, rspec, users); the
    # call used to pass three arguments only. Empty credential and user
    # lists are supplied so that it matches the signature.
    create_slice(None, 'plc.princeton.tmacktestslice', [], rspec, [])

if __name__ == "__main__":
    main()
418