fix NameError (cred should be credential)
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspecHelper import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
26 from copy import copy
27
def get_version():
    """Report the API versions implemented by this slice manager.

    Returns:
        A new dict with the keys 'geni_api' and 'sfa', both set to 1.
    """
    return {'geni_api': 1, 'sfa': 1}
33
def slice_status(api, slice_xrn, creds ):
    """Return a placeholder status record for a slice.

    No aggregate is contacted: the status is always reported as 'unknown'
    and the resource map is always empty. `api` and `creds` are accepted
    but not used.

    Returns:
        A dict with the keys 'geni_urn' (the xrn passed in), 'geni_status'
        and 'geni_resources'.
    """
    return {
        'geni_urn': slice_xrn,
        'geni_status': 'unknown',
        'geni_resources': {},
    }
40
def create_slice(api, xrn, creds, rspec, users):
    """Ask every known aggregate to create slivers for a slice.

    The complete RSpec is handed unchanged to the CreateSliver method of
    each entry in api.aggregates, and the per-aggregate results are
    combined with merge_rspecs().

    Args:
        api: manager API object; provides aggregates, getCredential() and
            getDelegatedCredential().
        xrn: identifier of the slice, forwarded to each aggregate.
        creds: caller credentials, used to look up a delegated credential.
        rspec: RSpec document as a string.
        users: user records, forwarded to each aggregate as-is.

    Returns:
        Whatever merge_rspecs() produces from the aggregate results.

    Raises:
        InvalidRSpec: only when schema validation is enabled (it is
            currently switched off) and the RSpec is malformed or does
            not validate.
    """
    # The parsed pieces are not used below; the call is kept so that
    # whatever checking urn_to_hrn performs on xrn still happens.
    hrn, hrn_type = urn_to_hrn(xrn)

    # Validate the RSpec against PlanetLab's schema --disabled for now
    # The schema used here needs to aggregate the PL and VINI schemas
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            message = str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        relaxng_doc = etree.parse(schema)
        relaxng = etree.RelaxNG(relaxng_doc)

        if not relaxng(tree):
            error = relaxng.error_log.last_error
            message = "%s (line %s)" % (error.message, error.line)
            raise InvalidRSpec(message)

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # Just send entire RSpec to each aggregate
        server = api.aggregates[aggregate]
        # Pass the credential selected above; this used to reference the
        # undefined name `cred` and raised NameError.
        threads.run(server.CreateSliver, xrn, credential, rspec, users)

    results = threads.get_results()
    return merge_rspecs(results)
76
def renew_slice(api, xrn, creds, expiration_time):
    """Ask every known aggregate to renew the slivers of a slice.

    Args:
        api: manager API object; provides aggregates and credentials.
        xrn: identifier of the slice, forwarded to each aggregate.
        creds: caller credentials, used to look up a delegated credential.
        expiration_time: new expiration, forwarded to RenewSliver as-is.

    Returns:
        Always 1; the per-aggregate results are collected and discarded.
    """
    # Prefer a delegated credential, fall back to the manager's own.
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].RenewSliver, xrn, credential,
                    expiration_time)

    # Results are fetched but their content is ignored.
    workers.get_results()
    return 1
88
def get_ticket(api, xrn, creds, rspec, users):
    """Collect tickets from the aggregates named in an RSpec and merge them.

    Every top-level <network> element of the RSpec names an aggregate (by
    its first attribute value). Each such aggregate is asked for a ticket
    via GetTicket, receiving the complete RSpec. The initscripts, slivers
    and rspecs of the returned tickets are merged into a new ticket that
    is signed and returned in string form.

    Args:
        api: manager API object; provides aggregates, credentials, key
            material and the caller's gid.
        xrn: identifier of the slice.
        creds: caller credentials, used to look up a delegated credential.
        rspec: RSpec document as a string.
        users: user records, forwarded to each aggregate as-is.

    Returns:
        The merged ticket serialized with save_to_string(save_parents=True).

    NOTE(review): when no aggregate returns a ticket, object_gid stays
    None and object_gid.get_pubkey() raises AttributeError.
    """
    slice_hrn, _hrn_type = urn_to_hrn(xrn)

    # Map each aggregate named by a <network> element to the rspec it
    # should receive; for now that is always the complete client rspec.
    networks = etree.parse(StringIO(rspec)).findall('./network')
    rspec_by_aggregate = {}
    for network in networks:
        rspec_by_aggregate[network.values()[0]] = rspec

    # Prefer a delegated credential, fall back to the manager's own.
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    workers = ThreadManager()
    for agg_hrn, agg_rspec in rspec_by_aggregate.items():
        if agg_hrn in api.aggregates:
            server = api.aggregates[agg_hrn]
        else:
            server = None
            net_urn = hrn_to_urn(agg_hrn, 'authority')
            # A peer may know about this aggregate; ask each in turn and
            # stop at the first one that gives a usable answer.
            for peer in api.aggregates:
                known = api.aggregates[peer].get_aggregates(credential, net_urn)
                if known and 'hrn' in known[0]:
                    server = xmlrpcprotocol.get_server(
                        known[0]['url'], api.key_file, api.cert_file)
                    break
        if server is not None:
            workers.run(server.GetTicket, xrn, credential, agg_rspec, users)

    # Pull the interesting pieces out of every ticket that came back.
    ticket_rspecs = []
    all_initscripts = []
    all_slivers = []
    object_gid = None
    for raw_ticket in workers.get_results():
        agg_ticket = SfaTicket(string=raw_ticket)
        ticket_attrs = agg_ticket.get_attributes()
        # The object gid of the first ticket is used for the merged ticket.
        if not object_gid:
            object_gid = agg_ticket.get_gid_object()
        ticket_rspecs.append(agg_ticket.get_rspec())
        all_initscripts.extend(ticket_attrs.get('initscripts', []))
        all_slivers.extend(ticket_attrs.get('slivers', []))

    merged_attributes = {'initscripts': all_initscripts,
                         'slivers': all_slivers}
    merged_rspec = merge_rspecs(ticket_rspecs)

    # Build, sign and serialize the combined ticket.
    merged_ticket = SfaTicket(subject = slice_hrn)
    merged_ticket.set_gid_caller(api.auth.client_gid)
    merged_ticket.set_issuer(key=api.key, subject=api.hrn)
    merged_ticket.set_gid_object(object_gid)
    merged_ticket.set_pubkey(object_gid.get_pubkey())
    merged_ticket.set_attributes(merged_attributes)
    merged_ticket.set_rspec(merged_rspec)
    merged_ticket.encode()
    merged_ticket.sign()
    return merged_ticket.save_to_string(save_parents=True)
157
158
def delete_slice(api, xrn, origin_hrn=None, creds=None):
    """Ask every known aggregate to delete the slivers of a slice.

    Args:
        api: manager API object; provides aggregates and credentials.
        xrn: identifier of the slice, forwarded to each aggregate.
        origin_hrn: accepted for backward compatibility; not used.
        creds: optional caller credentials used to look up a delegated
            credential. When omitted, the manager's own credential is
            used.

    Returns:
        Always 1; the per-aggregate results are collected and discarded.

    NOTE(review): the sibling functions take `creds` as their third
    positional argument. A caller that passes credentials positionally
    here binds them to `origin_hrn`, so they are ignored -- confirm the
    call sites and pass `creds=` by keyword.
    """
    # The body used to read `creds` although no such parameter existed,
    # so every call raised NameError. It is now an optional keyword.
    credential = None
    if creds:
        # attempt to use delegated credential first
        credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        threads.run(server.DeleteSliver, xrn, credential)

    # Results are fetched but their content is ignored.
    threads.get_results()
    return 1
170
def start_slice(api, xrn, creds):
    """Ask every known aggregate to start a slice.

    Args:
        api: manager API object; provides aggregates and credentials.
        xrn: identifier of the slice, forwarded to each aggregate.
        creds: caller credentials, used to look up a delegated credential.

    Returns:
        Always 1; the per-aggregate results are collected and discarded.
    """
    # Prefer a delegated credential, fall back to the manager's own.
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].Start, xrn, credential)

    # Results are fetched but their content is ignored.
    workers.get_results()
    return 1
182  
def stop_slice(api, xrn, creds):
    """Ask every known aggregate to stop a slice.

    Args:
        api: manager API object; provides aggregates and credentials.
        xrn: identifier of the slice, forwarded to each aggregate.
        creds: caller credentials, used to look up a delegated credential.

    Returns:
        Always 1; the per-aggregate results are collected and discarded.
    """
    # Prefer a delegated credential, fall back to the manager's own.
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].Stop, xrn, credential)

    # Results are fetched but their content is ignored.
    workers.get_results()
    return 1
194
def reset_slice(api, xrn):
    """Placeholder: resetting a slice is not implemented.

    Both arguments are ignored and nothing is contacted.

    Returns:
        Always 1.
    """
    return 1
200
def shutdown(api, xrn, creds):
    """Placeholder: shutting down a slice is not implemented.

    All arguments are ignored and nothing is contacted.

    Returns:
        Always 1.
    """
    return 1
206
def status(api, xrn, creds):
    """Placeholder: status reporting is not implemented.

    All arguments are ignored and nothing is contacted.

    Returns:
        Always 1.
    """
    return 1
212
def get_slices(api, creds):
    """List the slices known to all aggregates.

    A non-empty list stored in the cache under 'slices' is returned
    directly. Otherwise ListSlices is called on every aggregate, the
    answers are concatenated and, when a cache is configured, stored
    under 'slices'.

    Args:
        api: manager API object; provides cache, aggregates and
            credentials.
        creds: caller credentials, used to look up a delegated credential.

    Returns:
        The cached value, or a list holding the entries of every
        aggregate's answer in the order the results were returned.
    """
    # look in cache first; an empty or missing entry means "fetch again"
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # Prefer a delegated credential, fall back to the manager's own.
    credential = api.getDelegatedCredential(creds) or api.getCredential()

    # fetch from aggregates
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].ListSlices, credential)

    # combine results
    slices = [entry
              for listing in workers.get_results()
              for entry in listing]

    # cache the result
    if api.cache:
        api.cache.add('slices', slices)

    return slices
241  
def get_rspec(api, creds, options):
    """Fetch the resource RSpecs of all aggregates and merge them.

    ListResources is called on every aggregate with a copy of `options`
    in which 'geni_compressed' is forced to False. The first answer whose
    root element has type "SFA" becomes the base document; the <network>
    and <request> children of later "SFA" answers are appended to it.
    Answers of any other type are ignored.

    When no slice urn is given in `options`, the merged result is read
    from and written to the cache under 'nodes'.

    Args:
        api: manager API object; provides cache, aggregates and
            credentials.
        creds: caller credentials; creds[0] is parsed when `options`
            carries no 'origin_hrn'.
        options: dict; the keys read here are 'geni_slice_urn' and
            'origin_hrn'.

    Returns:
        The merged RSpec serialized by etree.tostring (with XML
        declaration, pretty-printed), or the cached value.

    Raises:
        InvalidRSpec: when an aggregate's answer is not well-formed XML.

    NOTE(review): when no aggregate returns an "SFA" rspec, `rspec` is
    still None at serialization time and etree.tostring raises.
    """
    # get slice's hrn from options
    xrn = options.get('geni_slice_urn', None)
    # The parsed pieces are not used below; the call is kept so that
    # whatever checking urn_to_hrn performs on xrn still happens. The
    # second, identical call that used to follow the cache lookup is gone.
    hrn, hrn_type = urn_to_hrn(xrn)

    # get hrn of the original caller
    # NOTE(review): origin_hrn is computed but not forwarded to anything;
    # kept so that a malformed creds[0] still fails here as before.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # look in cache first
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # get the rspec from the aggregate
        server = api.aggregates[aggregate]
        # Each call gets its own shallow copy so the caller's options are
        # left untouched.
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, credential, my_opts)

    results = threads.get_results()

    # combine the rspecs into a single rspec
    rspec = None
    for agg_rspec in results:
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") in ["SFA"]:
            # Identity test: `== None` is not the right way to ask
            # whether a base document has been chosen yet.
            if rspec is None:
                rspec = root
            else:
                for network in root.iterfind("./network"):
                    rspec.append(deepcopy(network))
                for request in root.iterfind("./request"):
                    rspec.append(deepcopy(request))

    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)

    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
299
def main():
    """Manual test driver: parse an RSpec file and call create_slice.

    The file named by sys.argv[1] is parsed with RSpec, converted with
    toDict() and handed to create_slice for the slice
    'plc.princeton.tmacktestslice' with no api object.

    NOTE(review): create_slice is declared with five parameters
    (api, xrn, creds, rspec, users) but is called here with three, so
    this driver fails with TypeError as written.
    """
    parser = RSpec()
    parser.parseFile(sys.argv[1])
    create_slice(None, 'plc.princeton.tmacktestslice', parser.toDict())


if __name__ == "__main__":
    main()
308