4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import urn_to_hrn, hrn_to_urn
13 from sfa.util.rspec import *
14 from sfa.util.specdict import *
15 from sfa.util.faults import *
16 from sfa.util.record import SfaRecord
17 from sfa.util.policy import Policy
18 from sfa.util.prefixTree import prefixTree
19 from sfa.util.sfaticket import *
20 from sfa.trust.credential import Credential
21 from sfa.util.threadmanager import ThreadManager
22 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
23 import sfa.plc.peers as peers
24 from sfa.util.version import version_core
# NOTE(review): the `def` line that owns these statements and the remainder of
# the dict passed to version_core() are not visible in this view.
# Map every aggregate name in api.aggregates, except the one equal to api.hrn,
# to the `_ServerProxy__host` attribute of its server object (the name-mangled
# private `__host` of a ServerProxy class -- presumably xmlrpclib's; confirm).
# NOTE(review): the name `peers` is also bound at module level by
# `import sfa.plc.peers as peers`; this assignment reuses that name.
27 peers =dict ([ (peername,v._ServerProxy__host) for (peername,v) in api.aggregates.items()
28 if peername != api.hrn])
# Hand a dict whose 'interface' entry is 'slicemgr' to version_core() and
# return the result.
29 return version_core({'interface':'slicemgr',
# Build a status report for the slice identified by slice_xrn.
#
# The URN is converted to an hrn and then to a slice name, the slice is looked
# up with api.plshell.GetSlices, its nodes are fetched with
# api.plshell.GetNodes, and the report is assembled in `result` under the keys
# geni_urn, geni_status, pl_login, pl_expires and geni_resources.
#
# NOTE(review): several short lines of this function are not visible in this
# view (the condition guarding the raise, the loop over nodes, and the places
# where `slice`, `result`, `res` and `resources` are bound, plus the return),
# so only the visible statements are described here.
# NOTE(review): `creds` is not used on any visible line.
# NOTE(review): hrn_to_pl_slicename is not imported by name in the visible
# imports; it may arrive through one of the star imports -- confirm.
34 def slice_status(api, slice_xrn, creds ):
35 hrn, type = urn_to_hrn(slice_xrn)
36 # find out where this slice is currently running
38 slicename = hrn_to_pl_slicename(hrn)
39 api.logger.info("Checking status for %s" % slicename)
# Look the slice up by name, requesting the node_ids, person_ids, name and
# expires fields.
40 slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids','person_ids','name','expires'])
# The error message reports both the XRN supplied by the caller and the
# internal slice name derived from it.
42 raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
# Fetch hostname, boot state and last-contact value for the slice's nodes.
45 nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'],
46 ['hostname', 'boot_state', 'last_contact'])
47 api.logger.info(slice)
48 api.logger.info(nodes)
# Slice-level entries; geni_status is always the literal 'unknown' here.
51 result['geni_urn'] = slice_xrn
52 result['geni_status'] = 'unknown'
53 result['pl_login'] = slice['name']
54 result['pl_expires'] = slice['expires']
# Per-node entries copied from the GetNodes rows; status and error are filled
# with fixed placeholder values.
60 res['pl_hostname'] = node['hostname']
61 res['pl_boot_state'] = node['boot_state']
62 res['pl_last_contact'] = node['last_contact']
64 res['geni_status'] = 'unknown'
65 res['geni_error'] = ''
69 result['geni_resources'] = resources
# Forward a CreateSliver request for `xrn` to the aggregates in api.aggregates
# and merge the RSpecs they return.
#
# Visible steps: optional RelaxNG validation of `rspec`, a credential check for
# the 'createsliver' operation on the slice hrn, selection of the credential to
# forward, one threads.run(server.CreateSliver, ...) per aggregate, and a
# merge_rspecs() over the collected results.
#
# NOTE(review): short lines are missing from this view (the guard around the
# schema validation, `try:`, the test between the two `credential = ...`
# assignments, the body of the loop-prevention `if`, and the return), so the
# exact control flow must be confirmed against the full file.
72 def create_slice(api, xrn, creds, rspec, users):
73 hrn, type = urn_to_hrn(xrn)
75 # Validate the RSpec against PlanetLab's schema --disabled for now
76 # The schema used here needs to aggregate the PL and VINI schemas
77 # schema = "/var/www/html/schemas/pl.rng"
# NOTE(review): the only visible assignment to `schema` is the commented-out
# one above, yet `schema` is read further down; whatever binds or guards it is
# not visible here.
# An RSpec that is not well-formed XML is reported as InvalidRSpec carrying the
# parser's message.
81 tree = etree.parse(StringIO(rspec))
82 except etree.XMLSyntaxError:
83 message = str(sys.exc_info()[1])
84 raise InvalidRSpec(message)
86 relaxng_doc = etree.parse(schema)
87 relaxng = etree.RelaxNG(relaxng_doc)
# Report the last validation error (message and line) as InvalidRSpec.
90 error = relaxng.error_log.last_error
91 message = "%s (line %s)" % (error.message, error.line)
92 raise InvalidRSpec(message)
# checkCredentials() is indexed with [0]; the caller's hrn is read from the
# caller GID of that first credential.
95 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
96 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
98 # attempt to use delegated credential first
99 credential = api.getDelegatedCredential(creds)
# Presumably only reached when no delegated credential was found -- the guard
# line is not visible; confirm.
101 credential = api.getCredential()
102 threads = ThreadManager()
103 for aggregate in api.aggregates:
104 # prevent infinite loop. Dont send request back to caller
105 # unless the caller is the aggregate's SM
106 if caller_hrn == aggregate and aggregate != api.hrn:
109 # Just send entire RSpec to each aggregate
110 server = api.aggregates[aggregate]
111 threads.run(server.CreateSliver, xrn, credential, rspec, users)
# Collect every aggregate's answer and merge them into a single RSpec.
113 results = threads.get_results()
114 merged_rspec = merge_rspecs(results)
def renew_slice(api, xrn, creds, expiration_time):
    """Forward a RenewSliver request for ``xrn`` to every known aggregate.

    The caller's credentials are checked for the ``'renewsliver'`` operation
    on the slice hrn, then ``RenewSliver`` is scheduled on each aggregate in
    ``api.aggregates`` through a ``ThreadManager``.

    :param api: slice-manager API object providing ``auth``, ``aggregates``,
        ``hrn``, ``getDelegatedCredential`` and ``getCredential``.
    :param xrn: URN of the slice whose slivers are renewed.
    :param creds: credentials supplied by the caller.
    :param expiration_time: new expiration value, passed through unchanged.
    :returns: 1 (see the review note at the return statement).
    :raises: whatever ``api.auth.checkCredentials`` raises when the
        credentials do not permit the operation.
    """
    hrn, _ = urn_to_hrn(xrn)

    # get the caller's hrn
    # The operation name was previously misspelled 'renewesliver'.
    valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Don't send the request back to the caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        server = api.aggregates[aggregate]
        threads.run(server.RenewSliver, xrn, credential, expiration_time)

    # Wait for all aggregates; their individual results are not inspected.
    threads.get_results()
    # NOTE(review): the original return statement was not visible when this
    # block was restored; `return 1` is assumed -- confirm.
    return 1
# Obtain a ticket from each aggregate named in the client's RSpec and combine
# them into one new SfaTicket, returned serialized as a string.
#
# Visible steps: collect aggregate hrns from the RSpec's ./network elements,
# check the caller's credentials for 'getticket', pick a server for each
# aggregate (a directly known one, or one located through a peer), schedule
# GetTicket on each, then merge the returned tickets' rspecs, initscripts and
# slivers into a ticket issued by this manager.
#
# NOTE(review): short lines are missing from this view (branch keywords,
# `continue`/`break`, the initialisation of `rspecs`, `initscripts`, `slivers`
# and possibly `object_gid`, and the end of the `attributes` dict), so the
# control flow described here must be confirmed against the full file.
140 def get_ticket(api, xrn, creds, rspec, users):
141 slice_hrn, type = urn_to_hrn(xrn)
142 # get the netspecs contained within the clients rspec
143 aggregate_rspecs = {}
144 tree= etree.parse(StringIO(rspec))
145 elements = tree.findall('./network')
# The first attribute value of each <network> element is used as the aggregate
# hrn; the value stored for it is the complete, unsplit rspec.
146 for element in elements:
147 aggregate_hrn = element.values()[0]
148 aggregate_rspecs[aggregate_hrn] = rspec
150 # get the callers hrn
151 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
152 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
154 # attempt to use delegated credential first
155 credential = api.getDelegatedCredential(creds)
# Presumably only reached when no delegated credential was found -- the guard
# line is not visible; confirm.
157 credential = api.getCredential()
158 threads = ThreadManager()
159 for aggregate, aggregate_rspec in aggregate_rspecs.items():
160 # prevent infinite loop. Dont send request back to caller
161 # unless the caller is the aggregate's SM
162 if caller_hrn == aggregate and aggregate != api.hrn:
# An aggregate this manager knows directly: use its existing server object.
165 if aggregate in api.aggregates:
166 server = api.aggregates[aggregate]
# Presumably the alternative branch (its keyword line is not visible): ask the
# known aggregates whether any of them can locate this authority.
168 net_urn = hrn_to_urn(aggregate, 'authority')
169 # we may have a peer that knows about this aggregate
170 for agg in api.aggregates:
171 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
172 if not target_aggs or not 'hrn' in target_aggs[0]:
174 # send the request to this address
175 url = target_aggs[0]['url']
176 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
177 # aggregate found, no need to keep looping
181 threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
183 results = threads.get_results()
185 # gather information from each ticket
190 for result in results:
191 agg_ticket = SfaTicket(string=result)
192 attrs = agg_ticket.get_attributes()
# NOTE(review): object_gid is reassigned for every ticket, so the value used
# below appears to come from whichever ticket is processed last.
194 object_gid = agg_ticket.get_gid_object()
195 rspecs.append(agg_ticket.get_rspec())
196 initscripts.extend(attrs.get('initscripts', []))
197 slivers.extend(attrs.get('slivers', []))
200 attributes = {'initscripts': initscripts,
202 merged_rspec = merge_rspecs(rspecs)
204 # create a new ticket
# The new ticket is issued by this manager (api.key / api.hrn) for slice_hrn,
# with the client's GID as caller and the aggregate ticket's object GID.
205 ticket = SfaTicket(subject = slice_hrn)
206 ticket.set_gid_caller(api.auth.client_gid)
207 ticket.set_issuer(key=api.key, subject=api.hrn)
208 ticket.set_gid_object(object_gid)
209 ticket.set_pubkey(object_gid.get_pubkey())
210 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
211 ticket.set_attributes(attributes)
212 ticket.set_rspec(merged_rspec)
215 return ticket.save_to_string(save_parents=True)
# Forward a DeleteSliver request for `xrn` to the aggregates in api.aggregates.
#
# The caller's credentials are checked for the 'deletesliver' operation on the
# slice hrn; DeleteSliver is then scheduled per aggregate through a
# ThreadManager and the results are collected but not inspected on any visible
# line.
#
# NOTE(review): the test between the two `credential = ...` assignments, the
# body of the loop-prevention `if`, and the return statement are not visible in
# this view.
218 def delete_slice(api, xrn, creds):
219 hrn, type = urn_to_hrn(xrn)
221 # get the callers hrn
222 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
223 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
225 # attempt to use delegated credential first
226 credential = api.getDelegatedCredential(creds)
# Presumably the fallback when no delegated credential exists -- confirm.
228 credential = api.getCredential()
229 threads = ThreadManager()
230 for aggregate in api.aggregates:
231 # prevent infinite loop. Dont send request back to caller
232 # unless the caller is the aggregate's SM
233 if caller_hrn == aggregate and aggregate != api.hrn:
235 server = api.aggregates[aggregate]
236 threads.run(server.DeleteSliver, xrn, credential)
237 threads.get_results()
# Forward a Start request for `xrn` to the aggregates in api.aggregates.
#
# The caller's credentials are checked for the 'startslice' operation on the
# slice hrn; Start is then scheduled per aggregate through a ThreadManager and
# the results are collected but not inspected on any visible line.
#
# NOTE(review): the test between the two `credential = ...` assignments, the
# body of the loop-prevention `if`, and the return statement are not visible in
# this view.
240 def start_slice(api, xrn, creds):
241 hrn, type = urn_to_hrn(xrn)
243 # get the callers hrn
244 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
245 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
247 # attempt to use delegated credential first
248 credential = api.getDelegatedCredential(creds)
# Presumably the fallback when no delegated credential exists -- confirm.
250 credential = api.getCredential()
251 threads = ThreadManager()
252 for aggregate in api.aggregates:
253 # prevent infinite loop. Dont send request back to caller
254 # unless the caller is the aggregate's SM
255 if caller_hrn == aggregate and aggregate != api.hrn:
257 server = api.aggregates[aggregate]
258 threads.run(server.Start, xrn, credential)
259 threads.get_results()
# Forward a Stop request for `xrn` to the aggregates in api.aggregates.
#
# The caller's credentials are checked for the 'stopslice' operation on the
# slice hrn; Stop is then scheduled per aggregate through a ThreadManager and
# the results are collected but not inspected on any visible line.
#
# NOTE(review): the test between the two `credential = ...` assignments, the
# body of the loop-prevention `if`, and the return statement are not visible in
# this view.
262 def stop_slice(api, xrn, creds):
263 hrn, type = urn_to_hrn(xrn)
265 # get the callers hrn
266 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
267 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
269 # attempt to use delegated credential first
270 credential = api.getDelegatedCredential(creds)
# Presumably the fallback when no delegated credential exists -- confirm.
272 credential = api.getCredential()
273 threads = ThreadManager()
274 for aggregate in api.aggregates:
275 # prevent infinite loop. Dont send request back to caller
276 # unless the caller is the aggregate's SM
277 if caller_hrn == aggregate and aggregate != api.hrn:
279 server = api.aggregates[aggregate]
280 threads.run(server.Stop, xrn, credential)
281 threads.get_results()
# NOTE(review): only the signature of reset_slice is visible in this view; its
# body is not shown, so its behavior cannot be documented from here.
# Unlike the other slice operations in this file it takes no `creds` parameter.
284 def reset_slice(api, xrn):
# NOTE(review): only the signature of shutdown is visible in this view; its
# body is not shown, so its behavior cannot be documented from here.
290 def shutdown(api, xrn, creds):
# NOTE(review): only the signature of status is visible in this view; its body
# is not shown, so its behavior cannot be documented from here.
296 def status(api, xrn, creds):
# Return the slices known to the aggregates in api.aggregates.
#
# Visible steps: read the 'slices' entry from api.cache, check the caller's
# credentials for 'listslices' (with no hrn), schedule ListSlices on each
# aggregate through a ThreadManager, concatenate the per-aggregate results into
# `slices`, and store that list in api.cache under 'slices'.
#
# NOTE(review): short lines are missing from this view (the tests around the
# cache read and write, the early return on a cache hit, the initialisation of
# `slices` before the merge, the body of the loop-prevention `if`, and the
# final return), so the control flow must be confirmed against the full file.
302 def get_slices(api, creds):
304 # look in cache first
306 slices = api.cache.get('slices')
310 # get the callers hrn
311 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
312 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
314 # attempt to use delegated credential first
315 credential = api.getDelegatedCredential(creds)
# Presumably the fallback when no delegated credential exists -- confirm.
317 credential = api.getCredential()
318 threads = ThreadManager()
319 # fetch from aggregates
320 for aggregate in api.aggregates:
321 # prevent infinite loop. Dont send request back to caller
322 # unless the caller is the aggregate's SM
323 if caller_hrn == aggregate and aggregate != api.hrn:
325 server = api.aggregates[aggregate]
326 threads.run(server.ListSlices, credential)
# Each result is treated as an iterable of slices and appended to one list.
329 results = threads.get_results()
331 for result in results:
332 slices.extend(result)
336 api.cache.add('slices', slices)
# Collect the resource RSpecs of the aggregates in api.aggregates and combine
# them into one serialized XML document.
#
# Visible steps: read the optional slice URN from options['geni_slice_urn'],
# consult the 'nodes' cache entry when a cache exists and no slice URN was
# given, check the caller's credentials for 'listnodes', schedule ListResources
# on each aggregate with a copy of `options` in which geni_compressed is False,
# parse every returned RSpec, copy its <network> and <request> children into
# `rspec`, serialize the result with etree.tostring, and cache it under 'nodes'
# when no slice URN was given.
#
# NOTE(review): short lines are missing from this view (the test guarding the
# origin_hrn fallback, the early return on a cache hit, `try:`, where the
# combined `rspec` element is first created, the body of the `type == "SFA"`
# test, and the final return), so the control flow must be confirmed against
# the full file.
# NOTE(review): `copy`, `etree` and `sys` are not imported by name in the
# visible imports -- confirm they are imported on lines not shown.
340 def get_rspec(api, creds, options):
342 # get slice's hrn from options
343 xrn = options.get('geni_slice_urn', '')
344 hrn, type = urn_to_hrn(xrn)
346 # get hrn of the original caller
347 origin_hrn = options.get('origin_hrn', None)
# Fallback source for origin_hrn: the caller GID of the first credential.
# NOTE(review): origin_hrn is only referenced afterwards by a commented-out
# call, so it appears unused on the visible lines.
349 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
351 # look in cache first
352 if api.cache and not xrn:
353 rspec = api.cache.get('nodes')
# NOTE(review): repeats the urn_to_hrn(xrn) call made at the top of the
# function with the same argument.
357 hrn, type = urn_to_hrn(xrn)
360 # get the callers hrn
361 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
362 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
364 # attempt to use delegated credential first
365 credential = api.getDelegatedCredential(creds)
# Presumably the fallback when no delegated credential exists -- confirm.
367 credential = api.getCredential()
368 threads = ThreadManager()
369 for aggregate in api.aggregates:
370 # prevent infinite loop. Dont send request back to caller
371 # unless the caller is the aggregate's SM
372 if caller_hrn == aggregate and aggregate != api.hrn:
374 # get the rspec from the aggregate
375 server = api.aggregates[aggregate]
# Work on a copy so the caller's options dict is not modified; aggregates are
# asked for uncompressed output.
376 my_opts = copy(options)
377 my_opts['geni_compressed'] = False
378 threads.run(server.ListResources, credential, my_opts)
379 #threads.run(server.get_resources, cred, xrn, origin_hrn)
381 results = threads.get_results()
382 # combine the rspecs into a single rspec
383 for agg_rspec in results:
# An aggregate answer that is not well-formed XML aborts the whole call with
# InvalidRSpec; the message includes the offending answer and the parser error.
385 tree = etree.parse(StringIO(agg_rspec))
386 except etree.XMLSyntaxError:
387 message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
388 raise InvalidRSpec(message)
390 root = tree.getroot()
391 if root.get("type") in ["SFA"]:
# Deep copies are appended so the parsed source trees are left untouched.
395 for network in root.iterfind("./network"):
396 rspec.append(deepcopy(network))
397 for request in root.iterfind("./request"):
398 rspec.append(deepcopy(request))
400 sfa_logger().debug('get_rspec: rspec=%r'%rspec)
# From here on `rspec` is the serialized XML string rather than an element.
401 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
403 if api.cache and not xrn:
404 api.cache.add('nodes', rspec)
# NOTE(review): the `def` line that owns the two statements below, the lines
# that bind `r` and `rspec`, and the body of the `__main__` guard are not
# visible in this view.
# Parse the file named by the first command-line argument into `r`.
410 r.parseFile(sys.argv[1])
# NOTE(review): create_slice is defined in this file with five parameters
# (api, xrn, creds, rspec, users) but is called here with three positional
# arguments, so `rspec` lands in the `creds` slot and the call would fail for
# missing arguments -- confirm and update this manual test entry point.
412 create_slice(None,'plc.princeton.tmacktestslice',rspec)
414 if __name__ == "__main__":