extend GetVersion to expose:
[sfa.git] / sfa / managers / slice_manager_pl.py
1
2 import sys
3 import time,datetime
4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
7 from copy import copy
8 from lxml import etree
9
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import urn_to_hrn, hrn_to_urn
13 from sfa.util.rspec import *
14 from sfa.util.specdict import *
15 from sfa.util.faults import *
16 from sfa.util.record import SfaRecord
17 from sfa.util.policy import Policy
18 from sfa.util.prefixTree import prefixTree
19 from sfa.util.sfaticket import *
20 from sfa.trust.credential import Credential
21 from sfa.util.threadmanager import ThreadManager
22 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
23 import sfa.plc.peers as peers
24 from sfa.util.version import version_core
25
26 def GetVersion(api):
27     peers =dict ([ (peername,v._ServerProxy__host) for (peername,v) in api.aggregates.items() 
28                    if peername != api.hrn])
29     return version_core({'interface':'slicemgr',
30                          'hrn' : api.hrn,
31                          'peers': peers,
32                          })
33
34 def slice_status(api, slice_xrn, creds ):
35     hrn, type = urn_to_hrn(slice_xrn)
36     # find out where this slice is currently running
37     api.logger.info(hrn)
38     slicename = hrn_to_pl_slicename(hrn)
39     api.logger.info("Checking status for %s" % slicename)
40     slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids','person_ids','name','expires'])
41     if len(slices) == 0:        
42         raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
43     slice = slices[0]
44     
45     nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'],
46                                     ['hostname', 'boot_state', 'last_contact'])
47     api.logger.info(slice)
48     api.logger.info(nodes)
49     
50     result = {}
51     result['geni_urn'] = slice_xrn
52     result['geni_status'] = 'unknown'
53     result['pl_login'] = slice['name']
54     result['pl_expires'] = slice['expires']
55     
56     resources = []
57     
58     for node in nodes:
59         res = {}
60         res['pl_hostname'] = node['hostname']
61         res['pl_boot_state'] = node['boot_state']
62         res['pl_last_contact'] = node['last_contact']
63         res['geni_urn'] = ''
64         res['geni_status'] = 'unknown'
65         res['geni_error'] = ''
66
67         resources.append(res)
68         
69     result['geni_resources'] = resources
70     return result
71
72 def create_slice(api, xrn, creds, rspec, users):
73     hrn, type = urn_to_hrn(xrn)
74
75     # Validate the RSpec against PlanetLab's schema --disabled for now
76     # The schema used here needs to aggregate the PL and VINI schemas
77     # schema = "/var/www/html/schemas/pl.rng"
78     schema = None
79     if schema:
80         try:
81             tree = etree.parse(StringIO(rspec))
82         except etree.XMLSyntaxError:
83             message = str(sys.exc_info()[1])
84             raise InvalidRSpec(message)
85
86         relaxng_doc = etree.parse(schema)
87         relaxng = etree.RelaxNG(relaxng_doc)
88         
89         if not relaxng(tree):
90             error = relaxng.error_log.last_error
91             message = "%s (line %s)" % (error.message, error.line)
92             raise InvalidRSpec(message)
93
94     # get the callers hrn
95     valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
96     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
97
98     # attempt to use delegated credential first
99     credential = api.getDelegatedCredential(creds)
100     if not credential:     
101         credential = api.getCredential()
102     threads = ThreadManager()
103     for aggregate in api.aggregates:
104         # prevent infinite loop. Dont send request back to caller
105         # unless the caller is the aggregate's SM 
106         if caller_hrn == aggregate and aggregate != api.hrn:
107             continue
108             
109         # Just send entire RSpec to each aggregate
110         server = api.aggregates[aggregate]
111         threads.run(server.CreateSliver, xrn, credential, rspec, users)
112             
113     results = threads.get_results() 
114     merged_rspec = merge_rspecs(results)
115     return merged_rspec
116
117 def renew_slice(api, xrn, creds, expiration_time):
118     hrn, type = urn_to_hrn(xrn)
119
120     # get the callers hrn
121     valid_cred = api.auth.checkCredentials(creds, 'renewesliver', hrn)[0]
122     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
123
124     # attempt to use delegated credential first
125     credential = api.getDelegatedCredential(creds)
126     if not credential:
127         credential = api.getCredential()
128     threads = ThreadManager()
129     for aggregate in api.aggregates:
130         # prevent infinite loop. Dont send request back to caller
131         # unless the caller is the aggregate's SM
132         if caller_hrn == aggregate and aggregate != api.hrn:
133             continue
134
135         server = api.aggregates[aggregate]
136         threads.run(server.RenewSliver, xrn, credential, expiration_time)
137     threads.get_results()
138     return 1
139
140 def get_ticket(api, xrn, creds, rspec, users):
141     slice_hrn, type = urn_to_hrn(xrn)
142     # get the netspecs contained within the clients rspec
143     aggregate_rspecs = {}
144     tree= etree.parse(StringIO(rspec))
145     elements = tree.findall('./network')
146     for element in elements:
147         aggregate_hrn = element.values()[0]
148         aggregate_rspecs[aggregate_hrn] = rspec 
149
150     # get the callers hrn
151     valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
152     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
153
154     # attempt to use delegated credential first
155     credential = api.getDelegatedCredential(creds)
156     if not credential:
157         credential = api.getCredential() 
158     threads = ThreadManager()
159     for aggregate, aggregate_rspec in aggregate_rspecs.items():
160         # prevent infinite loop. Dont send request back to caller
161         # unless the caller is the aggregate's SM
162         if caller_hrn == aggregate and aggregate != api.hrn:
163             continue
164         server = None
165         if aggregate in api.aggregates:
166             server = api.aggregates[aggregate]
167         else:
168             net_urn = hrn_to_urn(aggregate, 'authority')     
169             # we may have a peer that knows about this aggregate
170             for agg in api.aggregates:
171                 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
172                 if not target_aggs or not 'hrn' in target_aggs[0]:
173                     continue
174                 # send the request to this address 
175                 url = target_aggs[0]['url']
176                 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
177                 # aggregate found, no need to keep looping
178                 break   
179         if server is None:
180             continue 
181         threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
182
183     results = threads.get_results()
184     
185     # gather information from each ticket 
186     rspecs = []
187     initscripts = []
188     slivers = [] 
189     object_gid = None  
190     for result in results:
191         agg_ticket = SfaTicket(string=result)
192         attrs = agg_ticket.get_attributes()
193         if not object_gid:
194             object_gid = agg_ticket.get_gid_object()
195         rspecs.append(agg_ticket.get_rspec())
196         initscripts.extend(attrs.get('initscripts', [])) 
197         slivers.extend(attrs.get('slivers', [])) 
198     
199     # merge info
200     attributes = {'initscripts': initscripts,
201                  'slivers': slivers}
202     merged_rspec = merge_rspecs(rspecs) 
203
204     # create a new ticket
205     ticket = SfaTicket(subject = slice_hrn)
206     ticket.set_gid_caller(api.auth.client_gid)
207     ticket.set_issuer(key=api.key, subject=api.hrn)
208     ticket.set_gid_object(object_gid)
209     ticket.set_pubkey(object_gid.get_pubkey())
210     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
211     ticket.set_attributes(attributes)
212     ticket.set_rspec(merged_rspec)
213     ticket.encode()
214     ticket.sign()          
215     return ticket.save_to_string(save_parents=True)
216
217
218 def delete_slice(api, xrn, creds):
219     hrn, type = urn_to_hrn(xrn)
220
221     # get the callers hrn
222     valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
223     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
224
225     # attempt to use delegated credential first
226     credential = api.getDelegatedCredential(creds)
227     if not credential:
228         credential = api.getCredential()
229     threads = ThreadManager()
230     for aggregate in api.aggregates:
231         # prevent infinite loop. Dont send request back to caller
232         # unless the caller is the aggregate's SM
233         if caller_hrn == aggregate and aggregate != api.hrn:
234             continue
235         server = api.aggregates[aggregate]
236         threads.run(server.DeleteSliver, xrn, credential)
237     threads.get_results()
238     return 1
239
240 def start_slice(api, xrn, creds):
241     hrn, type = urn_to_hrn(xrn)
242
243     # get the callers hrn
244     valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
245     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
246
247     # attempt to use delegated credential first
248     credential = api.getDelegatedCredential(creds)
249     if not credential:
250         credential = api.getCredential()
251     threads = ThreadManager()
252     for aggregate in api.aggregates:
253         # prevent infinite loop. Dont send request back to caller
254         # unless the caller is the aggregate's SM
255         if caller_hrn == aggregate and aggregate != api.hrn:
256             continue
257         server = api.aggregates[aggregate]
258         threads.run(server.Start, xrn, credential)
259     threads.get_results()    
260     return 1
261  
262 def stop_slice(api, xrn, creds):
263     hrn, type = urn_to_hrn(xrn)
264
265     # get the callers hrn
266     valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
267     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
268
269     # attempt to use delegated credential first
270     credential = api.getDelegatedCredential(creds)
271     if not credential:
272         credential = api.getCredential()
273     threads = ThreadManager()
274     for aggregate in api.aggregates:
275         # prevent infinite loop. Dont send request back to caller
276         # unless the caller is the aggregate's SM
277         if caller_hrn == aggregate and aggregate != api.hrn:
278             continue
279         server = api.aggregates[aggregate]
280         threads.run(server.Stop, xrn, credential)
281     threads.get_results()    
282     return 1
283
284 def reset_slice(api, xrn):
285     """
286     Not implemented
287     """
288     return 1
289
290 def shutdown(api, xrn, creds):
291     """
292     Not implemented   
293     """
294     return 1
295
296 def status(api, xrn, creds):
297     """
298     Not implemented 
299     """
300     return 1
301
302 def get_slices(api, creds):
303
304     # look in cache first
305     if api.cache:
306         slices = api.cache.get('slices')
307         if slices:
308             return slices    
309
310     # get the callers hrn
311     valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
312     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
313
314     # attempt to use delegated credential first
315     credential = api.getDelegatedCredential(creds)
316     if not credential:
317         credential = api.getCredential()
318     threads = ThreadManager()
319     # fetch from aggregates
320     for aggregate in api.aggregates:
321         # prevent infinite loop. Dont send request back to caller
322         # unless the caller is the aggregate's SM
323         if caller_hrn == aggregate and aggregate != api.hrn:
324             continue
325         server = api.aggregates[aggregate]
326         threads.run(server.ListSlices, credential)
327
328     # combime results
329     results = threads.get_results()
330     slices = []
331     for result in results:
332         slices.extend(result)
333     
334     # cache the result
335     if api.cache:
336         api.cache.add('slices', slices)
337
338     return slices
339  
340 def get_rspec(api, creds, options):
341     
342     # get slice's hrn from options
343     xrn = options.get('geni_slice_urn', '')
344     hrn, type = urn_to_hrn(xrn)
345
346     # get hrn of the original caller
347     origin_hrn = options.get('origin_hrn', None)
348     if not origin_hrn:
349         origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
350     
351     # look in cache first 
352     if api.cache and not xrn:
353         rspec =  api.cache.get('nodes')
354         if rspec:
355             return rspec
356
357     hrn, type = urn_to_hrn(xrn)
358     rspec = None
359
360     # get the callers hrn
361     valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
362     caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
363
364     # attempt to use delegated credential first
365     credential = api.getDelegatedCredential(creds)
366     if not credential:
367         credential = api.getCredential()
368     threads = ThreadManager()
369     for aggregate in api.aggregates:
370         # prevent infinite loop. Dont send request back to caller
371         # unless the caller is the aggregate's SM
372         if caller_hrn == aggregate and aggregate != api.hrn:
373             continue
374         # get the rspec from the aggregate
375         server = api.aggregates[aggregate]
376         my_opts = copy(options)
377         my_opts['geni_compressed'] = False
378         threads.run(server.ListResources, credential, my_opts)
379         #threads.run(server.get_resources, cred, xrn, origin_hrn)
380                     
381     results = threads.get_results()
382     # combine the rspecs into a single rspec 
383     for agg_rspec in results:
384         try:
385             tree = etree.parse(StringIO(agg_rspec))
386         except etree.XMLSyntaxError:
387             message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
388             raise InvalidRSpec(message)
389
390         root = tree.getroot()
391         if root.get("type") in ["SFA"]:
392             if rspec == None:
393                 rspec = root
394             else:
395                 for network in root.iterfind("./network"):
396                     rspec.append(deepcopy(network))
397                 for request in root.iterfind("./request"):
398                     rspec.append(deepcopy(request))
399     
400     sfa_logger().debug('get_rspec: rspec=%r'%rspec)
401     rspec =  etree.tostring(rspec, xml_declaration=True, pretty_print=True)
402     # cache the result
403     if api.cache and not xrn:
404         api.cache.add('nodes', rspec)
405  
406     return rspec
407
408 def main():
409     r = RSpec()
410     r.parseFile(sys.argv[1])
411     rspec = r.toDict()
412     create_slice(None,'plc.princeton.tmacktestslice',rspec)
413
414 if __name__ == "__main__":
415     main()
416