modifications to slice_status
[sfa.git] / sfa / managers / slice_manager_pl.py
1
2 import sys
3 import time,datetime
4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
7 from copy import copy
8 from lxml import etree
9
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 import sfa.plc.peers as peers
25 from sfa.util.version import version_core
26
27 # XX FIX ME:  should merge result from multiple aggregates instead of 
28 # calling aggregate implementation
29 from sfa.managers.aggregate_manager_pl import slice_status
30
def GetVersion(api):
    """
    Describe this slice manager interface.

    Builds a mapping of peer name -> host for every entry of
    api.aggregates except the one registered under api.hrn, and hands it,
    together with this interface's hrn and urn, to version_core().
    Returns whatever version_core() returns.
    """
    peer_hosts = {}
    for peername, server in api.aggregates.items():
        # our own aggregate is not a peer
        if peername == api.hrn:
            continue
        # presumably a ServerProxy-derived connection; this reads its
        # name-mangled private host attribute
        peer_hosts[peername] = server._ServerProxy__host

    own_xrn = Xrn(api.hrn)
    version_info = {'interface': 'slicemgr',
                    'hrn': own_xrn.get_hrn(),
                    'urn': own_xrn.get_urn(),
                    'peers': peer_hosts,
                    }
    return version_core(version_info)
40
def create_slice(api, xrn, creds, rspec, users):
    """
    Send a CreateSliver request to the known aggregates and merge the
    rspecs they return.

    api   -- interface object providing auth, aggregates, hrn and the
             credential helpers used below
    xrn   -- identifier of the slice, converted with urn_to_hrn()
    creds -- caller credentials, checked for the 'createsliver' privilege
    rspec -- rspec forwarded unchanged to every aggregate
    users -- forwarded unchanged to every aggregate

    Returns the result of merge_rspecs() over all aggregate results.
    Raises InvalidRSpec only if schema validation is re-enabled and fails.
    """
    hrn, _type = urn_to_hrn(xrn)

    # RSpec validation against a RelaxNG schema is switched off for now:
    # the schema used here needs to aggregate the PL and VINI schemas.
    # schema_path = "/var/www/html/schemas/pl.rng"
    schema_path = None
    if schema_path:
        try:
            rspec_tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        validator = etree.RelaxNG(etree.parse(schema_path))
        if not validator(rspec_tree):
            last_error = validator.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (last_error.message, last_error.line))

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'createsliver', hrn)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn != aggregate or aggregate == api.hrn:
            # every aggregate simply receives the entire RSpec
            server = api.aggregates[aggregate]
            threads.run(server.CreateSliver, xrn, credential, rspec, users)

    return merge_rspecs(threads.get_results())
85
def renew_slice(api, xrn, creds, expiration_time):
    """
    Send a RenewSliver request for the slice to the known aggregates.

    creds are checked for the 'renewsliver' privilege; expiration_time is
    forwarded unchanged.  The aggregate results are collected but not
    inspected.  Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'renewsliver', hrn)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn != aggregate or aggregate == api.hrn:
            server = api.aggregates[aggregate]
            # note: the credential is passed wrapped in a list here
            threads.run(server.RenewSliver, xrn, [credential], expiration_time)

    # gather the results of all the calls; the values themselves are unused
    threads.get_results()
    return 1
108
def get_ticket(api, xrn, creds, rspec, users):
    """
    Request a ticket from every aggregate named in the rspec and combine
    the answers into one ticket signed by this interface.

    creds are checked for the 'getticket' privilege.  Aggregates that are
    not in api.aggregates are looked up through the known aggregates via
    get_aggregates(); aggregates that cannot be located are skipped.

    Returns the new ticket as a string (save_to_string with parents).

    NOTE(review): if no aggregate returns a ticket, object_gid stays None
    and the object_gid.get_pubkey() call below fails.
    """
    slice_hrn, _type = urn_to_hrn(xrn)

    # One entry per <network> element of the client's rspec, keyed by the
    # element's first attribute value (presumably the aggregate hrn --
    # TODO confirm).  Each aggregate is handed the complete rspec.
    aggregate_rspecs = {}
    rspec_tree = etree.parse(StringIO(rspec))
    for network in rspec_tree.findall('./network'):
        aggregate_rspecs[network.values()[0]] = rspec

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'getticket', slice_hrn)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate, aggregate_rspec in aggregate_rspecs.items():
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        if aggregate in api.aggregates:
            server = api.aggregates[aggregate]
        else:
            server = None
            net_urn = hrn_to_urn(aggregate, 'authority')
            # one of the aggregates we know may know about this one
            for known in api.aggregates:
                target_aggs = api.aggregates[known].get_aggregates(credential, net_urn)
                if target_aggs and 'hrn' in target_aggs[0]:
                    # talk to the address of the first match and stop looking
                    url = target_aggs[0]['url']
                    server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
                    break

        if server is not None:
            threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)

    # pull the pieces we need out of every aggregate ticket
    rspecs, initscripts, slivers = [], [], []
    object_gid = None
    for ticket_string in threads.get_results():
        agg_ticket = SfaTicket(string=ticket_string)
        attrs = agg_ticket.get_attributes()
        # the object gid of the first ticket that has one is reused
        if not object_gid:
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # merged view of everything the aggregates returned
    attributes = {'initscripts': initscripts,
                  'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # build, sign and serialize the combined ticket
    ticket = SfaTicket(subject=slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
185
186
def delete_slice(api, xrn, creds):
    """
    Send a DeleteSliver request for the slice to the known aggregates.

    creds are checked for the 'deletesliver' privilege.  The aggregate
    results are collected but not inspected.  Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'deletesliver', hrn)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn != aggregate or aggregate == api.hrn:
            server = api.aggregates[aggregate]
            threads.run(server.DeleteSliver, xrn, credential)

    # gather the results of all the calls; the values themselves are unused
    threads.get_results()
    return 1
208
def start_slice(api, xrn, creds):
    """
    Send a Start request for the slice to the known aggregates.

    creds are checked for the 'startslice' privilege.  The aggregate
    results are collected but not inspected.  Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'startslice', hrn)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn != aggregate or aggregate == api.hrn:
            server = api.aggregates[aggregate]
            threads.run(server.Start, xrn, credential)

    # gather the results of all the calls; the values themselves are unused
    threads.get_results()
    return 1
230  
def stop_slice(api, xrn, creds):
    """
    Send a Stop request for the slice to the known aggregates.

    creds are checked for the 'stopslice' privilege.  The aggregate
    results are collected but not inspected.  Always returns 1.
    """
    hrn, _type = urn_to_hrn(xrn)

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'stopslice', hrn)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn != aggregate or aggregate == api.hrn:
            server = api.aggregates[aggregate]
            threads.run(server.Stop, xrn, credential)

    # gather the results of all the calls; the values themselves are unused
    threads.get_results()
    return 1
252
def reset_slice(api, xrn):
    """
    Placeholder: resetting a slice is not implemented.

    Both arguments are ignored; the function always returns 1.
    """
    return 1
258
def shutdown(api, xrn, creds):
    """
    Placeholder: shutting down a slice is not implemented.

    All arguments are ignored; the function always returns 1.
    """
    return 1
264
def status(api, xrn, creds):
    """
    Placeholder: reporting slice status is not implemented.

    All arguments are ignored; the function always returns 1.
    """
    return 1
270
def get_slices(api, creds):
    """
    List the slices known to the aggregates.

    A truthy 'slices' entry in api.cache (when a cache is configured) is
    returned directly, before creds are examined.  Otherwise creds are
    checked for the 'listslices' privilege, ListSlices is called on the
    aggregates, the results are concatenated into one list, stored in
    the cache (when configured) and returned.
    """
    # serve from the cache when possible
    if api.cache:
        cached_slices = api.cache.get('slices')
        if cached_slices:
            return cached_slices

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'listslices', None)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    # query the aggregates
    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn != aggregate or aggregate == api.hrn:
            server = api.aggregates[aggregate]
            threads.run(server.ListSlices, credential)

    # combine the per-aggregate lists into one
    slices = []
    for aggregate_slices in threads.get_results():
        slices.extend(aggregate_slices)

    # remember the combined list for later calls
    if api.cache:
        api.cache.add('slices', slices)

    return slices
308  
def get_rspec(api, creds, options):
    """
    Collect the resource rspecs of the aggregates and merge them.

    api     -- interface object providing auth, cache, aggregates, hrn and
               the credential helpers used below
    creds   -- caller credentials, checked for the 'listnodes' privilege
    options -- dict of call options; 'geni_slice_urn' (default '') selects
               a slice.  Each aggregate receives its own copy with
               'geni_compressed' forced to False.

    When no slice urn is given and a cache is configured, a truthy cached
    'nodes' entry is returned directly and a freshly merged rspec is
    stored back into the cache.

    Returns the result of merge_rspecs() over all aggregate results.
    """
    # the slice urn, if any, travels in the options
    xrn = options.get('geni_slice_urn', '')

    # only the slice-independent listing is cached
    if api.cache and not xrn:
        cached_rspec = api.cache.get('nodes')
        if cached_rspec:
            return cached_rspec

    # the urn is converted once here; the earlier version converted it
    # twice and also computed an 'origin_hrn' that nothing used
    hrn, _type = urn_to_hrn(xrn)

    # the caller's hrn comes from the first validated credential
    checked_creds = api.auth.checkCredentials(creds, 'listnodes', hrn)
    caller_hrn = Credential(string=checked_creds[0]).get_gid_caller().get_hrn()

    # prefer a delegated credential, otherwise fall back to our own
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Never send the request back to the caller (that would loop
        # forever), unless the caller is this interface's own aggregate.
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        # each call gets a private copy so the caller's options dict is
        # never modified
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, credential, my_opts)

    merged_rspec = merge_rspecs(threads.get_results())

    # remember the slice-independent listing for later calls
    if api.cache and not xrn:
        api.cache.add('nodes', merged_rspec)

    return merged_rspec
360
def main():
    """
    Ad-hoc command line harness: parse the rspec file named by the first
    command line argument and pass it to create_slice().

    NOTE(review): api is passed as None, so create_slice() cannot get past
    its first use of api (api.auth.checkCredentials); a real api object and
    real credentials are needed for this to do anything useful.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: %s <rspec-file>" % sys.argv[0])

    r = RSpec()
    r.parseFile(sys.argv[1])
    rspec = r.toDict()
    # create_slice() takes (api, xrn, creds, rspec, users); the previous
    # three-argument call always raised TypeError before doing any work
    create_slice(None, 'plc.princeton.tmacktestslice', [], rspec, [])

if __name__ == "__main__":
    main()
369