4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.rspecs.pg_rspec import PGRSpec
19 from sfa.rspecs.sfa_rspec import SfaRSpec
20 from sfa.rspecs.rspec_converter import RSpecConverter
21 from sfa.rspecs.rspec_parser import parse_rspec
22 from sfa.util.policy import Policy
23 from sfa.util.prefixTree import prefixTree
24 from sfa.util.sfaticket import *
25 from sfa.trust.credential import Credential
26 from sfa.util.threadmanager import ThreadManager
27 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
28 import sfa.plc.peers as peers
29 from sfa.util.version import version_core
30 from sfa.rspecs.rspec_version import RSpecVersion
31 from sfa.util.callids import Callids
33 # we have specialized xmlrpclib.ServerProxy to remember the input url
34 # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
35 def get_serverproxy_url (server):
39 sfa_logger().warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
40 return server._ServerProxy__host + server._ServerProxy__handler
43 # peers explicitly in aggregates.xml
44 peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
45 if peername != api.hrn])
47 sm_version=version_core({'interface':'slicemgr',
48 'hrn' : xrn.get_hrn(),
49 'urn' : xrn.get_urn(),
52 # local aggregate if present needs to have localhost resolved
53 if api.hrn in api.aggregates:
54 local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
55 sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
58 def CreateSliver(api, xrn, creds, rspec_str, users, call_id):
60 def _CreateSliver(server, xrn, credentail, rspec, users, call_id):
61 # get aggregate version
62 version = server.GetVersion()
64 # just send the whole rspec to SFA AM/SM
65 server.CreateSliver(xrn, credential, rspec, users, call_id)
66 elif 'geni_api' in version:
71 if Callids().already_handled(call_id): return ""
73 # Validate the RSpec against PlanetLab's schema --disabled for now
74 # The schema used here needs to aggregate the PL and VINI schemas
75 # schema = "/var/www/html/schemas/pl.rng"
76 rspec = parse_rspec(rspec_str)
79 rspec.validate(schema)
81 # attempt to use delegated credential first
82 credential = api.getDelegatedCredential(creds)
84 credential = api.getCredential()
87 hrn, type = urn_to_hrn(xrn)
88 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
89 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
90 threads = ThreadManager()
91 for aggregate in api.aggregates:
92 # prevent infinite loop. Dont send request back to caller
93 # unless the caller is the aggregate's SM
94 if caller_hrn == aggregate and aggregate != api.hrn:
97 # Just send entire RSpec to each aggregate
98 server = api.aggregates[aggregate]
99 threads.run(_CreateSliver, server, xrn, credential, rspec.toxml(), users, call_id)
101 results = threads.get_results()
103 for result in results:
107 def RenewSliver(api, xrn, creds, expiration_time, call_id):
108 if Callids().already_handled(call_id): return True
110 (hrn, type) = urn_to_hrn(xrn)
111 # get the callers hrn
112 valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
113 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
115 # attempt to use delegated credential first
116 credential = api.getDelegatedCredential(creds)
118 credential = api.getCredential()
119 threads = ThreadManager()
120 for aggregate in api.aggregates:
121 # prevent infinite loop. Dont send request back to caller
122 # unless the caller is the aggregate's SM
123 if caller_hrn == aggregate and aggregate != api.hrn:
126 server = api.aggregates[aggregate]
127 threads.run(server.RenewSliver, xrn, [credential], expiration_time, call_id)
129 return reduce (lambda x,y: x and y, threads.get_results() , True)
131 def get_ticket(api, xrn, creds, rspec, users):
132 slice_hrn, type = urn_to_hrn(xrn)
133 # get the netspecs contained within the clients rspec
134 aggregate_rspecs = {}
135 tree= etree.parse(StringIO(rspec))
136 elements = tree.findall('./network')
137 for element in elements:
138 aggregate_hrn = element.values()[0]
139 aggregate_rspecs[aggregate_hrn] = rspec
141 # get the callers hrn
142 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
143 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
145 # attempt to use delegated credential first
146 credential = api.getDelegatedCredential(creds)
148 credential = api.getCredential()
149 threads = ThreadManager()
150 for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
151 # prevent infinite loop. Dont send request back to caller
152 # unless the caller is the aggregate's SM
153 if caller_hrn == aggregate and aggregate != api.hrn:
156 if aggregate in api.aggregates:
157 server = api.aggregates[aggregate]
159 net_urn = hrn_to_urn(aggregate, 'authority')
160 # we may have a peer that knows about this aggregate
161 for agg in api.aggregates:
162 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
163 if not target_aggs or not 'hrn' in target_aggs[0]:
165 # send the request to this address
166 url = target_aggs[0]['url']
167 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
168 # aggregate found, no need to keep looping
172 threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
174 results = threads.get_results()
176 # gather information from each ticket
181 for result in results:
182 agg_ticket = SfaTicket(string=result)
183 attrs = agg_ticket.get_attributes()
185 object_gid = agg_ticket.get_gid_object()
186 rspecs.append(agg_ticket.get_rspec())
187 initscripts.extend(attrs.get('initscripts', []))
188 slivers.extend(attrs.get('slivers', []))
191 attributes = {'initscripts': initscripts,
193 merged_rspec = merge_rspecs(rspecs)
195 # create a new ticket
196 ticket = SfaTicket(subject = slice_hrn)
197 ticket.set_gid_caller(api.auth.client_gid)
198 ticket.set_issuer(key=api.key, subject=api.hrn)
199 ticket.set_gid_object(object_gid)
200 ticket.set_pubkey(object_gid.get_pubkey())
201 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
202 ticket.set_attributes(attributes)
203 ticket.set_rspec(merged_rspec)
206 return ticket.save_to_string(save_parents=True)
209 def DeleteSliver(api, xrn, creds, call_id):
210 if Callids().already_handled(call_id): return ""
211 (hrn, type) = urn_to_hrn(xrn)
212 # get the callers hrn
213 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
214 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
216 # attempt to use delegated credential first
217 credential = api.getDelegatedCredential(creds)
219 credential = api.getCredential()
220 threads = ThreadManager()
221 for aggregate in api.aggregates:
222 # prevent infinite loop. Dont send request back to caller
223 # unless the caller is the aggregate's SM
224 if caller_hrn == aggregate and aggregate != api.hrn:
226 server = api.aggregates[aggregate]
227 threads.run(server.DeleteSliver, xrn, credential, call_id)
228 threads.get_results()
231 def start_slice(api, xrn, creds):
232 hrn, type = urn_to_hrn(xrn)
234 # get the callers hrn
235 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
236 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
238 # attempt to use delegated credential first
239 credential = api.getDelegatedCredential(creds)
241 credential = api.getCredential()
242 threads = ThreadManager()
243 for aggregate in api.aggregates:
244 # prevent infinite loop. Dont send request back to caller
245 # unless the caller is the aggregate's SM
246 if caller_hrn == aggregate and aggregate != api.hrn:
248 server = api.aggregates[aggregate]
249 threads.run(server.Start, xrn, credential)
250 threads.get_results()
253 def stop_slice(api, xrn, creds):
254 hrn, type = urn_to_hrn(xrn)
256 # get the callers hrn
257 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
258 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
260 # attempt to use delegated credential first
261 credential = api.getDelegatedCredential(creds)
263 credential = api.getCredential()
264 threads = ThreadManager()
265 for aggregate in api.aggregates:
266 # prevent infinite loop. Dont send request back to caller
267 # unless the caller is the aggregate's SM
268 if caller_hrn == aggregate and aggregate != api.hrn:
270 server = api.aggregates[aggregate]
271 threads.run(server.Stop, xrn, credential)
272 threads.get_results()
275 def reset_slice(api, xrn):
281 def shutdown(api, xrn, creds):
287 def status(api, xrn, creds):
293 # Thierry : caching at the slicemgr level makes sense to some extent
296 def ListSlices(api, creds, call_id):
298 if Callids().already_handled(call_id): return []
300 # look in cache first
301 if caching and api.cache:
302 slices = api.cache.get('slices')
306 # get the callers hrn
307 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
308 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
310 # attempt to use delegated credential first
311 credential = api.getDelegatedCredential(creds)
313 credential = api.getCredential()
314 threads = ThreadManager()
315 # fetch from aggregates
316 for aggregate in api.aggregates:
317 # prevent infinite loop. Dont send request back to caller
318 # unless the caller is the aggregate's SM
319 if caller_hrn == aggregate and aggregate != api.hrn:
321 server = api.aggregates[aggregate]
322 threads.run(server.ListSlices, credential, call_id)
325 results = threads.get_results()
327 for result in results:
328 slices.extend(result)
331 if caching and api.cache:
332 api.cache.add('slices', slices)
337 def ListResources(api, creds, options, call_id):
339 if Callids().already_handled(call_id): return ""
341 # get slice's hrn from options
342 xrn = options.get('geni_slice_urn', '')
343 (hrn, type) = urn_to_hrn(xrn)
345 # get the rspec's return format from options
346 rspec_version = RSpecVersion(options.get('rspec_version', 'SFA 1'))
347 version_string = "rspec_%s_%s" % (rspec_version.format, rspec_version.version)
349 # look in cache first
350 if caching and api.cache and not xrn:
351 rspec = api.cache.get(version_string)
355 # get the callers hrn
356 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
357 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
359 # attempt to use delegated credential first
360 credential = api.getDelegatedCredential(creds)
362 credential = api.getCredential()
363 threads = ThreadManager()
364 for aggregate in api.aggregates:
365 # prevent infinite loop. Dont send request back to caller
366 # unless the caller is the aggregate's SM
367 if caller_hrn == aggregate and aggregate != api.hrn:
369 # get the rspec from the aggregate
370 server = api.aggregates[aggregate]
371 my_opts = copy(options)
372 my_opts['geni_compressed'] = False
373 threads.run(server.ListResources, credential, my_opts, call_id)
375 results = threads.get_results()
376 #results.append(open('/root/protogeni.rspec', 'r').read())
378 for result in results:
380 tmp_rspec = parse_rspec(result)
381 if isinstance(tmp_rspec, SfaRSpec):
383 elif isinstance(tmp_rspec, PGRSpec):
384 rspec.merge(RSpecConverter.to_sfa_rspec(result))
386 api.logger.info("SM.ListResources: invalid aggregate rspec")
388 api.logger.info("SM.ListResources: Failed to merge aggregate rspec")
391 if caching and api.cache and not xrn:
392 api.cache.add(version_string, rspec.toxml())
396 # first draft at a merging SliverStatus
397 def SliverStatus(api, slice_xrn, creds, call_id):
398 if Callids().already_handled(call_id): return {}
399 # attempt to use delegated credential first
400 credential = api.getDelegatedCredential(creds)
402 credential = api.getCredential()
403 threads = ThreadManager()
404 for aggregate in api.aggregates:
405 server = api.aggregates[aggregate]
406 threads.run (server.SliverStatus, slice_xrn, credential, call_id)
407 results = threads.get_results()
409 # get rid of any void result - e.g. when call_id was hit where by convention we return {}
410 results = [ result for result in results if result and result['geni_resources']]
412 # do not try to combine if there's no result
413 if not results : return {}
415 # otherwise let's merge stuff
418 # mmh, it is expected that all results carry the same urn
419 overall['geni_urn'] = results[0]['geni_urn']
421 # consolidate geni_status - simple model using max on a total order
422 states = [ 'ready', 'configuring', 'failed', 'unknown' ]
424 shash = dict ( zip ( states, range(len(states)) ) )
425 def combine_status (x,y):
426 return shash [ max (shash(x),shash(y)) ]
427 overall['geni_status'] = reduce (combine_status, [ result['geni_status'] for result in results], 'ready' )
429 # {'ready':0,'configuring':1,'failed':2,'unknown':3}
430 # append all geni_resources
431 overall['geni_resources'] = \
432 reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
438 r.parseFile(sys.argv[1])
440 CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice')
442 if __name__ == "__main__":