4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.faults import *
15 from sfa.util.record import SfaRecord
16 from sfa.rspecs.rspec_converter import RSpecConverter
17 from sfa.client.client_helper import sfa_to_pg_users_arg
18 from sfa.rspecs.version_manager import VersionManager
19 from sfa.rspecs.rspec import RSpec
20 from sfa.util.policy import Policy
21 from sfa.util.prefixTree import prefixTree
22 from sfa.trust.sfaticket import SfaTicket
23 from sfa.trust.credential import Credential
24 from sfa.util.threadmanager import ThreadManager
25 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
26 import sfa.plc.peers as peers
27 from sfa.util.version import version_core
28 from sfa.util.callids import Callids
31 def _call_id_supported(api, server):
33 Returns true if server supports the optional call_id arg, false otherwise.
35 server_version = api.get_cached_server_version(server)
37 if 'sfa' in server_version:
38 code_tag = server_version['code_tag']
39 code_tag_parts = code_tag.split("-")
41 version_parts = code_tag_parts[0].split(".")
42 major, minor = version_parts[0:2]
43 rev = code_tag_parts[1]
45 if int(minor) > 0 or int(rev) > 20:
49 # we have specialized xmlrpclib.ServerProxy to remember the input url
50 # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
51 def get_serverproxy_url (server):
53 return server.get_url()
55 logger.warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
56 return server._ServerProxy__host + server._ServerProxy__handler
59 # peers explicitly in aggregates.xml
60 peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
61 if peername != api.hrn])
62 version_manager = VersionManager()
63 ad_rspec_versions = []
64 request_rspec_versions = []
65 for rspec_version in version_manager.versions:
66 if rspec_version.content_type in ['*', 'ad']:
67 ad_rspec_versions.append(rspec_version.to_dict())
68 if rspec_version.content_type in ['*', 'request']:
69 request_rspec_versions.append(rspec_version.to_dict())
70 default_rspec_version = version_manager.get_version("sfa 1").to_dict()
71 xrn=Xrn(api.hrn, 'authority+sa')
72 version_more = {'interface':'slicemgr',
73 'hrn' : xrn.get_hrn(),
74 'urn' : xrn.get_urn(),
76 'request_rspec_versions': request_rspec_versions,
77 'ad_rspec_versions': ad_rspec_versions,
78 'default_ad_rspec': default_rspec_version
80 sm_version=version_core(version_more)
81 # local aggregate if present needs to have localhost resolved
82 if api.hrn in api.aggregates:
83 local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
84 sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
87 def drop_slicemgr_stats(rspec):
89 stats_elements = rspec.xml.xpath('//statistics')
90 for node in stats_elements:
91 node.getparent().remove(node)
93 api.logger.warn("drop_slicemgr_stats failed: %s " % (str(e)))
95 def add_slicemgr_stat(rspec, callname, aggname, elapsed, status):
97 stats_tags = rspec.xml.xpath('//statistics[@call="%s"]' % callname)
99 stats_tag = stats_tags[0]
101 stats_tag = etree.SubElement(rspec.xml.root, "statistics", call=callname)
103 etree.SubElement(stats_tag, "aggregate", name=str(aggname), elapsed=str(elapsed), status=str(status))
105 api.logger.warn("add_slicemgr_stat failed on %s: %s" %(aggname, str(e)))
107 def ListResources(api, creds, options, call_id):
108 version_manager = VersionManager()
109 def _ListResources(aggregate, server, credential, opts, call_id):
112 args = [credential, my_opts]
115 if _call_id_supported(api, server):
117 version = api.get_cached_server_version(server)
118 # force ProtoGENI aggregates to give us a v2 RSpec
119 if 'sfa' not in version.keys():
120 my_opts['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict()
121 rspec = server.ListResources(*args)
122 return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
124 api.logger.log_exc("ListResources failed at %s" %(server.url))
125 return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
127 if Callids().already_handled(call_id): return ""
129 # get slice's hrn from options
130 xrn = options.get('geni_slice_urn', '')
131 (hrn, type) = urn_to_hrn(xrn)
132 if 'geni_compressed' in options:
133 del(options['geni_compressed'])
135 # get the rspec's return format from options
136 rspec_version = version_manager.get_version(options.get('rspec_version'))
137 version_string = "rspec_%s" % (rspec_version.to_string())
139 # look in cache first
140 if caching and api.cache and not xrn:
141 rspec = api.cache.get(version_string)
145 # get the caller's hrn
146 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
147 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
149 # attempt to use delegated credential first
150 cred = api.getDelegatedCredential(creds)
152 cred = api.getCredential()
153 threads = ThreadManager()
154 for aggregate in api.aggregates:
155 # prevent infinite loop. Don't send request back to caller
156 # unless the caller is the aggregate's SM
157 if caller_hrn == aggregate and aggregate != api.hrn:
160 # get the rspec from the aggregate
161 interface = api.aggregates[aggregate]
162 server = api.get_server(interface, cred)
163 threads.run(_ListResources, aggregate, server, [cred], options, call_id)
166 results = threads.get_results()
167 rspec_version = version_manager.get_version(options.get('rspec_version'))
169 result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'manifest')
171 result_version = version_manager._get_version(rspec_version.type, rspec_version.version, 'ad')
172 rspec = RSpec(version=result_version)
173 for result in results:
174 add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"])
175 if result["status"]=="success":
177 rspec.version.merge(result["rspec"])
179 api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec")
182 if caching and api.cache and not xrn:
183 api.cache.add(version_string, rspec.toxml())
188 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
190 version_manager = VersionManager()
191 def _CreateSliver(aggregate, server, xrn, credential, rspec, users, call_id):
194 # Need to call GetVersion at an aggregate to determine the supported
195 # rspec type/format before calling CreateSliver at an Aggregate.
196 server_version = api.get_cached_server_version(server)
197 requested_users = users
198 if 'sfa' not in server_version and 'geni_api' in server_version:
199 # sfa aggregates support both sfa and pg rspecs, no need to convert
200 # if aggregate supports sfa rspecs. otherwise convert to pg rspec
201 rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request'))
202 filter = {'component_manager_id': server_version['urn']}
204 rspec = rspec.toxml()
205 requested_users = sfa_to_pg_users_arg(users)
206 args = [xrn, credential, rspec, requested_users]
207 if _call_id_supported(api, server):
209 rspec = server.CreateSliver(*args)
210 return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"}
212 logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url)
213 return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception"}
215 if Callids().already_handled(call_id): return ""
216 # Validate the RSpec against PlanetLab's schema --disabled for now
217 # The schema used here needs to aggregate the PL and VINI schemas
218 # schema = "/var/www/html/schemas/pl.rng"
219 rspec = RSpec(rspec_str)
222 rspec.validate(schema)
224 # if there is a <statistics> section, the aggregates don't care about it,
226 drop_slicemgr_stats(rspec)
228 # attempt to use delegated credential first
229 cred = api.getDelegatedCredential(creds)
231 cred = api.getCredential()
233 # get the caller's hrn
234 hrn, type = urn_to_hrn(xrn)
235 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
236 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
237 threads = ThreadManager()
238 for aggregate in api.aggregates:
239 # prevent infinite loop. Don't send request back to caller
240 # unless the caller is the aggregate's SM
241 if caller_hrn == aggregate and aggregate != api.hrn:
243 interface = api.aggregates[aggregate]
244 server = api.get_server(interface, cred)
245 # Just send entire RSpec to each aggregate
246 threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, call_id)
248 results = threads.get_results()
249 manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest')
250 result_rspec = RSpec(version=manifest_version)
251 for result in results:
252 add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], result["status"])
253 if result["status"]=="success":
255 result_rspec.version.merge(result["rspec"])
257 api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec")
258 return result_rspec.toxml()
260 def RenewSliver(api, xrn, creds, expiration_time, call_id):
261 def _RenewSliver(server, xrn, creds, expiration_time, call_id):
262 server_version = api.get_cached_server_version(server)
263 args = [xrn, creds, expiration_time, call_id]
264 if _call_id_supported(api, server):
266 return server.RenewSliver(*args)
268 if Callids().already_handled(call_id): return True
270 (hrn, type) = urn_to_hrn(xrn)
271 # get the caller's hrn
272 valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
273 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
275 # attempt to use delegated credential first
276 cred = api.getDelegatedCredential(creds)
278 cred = api.getCredential()
279 threads = ThreadManager()
280 for aggregate in api.aggregates:
281 # prevent infinite loop. Don't send request back to caller
282 # unless the caller is the aggregate's SM
283 if caller_hrn == aggregate and aggregate != api.hrn:
285 interface = api.aggregates[aggregate]
286 server = api.get_server(interface, cred)
287 threads.run(_RenewSliver, server, xrn, [cred], expiration_time, call_id)
289 return reduce (lambda x,y: x and y, threads.get_results() , True)
291 def DeleteSliver(api, xrn, creds, call_id):
292 def _DeleteSliver(server, xrn, creds, call_id):
293 server_version = api.get_cached_server_version(server)
295 if _call_id_supported(api, server):
297 return server.DeleteSliver(*args)
299 if Callids().already_handled(call_id): return ""
300 (hrn, type) = urn_to_hrn(xrn)
301 # get the caller's hrn
302 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
303 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
305 # attempt to use delegated credential first
306 cred = api.getDelegatedCredential(creds)
308 cred = api.getCredential()
309 threads = ThreadManager()
310 for aggregate in api.aggregates:
311 # prevent infinite loop. Don't send request back to caller
312 # unless the caller is the aggregate's SM
313 if caller_hrn == aggregate and aggregate != api.hrn:
315 interface = api.aggregates[aggregate]
316 server = api.get_server(interface, cred)
317 threads.run(_DeleteSliver, server, xrn, [cred], call_id)
318 threads.get_results()
322 # first draft at a merging SliverStatus
323 def SliverStatus(api, slice_xrn, creds, call_id):
324 def _SliverStatus(server, xrn, creds, call_id):
325 server_version = api.get_cached_server_version(server)
327 if _call_id_supported(api, server):
329 return server.SliverStatus(*args)
331 if Callids().already_handled(call_id): return {}
332 # attempt to use delegated credential first
333 cred = api.getDelegatedCredential(creds)
335 cred = api.getCredential()
336 threads = ThreadManager()
337 for aggregate in api.aggregates:
338 interface = api.aggregates[aggregate]
339 server = api.get_server(interface, cred)
340 threads.run (_SliverStatus, server, slice_xrn, [cred], call_id)
341 results = threads.get_results()
343 # get rid of any void result - e.g. when call_id was hit where by convention we return {}
344 results = [ result for result in results if result and result['geni_resources']]
346 # do not try to combine if there's no result
347 if not results : return {}
349 # otherwise let's merge stuff
352 # mmh, it is expected that all results carry the same urn
353 overall['geni_urn'] = results[0]['geni_urn']
354 overall['pl_login'] = results[0]['pl_login']
355 # append all geni_resources
356 overall['geni_resources'] = \
357 reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
358 overall['status'] = 'unknown'
359 if overall['geni_resources']:
360 overall['status'] = 'ready'
366 def ListSlices(api, creds, call_id):
367 def _ListSlices(server, creds, call_id):
368 server_version = api.get_cached_server_version(server)
370 if _call_id_supported(api, server):
372 return server.ListSlices(*args)
374 if Callids().already_handled(call_id): return []
376 # look in cache first
377 if caching and api.cache:
378 slices = api.cache.get('slices')
382 # get the caller's hrn
383 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
384 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
386 # attempt to use delegated credential first
387 cred= api.getDelegatedCredential(creds)
389 cred = api.getCredential()
390 threads = ThreadManager()
391 # fetch from aggregates
392 for aggregate in api.aggregates:
393 # prevent infinite loop. Don't send request back to caller
394 # unless the caller is the aggregate's SM
395 if caller_hrn == aggregate and aggregate != api.hrn:
397 interface = api.aggregates[aggregate]
398 server = api.get_server(interface, cred)
399 threads.run(_ListSlices, server, [cred], call_id)
402 results = threads.get_results()
404 for result in results:
405 slices.extend(result)
408 if caching and api.cache:
409 api.cache.add('slices', slices)
414 def get_ticket(api, xrn, creds, rspec, users):
415 slice_hrn, type = urn_to_hrn(xrn)
416 # get the netspecs contained within the client's rspec
417 aggregate_rspecs = {}
418 tree= etree.parse(StringIO(rspec))
419 elements = tree.findall('./network')
420 for element in elements:
421 aggregate_hrn = element.values()[0]
422 aggregate_rspecs[aggregate_hrn] = rspec
424 # get the caller's hrn
425 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
426 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
428 # attempt to use delegated credential first
429 cred = api.getDelegatedCredential(creds)
431 cred = api.getCredential()
432 threads = ThreadManager()
433 for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
434 # prevent infinite loop. Don't send request back to caller
435 # unless the caller is the aggregate's SM
436 if caller_hrn == aggregate and aggregate != api.hrn:
439 interface = api.aggregates[aggregate]
440 server = api.get_server(interface, cred)
441 threads.run(server.GetTicket, xrn, [cred], aggregate_rspec, users)
443 results = threads.get_results()
445 # gather information from each ticket
450 for result in results:
451 agg_ticket = SfaTicket(string=result)
452 attrs = agg_ticket.get_attributes()
454 object_gid = agg_ticket.get_gid_object()
455 rspecs.append(agg_ticket.get_rspec())
456 initscripts.extend(attrs.get('initscripts', []))
457 slivers.extend(attrs.get('slivers', []))
460 attributes = {'initscripts': initscripts,
462 merged_rspec = merge_rspecs(rspecs)
464 # create a new ticket
465 ticket = SfaTicket(subject = slice_hrn)
466 ticket.set_gid_caller(api.auth.client_gid)
467 ticket.set_issuer(key=api.key, subject=api.hrn)
468 ticket.set_gid_object(object_gid)
469 ticket.set_pubkey(object_gid.get_pubkey())
470 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
471 ticket.set_attributes(attributes)
472 ticket.set_rspec(merged_rspec)
475 return ticket.save_to_string(save_parents=True)
477 def start_slice(api, xrn, creds):
478 hrn, type = urn_to_hrn(xrn)
480 # get the caller's hrn
481 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
482 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
484 # attempt to use delegated credential first
485 cred = api.getDelegatedCredential(creds)
487 cred = api.getCredential()
488 threads = ThreadManager()
489 for aggregate in api.aggregates:
490 # prevent infinite loop. Don't send request back to caller
491 # unless the caller is the aggregate's SM
492 if caller_hrn == aggregate and aggregate != api.hrn:
494 interface = api.aggregates[aggregate]
495 server = api.get_server(interface, cred)
496 threads.run(server.Start, xrn, cred)
497 threads.get_results()
500 def stop_slice(api, xrn, creds):
501 hrn, type = urn_to_hrn(xrn)
503 # get the caller's hrn
504 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
505 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
507 # attempt to use delegated credential first
508 cred = api.getDelegatedCredential(creds)
510 cred = api.getCredential()
511 threads = ThreadManager()
512 for aggregate in api.aggregates:
513 # prevent infinite loop. Don't send request back to caller
514 # unless the caller is the aggregate's SM
515 if caller_hrn == aggregate and aggregate != api.hrn:
517 interface = api.aggregates[aggregate]
518 server = api.get_server(interface, cred)
519 threads.run(server.Stop, xrn, cred)
520 threads.get_results()
523 def reset_slice(api, xrn):
529 def shutdown(api, xrn, creds):
535 def status(api, xrn, creds):
543 r.parseFile(sys.argv[1])
545 CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice')
547 if __name__ == "__main__":