1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspecHelper import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
24 import sfa.plc.peers as peers
# Records GENI API version 1 under the 'geni_api' key of `version`.
# NOTE(review): the enclosing definition and the creation of `version` are
# not visible in this listing -- confirm against the full file.
29 version['geni_api'] = 1
def slice_status(api, slice_xrn, creds):
    """
    Report the status of a slice and of the nodes it is attached to.

    api       -- API object; ``api.plshell`` / ``api.plauth`` are used to
                 query slices and nodes, ``api.logger`` for info logging
    slice_xrn -- slice identifier; converted with urn_to_hrn() and
                 hrn_to_pl_slicename() to obtain the slice name
    creds     -- accepted for interface compatibility; not read here

    Returns a dict with the keys 'geni_urn', 'geni_status', 'pl_login',
    'pl_expires' and 'geni_resources'.  'geni_resources' is a list holding
    one dict per node with the keys 'pl_hostname', 'pl_boot_state',
    'pl_last_contact', 'geni_status' and 'geni_error'.

    Raises Exception when no slice matches the derived slice name.
    """
    # NOTE(review): the reviewed listing had lost its indentation and some
    # structural lines (the empty-result guard, the selection of the first
    # slice, the per-node loop and the final return).  They are restored
    # here from how the surviving statements use their variables -- confirm
    # against the repository copy before merging.
    hrn, _type = urn_to_hrn(slice_xrn)

    # find out where this slice is currently running
    slicename = hrn_to_pl_slicename(hrn)

    slices = api.plshell.GetSlices(
        api.plauth, [slicename],
        ['node_ids', 'person_ids', 'name', 'expires'])
    if not slices:
        # The two values must be passed as one tuple: without the
        # parentheses '%' is applied to slice_xrn alone and raises
        # "TypeError: not enough arguments for format string" instead of
        # the intended message.
        raise Exception(
            "Slice %s not found (used %s as slicename internally)"
            % (slice_xrn, slicename))
    slice_record = slices[0]

    nodes = api.plshell.GetNodes(
        api.plauth, slice_record['node_ids'],
        ['hostname', 'boot_state', 'last_contact'])
    api.logger.info(slice_record)
    api.logger.info(nodes)

    # Per-node status; no GENI-level state is derived, so each node is
    # reported as 'unknown'.
    resources = []
    for node in nodes:
        resources.append({
            'pl_hostname': node['hostname'],
            'pl_boot_state': node['boot_state'],
            'pl_last_contact': node['last_contact'],
            'geni_status': 'unknown',
            'geni_error': '',
        })

    result = {
        'geni_urn': slice_xrn,
        'geni_status': 'unknown',
        'pl_login': slice_record['name'],
        'pl_expires': slice_record['expires'],
        'geni_resources': resources,
    }
    return result
# create_slice: forward a sliver-creation request to the aggregates listed in
# api.aggregates and merge the RSpecs they return.
#   api   -- API object; its auth, aggregates, hrn and credential helpers
#            are used below
#   xrn   -- slice identifier, converted with urn_to_hrn()
#   creds -- caller credentials, checked for the 'createsliver' operation
#   rspec -- RSpec text; sent unchanged to every aggregate
#   users -- passed through to each aggregate's CreateSliver call
# Raises InvalidRSpec from the schema-validation statements.
# NOTE(review): this listing has lost indentation and some lines (see the
# gaps in the embedded line numbers).  The statement that assigns `schema`
# and guards the validation, the `try:` opening, the fallback guard around
# getCredential(), the `continue` implied by the loop check and the final
# return are not visible -- confirm against the full file.
# NOTE(review): `etree` and `sys` are used below but are not among the
# imports visible in this listing.
71 def create_slice(api, xrn, creds, rspec, users):
72 hrn, type = urn_to_hrn(xrn)
74 # Validate the RSpec against PlanetLab's schema --disabled for now
75 # The schema used here needs to aggregate the PL and VINI schemas
76 # schema = "/var/www/html/schemas/pl.rng"
# Parse the RSpec; a syntax error is re-raised as InvalidRSpec carrying the
# parser's message.
80 tree = etree.parse(StringIO(rspec))
81 except etree.XMLSyntaxError:
82 message = str(sys.exc_info()[1])
83 raise InvalidRSpec(message)
# Build a RelaxNG validator from the schema document.
85 relaxng_doc = etree.parse(schema)
86 relaxng = etree.RelaxNG(relaxng_doc)
# Report the validator's last logged error together with its line number.
89 error = relaxng.error_log.last_error
90 message = "%s (line %s)" % (error.message, error.line)
91 raise InvalidRSpec(message)
# checkCredentials() is indexed with [0]; that element is decoded as a
# Credential to obtain the caller's hrn.
94 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
95 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
97 # attempt to use delegated credential first
98 credential = api.getDelegatedCredential(creds)
100 credential = api.getCredential()
101 threads = ThreadManager()
102 for aggregate in api.aggregates:
103 # prevent infinite loop. Don't send request back to caller
104 # unless the caller is the aggregate's SM
105 if caller_hrn == aggregate and aggregate != api.hrn:
108 # Just send entire RSpec to each aggregate
109 server = api.aggregates[aggregate]
110 threads.run(server.CreateSliver, xrn, credential, rspec, users)
# Collect the per-aggregate results and merge them into one RSpec.
112 results = threads.get_results()
113 merged_rspec = merge_rspecs(results)
def renew_slice(api, xrn, creds, expiration_time):
    """
    Ask every known aggregate to renew the slivers of a slice.

    api             -- API object; its auth, aggregates, hrn and credential
                       helpers are used below
    xrn             -- slice identifier, converted with urn_to_hrn()
    creds           -- caller credentials, checked for the 'renewsliver'
                       operation
    expiration_time -- passed through unchanged to each aggregate's
                       RenewSliver call

    Returns 1 (see the review note at the end of the body).
    """
    # NOTE(review): the reviewed listing had lost its indentation and the
    # lines carrying the credential fallback guard, the loop `continue`
    # and the return statement.  They are restored from context here --
    # confirm against the repository copy before merging.
    hrn, _type = urn_to_hrn(xrn)

    # get the caller's hrn
    # The operation name was spelled 'renewesliver'.  The sibling calls use
    # 'createsliver' / 'deletesliver' and the RPC is RenewSliver, so the
    # extra 'e' is treated as a typo.
    # NOTE(review): confirm the rights table lists 'renewsliver'.
    valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Don't send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        server = api.aggregates[aggregate]
        threads.run(server.RenewSliver, xrn, credential, expiration_time)
    threads.get_results()

    # NOTE(review): the original return statement is not visible in the
    # listing; 1 is assumed as the success value -- verify.
    return 1
# get_ticket: request a ticket from each aggregate named in the RSpec and
# combine the answers into one new SfaTicket, returned as a string.
#   api   -- API object; its auth, aggregates, hrn, key, key_file, cert_file
#            and credential helpers are used below
#   xrn   -- slice identifier, converted with urn_to_hrn()
#   creds -- caller credentials, checked for the 'getticket' operation
#   rspec -- RSpec text; parsed to find the './network' elements
#   users -- passed through to each aggregate's GetTicket call
# NOTE(review): this listing has lost indentation and some lines (see the
# gaps in the embedded line numbers).  Branch keywords, `continue`/`break`
# statements, the initialisation of rspecs / initscripts / slivers /
# object_gid and the second entry of the `attributes` dict are not visible
# -- confirm against the full file.
# NOTE(review): `etree` is used below but is not among the imports visible
# in this listing.
139 def get_ticket(api, xrn, creds, rspec, users):
140 slice_hrn, type = urn_to_hrn(xrn)
141 # get the netspecs contained within the clients rspec
142 aggregate_rspecs = {}
143 tree= etree.parse(StringIO(rspec))
144 elements = tree.findall('./network')
145 for element in elements:
# The first value returned by element.values() is used as the aggregate
# hrn; the whole RSpec (not just this element) is stored for that aggregate.
146 aggregate_hrn = element.values()[0]
147 aggregate_rspecs[aggregate_hrn] = rspec
149 # get the caller's hrn
150 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
151 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
153 # attempt to use delegated credential first
154 credential = api.getDelegatedCredential(creds)
156 credential = api.getCredential()
157 threads = ThreadManager()
158 for aggregate, aggregate_rspec in aggregate_rspecs.items():
159 # prevent infinite loop. Don't send request back to caller
160 # unless the caller is the aggregate's SM
161 if caller_hrn == aggregate and aggregate != api.hrn:
# A directly known aggregate is contacted through its entry in
# api.aggregates.
164 if aggregate in api.aggregates:
165 server = api.aggregates[aggregate]
# Otherwise each known aggregate is asked, via get_aggregates(), about the
# target's authority URN.
167 net_urn = hrn_to_urn(aggregate, 'authority')
168 # we may have a peer that knows about this aggregate
169 for agg in api.aggregates:
170 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
171 if not target_aggs or not 'hrn' in target_aggs[0]:
173 # send the request to this address
174 url = target_aggs[0]['url']
175 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
176 # aggregate found, no need to keep looping
180 threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
182 results = threads.get_results()
184 # gather information from each ticket
189 for result in results:
190 agg_ticket = SfaTicket(string=result)
191 attrs = agg_ticket.get_attributes()
# NOTE(review): object_gid is read after this loop (lines 207-208); its
# initial value and any guard around this assignment are not visible here.
# Check what happens when `results` is empty.
193 object_gid = agg_ticket.get_gid_object()
194 rspecs.append(agg_ticket.get_rspec())
195 initscripts.extend(attrs.get('initscripts', []))
196 slivers.extend(attrs.get('slivers', []))
199 attributes = {'initscripts': initscripts,
201 merged_rspec = merge_rspecs(rspecs)
203 # create a new ticket
204 ticket = SfaTicket(subject = slice_hrn)
205 ticket.set_gid_caller(api.auth.client_gid)
206 ticket.set_issuer(key=api.key, subject=api.hrn)
207 ticket.set_gid_object(object_gid)
208 ticket.set_pubkey(object_gid.get_pubkey())
209 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
210 ticket.set_attributes(attributes)
211 ticket.set_rspec(merged_rspec)
# The ticket is serialised with save_parents=True.
214 return ticket.save_to_string(save_parents=True)
# delete_slice: ask the aggregates in api.aggregates to delete the slivers
# of a slice via their DeleteSliver call.
#   api   -- API object; its auth, aggregates, hrn and credential helpers
#            are used below
#   xrn   -- slice identifier, converted with urn_to_hrn()
#   creds -- caller credentials, checked for the 'deletesliver' operation
# NOTE(review): this listing has lost indentation and some lines; the guard
# around the getCredential() fallback, the `continue` implied by the loop
# check and the return statement are not visible -- confirm against the
# full file.
217 def delete_slice(api, xrn, creds):
218 hrn, type = urn_to_hrn(xrn)
220 # get the caller's hrn
221 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
222 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
224 # attempt to use delegated credential first
225 credential = api.getDelegatedCredential(creds)
227 credential = api.getCredential()
228 threads = ThreadManager()
229 for aggregate in api.aggregates:
230 # prevent infinite loop. Don't send request back to caller
231 # unless the caller is the aggregate's SM
232 if caller_hrn == aggregate and aggregate != api.hrn:
234 server = api.aggregates[aggregate]
235 threads.run(server.DeleteSliver, xrn, credential)
# The results are fetched but not used in the visible lines.
236 threads.get_results()
# start_slice: ask the aggregates in api.aggregates to start a slice via
# their Start call.
#   api   -- API object; its auth, aggregates, hrn and credential helpers
#            are used below
#   xrn   -- slice identifier, converted with urn_to_hrn()
#   creds -- caller credentials, checked for the 'startslice' operation
# NOTE(review): this listing has lost indentation and some lines; the guard
# around the getCredential() fallback, the `continue` implied by the loop
# check and the return statement are not visible -- confirm against the
# full file.
239 def start_slice(api, xrn, creds):
240 hrn, type = urn_to_hrn(xrn)
242 # get the caller's hrn
243 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
244 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
246 # attempt to use delegated credential first
247 credential = api.getDelegatedCredential(creds)
249 credential = api.getCredential()
250 threads = ThreadManager()
251 for aggregate in api.aggregates:
252 # prevent infinite loop. Don't send request back to caller
253 # unless the caller is the aggregate's SM
254 if caller_hrn == aggregate and aggregate != api.hrn:
256 server = api.aggregates[aggregate]
257 threads.run(server.Start, xrn, credential)
# The results are fetched but not used in the visible lines.
258 threads.get_results()
# stop_slice: ask the aggregates in api.aggregates to stop a slice via
# their Stop call.
#   api   -- API object; its auth, aggregates, hrn and credential helpers
#            are used below
#   xrn   -- slice identifier, converted with urn_to_hrn()
#   creds -- caller credentials, checked for the 'stopslice' operation
# NOTE(review): this listing has lost indentation and some lines; the guard
# around the getCredential() fallback, the `continue` implied by the loop
# check and the return statement are not visible -- confirm against the
# full file.
261 def stop_slice(api, xrn, creds):
262 hrn, type = urn_to_hrn(xrn)
264 # get the caller's hrn
265 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
266 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
268 # attempt to use delegated credential first
269 credential = api.getDelegatedCredential(creds)
271 credential = api.getCredential()
272 threads = ThreadManager()
273 for aggregate in api.aggregates:
274 # prevent infinite loop. Don't send request back to caller
275 # unless the caller is the aggregate's SM
276 if caller_hrn == aggregate and aggregate != api.hrn:
278 server = api.aggregates[aggregate]
279 threads.run(server.Stop, xrn, credential)
# The results are fetched but not used in the visible lines.
280 threads.get_results()
# reset_slice(api, xrn): unlike the neighbouring slice operations, this
# entry point takes no `creds` parameter.
# NOTE(review): only the signature is visible in this listing; the body is
# missing, so its behaviour cannot be documented from here.
283 def reset_slice(api, xrn):
# shutdown(api, xrn, creds): takes the same (api, xrn, creds) parameters as
# delete_slice / start_slice / stop_slice.
# NOTE(review): only the signature is visible in this listing; the body is
# missing, so its behaviour cannot be documented from here.
289 def shutdown(api, xrn, creds):
# status(api, xrn, creds): takes the same (api, xrn, creds) parameters as
# delete_slice / start_slice / stop_slice.
# NOTE(review): only the signature is visible in this listing; the body is
# missing, so its behaviour (and its relation to slice_status) cannot be
# documented from here.
295 def status(api, xrn, creds):
# get_slices: gather the slice lists reported by the aggregates in
# api.aggregates (ListSlices) into one list, using api.cache under the key
# 'slices'.
#   api   -- API object; its cache, auth, aggregates, hrn and credential
#            helpers are used below
#   creds -- caller credentials, checked for the 'listslices' operation
#            (with None as the target hrn)
# NOTE(review): this listing has lost indentation and some lines; the
# guards around the cache read/write, the early return on a cache hit, the
# initialisation of `slices` before the extend loop, the loop `continue`
# and the final return are not visible -- confirm against the full file.
301 def get_slices(api, creds):
303 # look in cache first
305 slices = api.cache.get('slices')
309 # get the caller's hrn
310 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
311 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
313 # attempt to use delegated credential first
314 credential = api.getDelegatedCredential(creds)
316 credential = api.getCredential()
317 threads = ThreadManager()
318 # fetch from aggregates
319 for aggregate in api.aggregates:
320 # prevent infinite loop. Don't send request back to caller
321 # unless the caller is the aggregate's SM
322 if caller_hrn == aggregate and aggregate != api.hrn:
324 server = api.aggregates[aggregate]
325 threads.run(server.ListSlices, credential)
# Each aggregate's result is appended to the combined list as-is.
328 results = threads.get_results()
330 for result in results:
331 slices.extend(result)
# Store the combined list in the cache under the same key read above.
335 api.cache.add('slices', slices)
# get_rspec: collect the resource RSpecs of the aggregates in api.aggregates
# (ListResources) and combine their <network> and <request> elements into a
# single serialised RSpec.
#   api     -- API object; its cache, auth, aggregates, hrn and credential
#              helpers are used below
#   creds   -- caller credentials, checked for the 'listnodes' operation;
#              creds[0] is also decoded to find the original caller
#   options -- dict; the keys 'geni_slice_urn' and 'origin_hrn' are read,
#              and a copy with 'geni_compressed' set to False is sent to
#              each aggregate
# Raises InvalidRSpec when an aggregate's answer is not parseable XML.
# The cache key 'nodes' is used only when no slice urn was supplied.
# NOTE(review): this listing has lost indentation and some lines; the guard
# around the origin_hrn fallback, the early return on a cache hit, the
# `try:` opening, the creation of the combined `rspec` element, the loop
# `continue` and the final return are not visible -- confirm against the
# full file.
339 def get_rspec(api, creds, options):
341 # get slice's hrn from options
342 xrn = options.get('geni_slice_urn', None)
# NOTE(review): xrn is None when 'geni_slice_urn' is absent, and in the
# visible lines urn_to_hrn() is still called with it here -- verify that
# urn_to_hrn() accepts None.  The same call appears again at line 356.
343 hrn, type = urn_to_hrn(xrn)
345 # get hrn of the original caller
346 origin_hrn = options.get('origin_hrn', None)
348 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
350 # look in cache first
351 if api.cache and not xrn:
352 rspec = api.cache.get('nodes')
356 hrn, type = urn_to_hrn(xrn)
359 # get the caller's hrn
360 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
361 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
363 # attempt to use delegated credential first
364 credential = api.getDelegatedCredential(creds)
366 credential = api.getCredential()
367 threads = ThreadManager()
368 for aggregate in api.aggregates:
369 # prevent infinite loop. Don't send request back to caller
370 # unless the caller is the aggregate's SM
371 if caller_hrn == aggregate and aggregate != api.hrn:
373 # get the rspec from the aggregate
374 server = api.aggregates[aggregate]
# NOTE(review): `copy` is called as a function, but only
# `from copy import deepcopy` is among the imports visible in this listing
# -- confirm that `copy` is imported.
375 my_opts = copy(options)
376 my_opts['geni_compressed'] = False
377 threads.run(server.ListResources, credential, my_opts)
378 #threads.run(server.get_resources, cred, xrn, origin_hrn)
380 results = threads.get_results()
381 # combine the rspecs into a single rspec
382 for agg_rspec in results:
384 tree = etree.parse(StringIO(agg_rspec))
385 except etree.XMLSyntaxError:
# The offending aggregate answer is included in the error message.
386 message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
387 raise InvalidRSpec(message)
389 root = tree.getroot()
# The root's "type" attribute is tested against "SFA"; the statement this
# test guards is not visible in the listing.
390 if root.get("type") in ["SFA"]:
# Deep copies are appended; the parsed tree is not modified by these lines.
394 for network in root.iterfind("./network"):
395 rspec.append(deepcopy(network))
396 for request in root.iterfind("./request"):
397 rspec.append(deepcopy(request))
# From here on `rspec` is the serialised text, not the element.
399 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
401 if api.cache and not xrn:
402 api.cache.add('nodes', rspec)
# Script tail: parses the file named by the first command-line argument and
# calls create_slice() with a hard-coded slice name.
# NOTE(review): the enclosing definition, the creation of `r` and `rspec`,
# and the body of the __main__ guard are not visible in this listing.
408 r.parseFile(sys.argv[1])
# NOTE(review): create_slice is defined above as
# create_slice(api, xrn, creds, rspec, users); this call passes only three
# positional arguments, so `rspec` lands in the `creds` slot and the call
# raises TypeError for the missing `rspec` and `users` arguments.
410 create_slice(None,'plc.princeton.tmacktestslice',rspec)
412 if __name__ == "__main__":