4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.rspec import *
14 from sfa.util.specdict import *
15 from sfa.util.faults import *
16 from sfa.util.record import SfaRecord
17 from sfa.util.policy import Policy
18 from sfa.util.prefixTree import prefixTree
19 from sfa.util.sfaticket import *
20 from sfa.trust.credential import Credential
21 from sfa.util.threadmanager import ThreadManager
22 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
23 import sfa.plc.peers as peers
24 from sfa.util.version import version_core
# Fragment of the version-reporting routine; its `def` line is not part of
# this listing.
# Build a {peername: host} mapping from api.aggregates, leaving out the entry
# whose name equals api.hrn. The host is read through the name-mangled
# private attribute `_ServerProxy__host` of each server proxy.
# NOTE(review): this local `peers` has the same name as the alias bound by
# `import sfa.plc.peers as peers` at the top of the file -- confirm that the
# shadowing is intended.
27 peers =dict ([ (peername,v._ServerProxy__host) for (peername,v) in api.aggregates.items()
28 if peername != api.hrn])
# The description dict is handed to version_core and its result is returned.
# `xrn` is bound on a line that is not visible here.
30 return version_core({'interface':'slicemgr',
31 'hrn' : xrn.get_hrn(),
32 'urn' : xrn.get_urn(),
# Report the status of one slice together with the nodes attached to it.
#   api       -- gives access to logger, plshell and plauth
#   slice_xrn -- slice identifier; converted to an hrn with urn_to_hrn and
#                echoed back as result['geni_urn']
#   creds     -- not referenced on any line visible in this listing
# The result dict is filled with 'geni_urn', 'geni_status', 'pl_login',
# 'pl_expires' and 'geni_resources'. Every 'geni_status' written here is the
# constant 'unknown'.
36 def slice_status(api, slice_xrn, creds ):
# NOTE: `type` shadows the builtin of the same name inside this function and
# is not read on any visible line.
37 hrn, type = urn_to_hrn(slice_xrn)
38 # find out where this slice is currently running
# hrn_to_pl_slicename is not among the explicitly imported names visible at
# the top of the file; presumably it arrives through a star import -- TODO
# confirm.
40 slicename = hrn_to_pl_slicename(hrn)
41 api.logger.info("Checking status for %s" % slicename)
# Only these four fields are requested for the slice record.
42 slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids','person_ids','name','expires'])
# The condition guarding this raise is not visible; presumably it fires when
# `slices` came back empty -- TODO confirm.
44 raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
# `slice` is bound on a line not shown here (presumably the first entry of
# `slices`). Fetch hostname, boot state and last contact for its node ids.
47 nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'],
48 ['hostname', 'boot_state', 'last_contact'])
49 api.logger.info(slice)
50 api.logger.info(nodes)
# Slice-level fields of the reply.
53 result['geni_urn'] = slice_xrn
54 result['geni_status'] = 'unknown'
55 result['pl_login'] = slice['name']
56 result['pl_expires'] = slice['expires']
# Per-node entry `res`, copied field by field from the node record.
62 res['pl_hostname'] = node['hostname']
63 res['pl_boot_state'] = node['boot_state']
64 res['pl_last_contact'] = node['last_contact']
66 res['geni_status'] = 'unknown'
67 res['geni_error'] = ''
# `resources` is assembled on lines that are not visible here.
71 result['geni_resources'] = resources
# Forward a sliver-creation request to the aggregates and merge their replies.
#   api   -- gives access to auth, aggregates, hrn and the credential helpers
#   xrn   -- slice identifier; its hrn is passed to the credential check
#   creds -- caller credentials, given to api.auth.checkCredentials with the
#            operation name 'createsliver'
#   rspec -- RSpec text; the same text is sent to every aggregate
#   users -- passed through unchanged to each aggregate's CreateSliver
# InvalidRSpec is raised on an XML syntax error and on a RelaxNG validation
# error. The replies are combined with merge_rspecs; the return statement is
# not visible in this listing.
74 def create_slice(api, xrn, creds, rspec, users):
# NOTE: `type` shadows the builtin and is not read on any visible line.
75 hrn, type = urn_to_hrn(xrn)
77 # Validate the RSpec against PlanetLab's schema --disabled for now
78 # The schema used here needs to aggregate the PL and VINI schemas
79 # schema = "/var/www/html/schemas/pl.rng"
# `etree` and `sys` are not among the imports visible at the top of the file;
# presumably they are imported on lines not shown -- TODO confirm.
83 tree = etree.parse(StringIO(rspec))
84 except etree.XMLSyntaxError:
85 message = str(sys.exc_info()[1])
86 raise InvalidRSpec(message)
# `schema` is bound on a line not shown here.
88 relaxng_doc = etree.parse(schema)
89 relaxng = etree.RelaxNG(relaxng_doc)
# Report the last entry of the validator's error log, with its line number.
92 error = relaxng.error_log.last_error
93 message = "%s (line %s)" % (error.message, error.line)
94 raise InvalidRSpec(message)
# The first element returned by checkCredentials is treated as the valid
# credential; the hrn of its caller GID identifies who sent the request.
97 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
98 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
100 # attempt to use delegated credential first
101 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
103 credential = api.getCredential()
104 threads = ThreadManager()
105 for aggregate in api.aggregates:
106 # prevent infinite loop. Dont send request back to caller
107 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
108 if caller_hrn == aggregate and aggregate != api.hrn:
111 # Just send entire RSpec to each aggregate
112 server = api.aggregates[aggregate]
113 threads.run(server.CreateSliver, xrn, credential, rspec, users)
# Collect every aggregate's reply and merge them into one rspec.
115 results = threads.get_results()
116 merged_rspec = merge_rspecs(results)
# Forward a RenewSliver call with the new expiration time to the aggregates.
#   api             -- gives access to auth, aggregates, hrn and credentials
#   xrn             -- slice identifier; its hrn goes to the credential check
#   creds           -- caller credentials
#   expiration_time -- passed through unchanged to each RenewSliver call
# The value returned to the caller is not visible in this listing.
119 def renew_slice(api, xrn, creds, expiration_time):
# NOTE: `type` shadows the builtin and is not read on any visible line.
120 hrn, type = urn_to_hrn(xrn)
122 # get the callers hrn
# NOTE(review): the operation name 'renewesliver' looks like a typo for
# 'renewsliver' (the call below is RenewSliver, and the sibling functions use
# names such as 'createsliver' / 'deletesliver') -- confirm against the
# rights table before changing it.
123 valid_cred = api.auth.checkCredentials(creds, 'renewesliver', hrn)[0]
124 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
126 # attempt to use delegated credential first
127 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
129 credential = api.getCredential()
130 threads = ThreadManager()
131 for aggregate in api.aggregates:
132 # prevent infinite loop. Dont send request back to caller
133 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
134 if caller_hrn == aggregate and aggregate != api.hrn:
137 server = api.aggregates[aggregate]
138 threads.run(server.RenewSliver, xrn, credential, expiration_time)
# The collected results are not bound to a name on this line.
139 threads.get_results()
# Request a ticket from each aggregate named in the rspec, then fold the
# replies into one new SfaTicket and return it serialized as a string.
#   api   -- gives access to auth, aggregates, hrn, key, key_file, cert_file
#   xrn   -- slice identifier; its hrn becomes the subject of the new ticket
#   creds -- caller credentials, checked with the operation name 'getticket'
#   rspec -- RSpec text; parsed only to discover which networks it names
#   users -- passed through unchanged to each GetTicket call
142 def get_ticket(api, xrn, creds, rspec, users):
# NOTE: `type` shadows the builtin and is not read on any visible line.
143 slice_hrn, type = urn_to_hrn(xrn)
144 # get the netspecs contained within the clients rspec
145 aggregate_rspecs = {}
146 tree= etree.parse(StringIO(rspec))
147 elements = tree.findall('./network')
148 for element in elements:
# The first attribute value of each <network> element is used as the
# aggregate hrn, so this relies on attribute order. The value stored is the
# complete rspec text, not a per-network subset.
149 aggregate_hrn = element.values()[0]
150 aggregate_rspecs[aggregate_hrn] = rspec
152 # get the callers hrn
153 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
154 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
156 # attempt to use delegated credential first
157 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
159 credential = api.getCredential()
160 threads = ThreadManager()
161 for aggregate, aggregate_rspec in aggregate_rspecs.items():
162 # prevent infinite loop. Dont send request back to caller
163 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
164 if caller_hrn == aggregate and aggregate != api.hrn:
# Directly configured aggregate: use its server proxy as is.
167 if aggregate in api.aggregates:
168 server = api.aggregates[aggregate]
# Presumably the alternative branch (its `else:` is not visible): ask each
# known aggregate whether it knows the target authority.
170 net_urn = hrn_to_urn(aggregate, 'authority')
171 # we may have a peer that knows about this aggregate
172 for agg in api.aggregates:
173 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
# An empty answer, or a first entry without an 'hrn' key, is not usable; the
# statement taken in that case is not visible.
174 if not target_aggs or not 'hrn' in target_aggs[0]:
176 # send the request to this address
177 url = target_aggs[0]['url']
178 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
179 # aggregate found, no need to keep looping
183 threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
185 results = threads.get_results()
187 # gather information from each ticket
# `rspecs`, `initscripts` and `slivers` are created on lines not shown here.
192 for result in results:
193 agg_ticket = SfaTicket(string=result)
194 attrs = agg_ticket.get_attributes()
# `object_gid` is rebound for every ticket, so the value used further down
# comes from the last result. NOTE(review): unless it is also initialised on
# a line not shown, it would be unbound when `results` is empty -- confirm.
196 object_gid = agg_ticket.get_gid_object()
197 rspecs.append(agg_ticket.get_rspec())
198 initscripts.extend(attrs.get('initscripts', []))
199 slivers.extend(attrs.get('slivers', []))
# Attribute dict for the new ticket; the rest of the literal is on a line
# not shown here.
202 attributes = {'initscripts': initscripts,
204 merged_rspec = merge_rspecs(rspecs)
206 # create a new ticket
207 ticket = SfaTicket(subject = slice_hrn)
208 ticket.set_gid_caller(api.auth.client_gid)
209 ticket.set_issuer(key=api.key, subject=api.hrn)
210 ticket.set_gid_object(object_gid)
211 ticket.set_pubkey(object_gid.get_pubkey())
212 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
213 ticket.set_attributes(attributes)
214 ticket.set_rspec(merged_rspec)
# The ticket is returned in string form, saved with save_parents=True.
217 return ticket.save_to_string(save_parents=True)
# Forward a DeleteSliver call for the slice to the aggregates.
#   api   -- gives access to auth, aggregates, hrn and the credential helpers
#   xrn   -- slice identifier; its hrn goes to the credential check
#   creds -- caller credentials, checked with the operation name 'deletesliver'
# The value returned to the caller is not visible in this listing.
220 def delete_slice(api, xrn, creds):
# NOTE: `type` shadows the builtin and is not read on any visible line.
221 hrn, type = urn_to_hrn(xrn)
223 # get the callers hrn
224 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
225 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
227 # attempt to use delegated credential first
228 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
230 credential = api.getCredential()
231 threads = ThreadManager()
232 for aggregate in api.aggregates:
233 # prevent infinite loop. Dont send request back to caller
234 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
235 if caller_hrn == aggregate and aggregate != api.hrn:
237 server = api.aggregates[aggregate]
238 threads.run(server.DeleteSliver, xrn, credential)
# The collected results are not bound to a name on this line.
239 threads.get_results()
# Forward a Start call for the slice to the aggregates.
#   api   -- gives access to auth, aggregates, hrn and the credential helpers
#   xrn   -- slice identifier; its hrn goes to the credential check
#   creds -- caller credentials, checked with the operation name 'startslice'
# The value returned to the caller is not visible in this listing.
242 def start_slice(api, xrn, creds):
# NOTE: `type` shadows the builtin and is not read on any visible line.
243 hrn, type = urn_to_hrn(xrn)
245 # get the callers hrn
246 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
247 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
249 # attempt to use delegated credential first
250 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
252 credential = api.getCredential()
253 threads = ThreadManager()
254 for aggregate in api.aggregates:
255 # prevent infinite loop. Dont send request back to caller
256 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
257 if caller_hrn == aggregate and aggregate != api.hrn:
259 server = api.aggregates[aggregate]
260 threads.run(server.Start, xrn, credential)
# The collected results are not bound to a name on this line.
261 threads.get_results()
# Forward a Stop call for the slice to the aggregates.
#   api   -- gives access to auth, aggregates, hrn and the credential helpers
#   xrn   -- slice identifier; its hrn goes to the credential check
#   creds -- caller credentials, checked with the operation name 'stopslice'
# The value returned to the caller is not visible in this listing.
264 def stop_slice(api, xrn, creds):
# NOTE: `type` shadows the builtin and is not read on any visible line.
265 hrn, type = urn_to_hrn(xrn)
267 # get the callers hrn
268 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
269 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
271 # attempt to use delegated credential first
272 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
274 credential = api.getCredential()
275 threads = ThreadManager()
276 for aggregate in api.aggregates:
277 # prevent infinite loop. Dont send request back to caller
278 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
279 if caller_hrn == aggregate and aggregate != api.hrn:
281 server = api.aggregates[aggregate]
282 threads.run(server.Stop, xrn, credential)
# The collected results are not bound to a name on this line.
283 threads.get_results()
# Slice reset entry point. Unlike the other slice operations in this file it
# takes no `creds` argument. Its body is not visible in this listing.
286 def reset_slice(api, xrn):
# Shutdown entry point; takes the same (api, xrn, creds) arguments as
# delete_slice, start_slice and stop_slice. Its body is not visible in this
# listing.
292 def shutdown(api, xrn, creds):
# Status entry point; takes the same (api, xrn, creds) arguments as
# delete_slice, start_slice and stop_slice. Its body is not visible in this
# listing.
298 def status(api, xrn, creds):
# Gather the slice lists of the aggregates into one list.
#   api   -- gives access to cache, auth, aggregates, hrn and credentials
#   creds -- caller credentials, checked with the operation name 'listslices'
#            and no target hrn (None)
# The cache key 'slices' is read first and written with the combined list at
# the end. The conditions around both cache accesses and the return statement
# are not visible in this listing.
304 def get_slices(api, creds):
306 # look in cache first
308 slices = api.cache.get('slices')
312 # get the callers hrn
313 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
314 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
316 # attempt to use delegated credential first
317 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
319 credential = api.getCredential()
320 threads = ThreadManager()
321 # fetch from aggregates
322 for aggregate in api.aggregates:
323 # prevent infinite loop. Dont send request back to caller
324 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
325 if caller_hrn == aggregate and aggregate != api.hrn:
327 server = api.aggregates[aggregate]
328 threads.run(server.ListSlices, credential)
331 results = threads.get_results()
# Each aggregate's reply is appended element-wise to `slices`, which
# presumably is re-initialised as a list on a line not shown -- TODO confirm.
333 for result in results:
334 slices.extend(result)
338 api.cache.add('slices', slices)
# Collect the resource listings of the aggregates and combine them into a
# single rspec, serialized to a string with etree.tostring.
#   api     -- gives access to cache, auth, aggregates, hrn and credentials
#   creds   -- caller credentials, checked with the operation name 'listnodes'
#   options -- dict; the keys read here are 'geni_slice_urn' (default '') and
#              'origin_hrn' (default None)
# The cache key 'nodes' is consulted and updated only when no slice urn was
# given. InvalidRSpec is raised when an aggregate's reply is not parseable
# XML. The return statements are not visible in this listing.
342 def get_rspec(api, creds, options):
344 # get slice's hrn from options
345 xrn = options.get('geni_slice_urn', '')
# NOTE: `type` shadows the builtin and is not read on any visible line.
346 hrn, type = urn_to_hrn(xrn)
348 # get hrn of the original caller
349 origin_hrn = options.get('origin_hrn', None)
# Presumably a fallback for when the option is absent (the guard is not
# visible): take the caller hrn from the first credential. On the visible
# lines `origin_hrn` is afterwards referenced only in a commented-out call.
351 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
353 # look in cache first
354 if api.cache and not xrn:
355 rspec = api.cache.get('nodes')
# Same conversion as the one performed right after reading the option above.
359 hrn, type = urn_to_hrn(xrn)
362 # get the callers hrn
363 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
364 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
366 # attempt to use delegated credential first
367 credential = api.getDelegatedCredential(creds)
# Presumably a fallback for when no delegated credential was found; the
# guarding condition is not visible.
369 credential = api.getCredential()
370 threads = ThreadManager()
371 for aggregate in api.aggregates:
372 # prevent infinite loop. Dont send request back to caller
373 # unless the caller is the aggregate's SM
# The statement executed when this test is true is not visible; per the
# comment above it presumably skips the aggregate.
374 if caller_hrn == aggregate and aggregate != api.hrn:
376 # get the rspec from the aggregate
377 server = api.aggregates[aggregate]
# Work on a copy of the options, with 'geni_compressed' forced to False for
# the aggregate call.
# NOTE(review): `copy` is not among the imports visible at the top of the
# file (only deepcopy is) -- confirm it is imported on a line not shown.
378 my_opts = copy(options)
379 my_opts['geni_compressed'] = False
380 threads.run(server.ListResources, credential, my_opts)
381 #threads.run(server.get_resources, cred, xrn, origin_hrn)
383 results = threads.get_results()
384 # combine the rspecs into a single rspec
385 for agg_rspec in results:
387 tree = etree.parse(StringIO(agg_rspec))
388 except etree.XMLSyntaxError:
# The offending reply is included in the error message.
389 message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
390 raise InvalidRSpec(message)
392 root = tree.getroot()
# Only documents whose root has type "SFA" pass this test; the lines that
# follow it directly are not visible.
393 if root.get("type") in ["SFA"]:
# Deep copies of the <network> and <request> children are appended to
# `rspec`, which at this point must be an element (it is passed to
# etree.tostring below); where it is set up is not visible.
397 for network in root.iterfind("./network"):
398 rspec.append(deepcopy(network))
399 for request in root.iterfind("./request"):
400 rspec.append(deepcopy(request))
402 sfa_logger().debug('get_rspec: rspec=%r'%rspec)
403 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
# Cache the serialized result only for the listing without a slice urn.
405 if api.cache and not xrn:
406 api.cache.add('nodes', rspec)
# Command-line driver fragment: parses the file named by the first argument.
# The enclosing `def` and the lines binding `r` and `rspec` are not visible
# in this listing.
412 r.parseFile(sys.argv[1])
# NOTE(review): create_slice is declared above as
# create_slice(api, xrn, creds, rspec, users); this call passes only three
# positional arguments, so as written it would raise a TypeError -- confirm
# and update the call.
414 create_slice(None,'plc.princeton.tmacktestslice',rspec)
# Script entry guard; the statement it runs is not visible in this listing.
416 if __name__ == "__main__":