4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.rspecs.pg_rspec import PGRSpec
19 from sfa.rspecs.sfa_rspec import SfaRSpec
20 from sfa.rspecs.pg_rspec_converter import PGRSpecConverter
21 from sfa.rspecs.rspec_parser import parse_rspec
22 from sfa.util.policy import Policy
23 from sfa.util.prefixTree import prefixTree
24 from sfa.util.sfaticket import *
25 from sfa.trust.credential import Credential
26 from sfa.util.threadmanager import ThreadManager
27 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
28 import sfa.plc.peers as peers
29 from sfa.util.version import version_core
30 from sfa.rspecs.rspec_version import RSpecVersion
31 from sfa.util.callids import Callids
33 # we have specialized xmlrpclib.ServerProxy to remember the input url
34 # OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances
35 def get_serverproxy_url (server):
39 sfa_logger().warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
40 return server._ServerProxy__host + server._ServerProxy__handler
43 # peers explicitly in aggregates.xml
44 peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.iteritems()
45 if peername != api.hrn])
47 sm_version=version_core({'interface':'slicemgr',
48 'hrn' : xrn.get_hrn(),
49 'urn' : xrn.get_urn(),
52 # local aggregate if present needs to have localhost resolved
53 if api.hrn in api.aggregates:
54 local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
55 sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
58 def CreateSliver(api, xrn, creds, rspec, users, call_id):
60 if Callids().already_handled(call_id): return ""
62 hrn, type = urn_to_hrn(xrn)
64 # Validate the RSpec against PlanetLab's schema --disabled for now
65 # The schema used here needs to aggregate the PL and VINI schemas
66 # schema = "/var/www/html/schemas/pl.rng"
70 tree = etree.parse(StringIO(rspec))
71 except etree.XMLSyntaxError:
72 message = str(sys.exc_info()[1])
73 raise InvalidRSpec(message)
75 relaxng_doc = etree.parse(schema)
76 relaxng = etree.RelaxNG(relaxng_doc)
79 error = relaxng.error_log.last_error
80 message = "%s (line %s)" % (error.message, error.line)
81 raise InvalidRSpec(message)
84 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
85 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
87 # attempt to use delegated credential first
88 credential = api.getDelegatedCredential(creds)
90 credential = api.getCredential()
91 threads = ThreadManager()
92 for aggregate in api.aggregates:
93 # prevent infinite loop. Dont send request back to caller
94 # unless the caller is the aggregate's SM
95 if caller_hrn == aggregate and aggregate != api.hrn:
98 # Just send entire RSpec to each aggregate
99 server = api.aggregates[aggregate]
100 threads.run(server.CreateSliver, xrn, credential, rspec, users, call_id)
102 results = threads.get_results()
104 for result in results:
108 def RenewSliver(api, xrn, creds, expiration_time, call_id):
109 if Callids().already_handled(call_id): return True
111 (hrn, type) = urn_to_hrn(xrn)
112 # get the callers hrn
113 valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
114 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
116 # attempt to use delegated credential first
117 credential = api.getDelegatedCredential(creds)
119 credential = api.getCredential()
120 threads = ThreadManager()
121 for aggregate in api.aggregates:
122 # prevent infinite loop. Dont send request back to caller
123 # unless the caller is the aggregate's SM
124 if caller_hrn == aggregate and aggregate != api.hrn:
127 server = api.aggregates[aggregate]
128 threads.run(server.RenewSliver, xrn, [credential], expiration_time, call_id)
130 return reduce (lambda x,y: x and y, threads.get_results() , True)
132 def get_ticket(api, xrn, creds, rspec, users):
133 slice_hrn, type = urn_to_hrn(xrn)
134 # get the netspecs contained within the clients rspec
135 aggregate_rspecs = {}
136 tree= etree.parse(StringIO(rspec))
137 elements = tree.findall('./network')
138 for element in elements:
139 aggregate_hrn = element.values()[0]
140 aggregate_rspecs[aggregate_hrn] = rspec
142 # get the callers hrn
143 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
144 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
146 # attempt to use delegated credential first
147 credential = api.getDelegatedCredential(creds)
149 credential = api.getCredential()
150 threads = ThreadManager()
151 for (aggregate, aggregate_rspec) in aggregate_rspecs.iteritems():
152 # prevent infinite loop. Dont send request back to caller
153 # unless the caller is the aggregate's SM
154 if caller_hrn == aggregate and aggregate != api.hrn:
157 if aggregate in api.aggregates:
158 server = api.aggregates[aggregate]
160 net_urn = hrn_to_urn(aggregate, 'authority')
161 # we may have a peer that knows about this aggregate
162 for agg in api.aggregates:
163 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
164 if not target_aggs or not 'hrn' in target_aggs[0]:
166 # send the request to this address
167 url = target_aggs[0]['url']
168 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
169 # aggregate found, no need to keep looping
173 threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
175 results = threads.get_results()
177 # gather information from each ticket
182 for result in results:
183 agg_ticket = SfaTicket(string=result)
184 attrs = agg_ticket.get_attributes()
186 object_gid = agg_ticket.get_gid_object()
187 rspecs.append(agg_ticket.get_rspec())
188 initscripts.extend(attrs.get('initscripts', []))
189 slivers.extend(attrs.get('slivers', []))
192 attributes = {'initscripts': initscripts,
194 merged_rspec = merge_rspecs(rspecs)
196 # create a new ticket
197 ticket = SfaTicket(subject = slice_hrn)
198 ticket.set_gid_caller(api.auth.client_gid)
199 ticket.set_issuer(key=api.key, subject=api.hrn)
200 ticket.set_gid_object(object_gid)
201 ticket.set_pubkey(object_gid.get_pubkey())
202 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
203 ticket.set_attributes(attributes)
204 ticket.set_rspec(merged_rspec)
207 return ticket.save_to_string(save_parents=True)
210 def DeleteSliver(api, xrn, creds, call_id):
211 if Callids().already_handled(call_id): return ""
212 (hrn, type) = urn_to_hrn(xrn)
213 # get the callers hrn
214 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
215 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
217 # attempt to use delegated credential first
218 credential = api.getDelegatedCredential(creds)
220 credential = api.getCredential()
221 threads = ThreadManager()
222 for aggregate in api.aggregates:
223 # prevent infinite loop. Dont send request back to caller
224 # unless the caller is the aggregate's SM
225 if caller_hrn == aggregate and aggregate != api.hrn:
227 server = api.aggregates[aggregate]
228 threads.run(server.DeleteSliver, xrn, credential, call_id)
229 threads.get_results()
232 def start_slice(api, xrn, creds):
233 hrn, type = urn_to_hrn(xrn)
235 # get the callers hrn
236 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
237 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
239 # attempt to use delegated credential first
240 credential = api.getDelegatedCredential(creds)
242 credential = api.getCredential()
243 threads = ThreadManager()
244 for aggregate in api.aggregates:
245 # prevent infinite loop. Dont send request back to caller
246 # unless the caller is the aggregate's SM
247 if caller_hrn == aggregate and aggregate != api.hrn:
249 server = api.aggregates[aggregate]
250 threads.run(server.Start, xrn, credential)
251 threads.get_results()
254 def stop_slice(api, xrn, creds):
255 hrn, type = urn_to_hrn(xrn)
257 # get the callers hrn
258 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
259 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
261 # attempt to use delegated credential first
262 credential = api.getDelegatedCredential(creds)
264 credential = api.getCredential()
265 threads = ThreadManager()
266 for aggregate in api.aggregates:
267 # prevent infinite loop. Dont send request back to caller
268 # unless the caller is the aggregate's SM
269 if caller_hrn == aggregate and aggregate != api.hrn:
271 server = api.aggregates[aggregate]
272 threads.run(server.Stop, xrn, credential)
273 threads.get_results()
276 def reset_slice(api, xrn):
282 def shutdown(api, xrn, creds):
288 def status(api, xrn, creds):
294 # Thierry : caching at the slicemgr level makes sense to some extent
297 def ListSlices(api, creds, call_id):
299 if Callids().already_handled(call_id): return []
301 # look in cache first
302 if caching and api.cache:
303 slices = api.cache.get('slices')
307 # get the callers hrn
308 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
309 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
311 # attempt to use delegated credential first
312 credential = api.getDelegatedCredential(creds)
314 credential = api.getCredential()
315 threads = ThreadManager()
316 # fetch from aggregates
317 for aggregate in api.aggregates:
318 # prevent infinite loop. Dont send request back to caller
319 # unless the caller is the aggregate's SM
320 if caller_hrn == aggregate and aggregate != api.hrn:
322 server = api.aggregates[aggregate]
323 threads.run(server.ListSlices, credential, call_id)
326 results = threads.get_results()
328 for result in results:
329 slices.extend(result)
332 if caching and api.cache:
333 api.cache.add('slices', slices)
338 def ListResources(api, creds, options, call_id):
340 if Callids().already_handled(call_id): return ""
342 # get slice's hrn from options
343 xrn = options.get('geni_slice_urn', '')
344 (hrn, type) = urn_to_hrn(xrn)
346 # get the rspec's return format from options
347 rspec_version = RSpecVersion(options.get('rspec_version', 'SFA 1'))
348 version_string = "rspec_%s_%s" % (rspec_version.format, rspec_version.version)
350 # get hrn of the original caller
351 origin_hrn = options.get('origin_hrn', None)
353 if isinstance(creds, list):
354 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
356 origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()
358 # look in cache first
359 if caching and api.cache and not xrn:
360 rspec = api.cache.get(version_string)
364 # get the callers hrn
365 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
366 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
368 # attempt to use delegated credential first
369 credential = api.getDelegatedCredential(creds)
371 credential = api.getCredential()
372 threads = ThreadManager()
373 for aggregate in api.aggregates:
374 # prevent infinite loop. Dont send request back to caller
375 # unless the caller is the aggregate's SM
376 if caller_hrn == aggregate and aggregate != api.hrn:
378 # get the rspec from the aggregate
379 server = api.aggregates[aggregate]
380 my_opts = copy(options)
381 my_opts['geni_compressed'] = False
382 threads.run(server.ListResources, credential, my_opts, call_id)
383 #threads.run(server.get_resources, cred, xrn, origin_hrn)
385 results = threads.get_results()
386 #results.append(open('/root/protogeni.rspec', 'r').read())
388 for result in results:
390 tmp_rspec = parse_rspec(result)
391 if isinstance(tmp_rspec, SfaRSpec):
393 elif isinstance(tmp_rspec, PGRSpec):
394 rspec.merge(PGRSpecConverter.to_sfa_rspec(result))
396 api.logger.info("SM.ListResources: invalid aggregate rspec")
398 api.logger.info("SM.ListResources: Failed to merge aggregate rspec")
401 if caching and api.cache and not xrn:
402 api.cache.add(version_string, rspec.toxml())
406 # first draft at a merging SliverStatus
407 def SliverStatus(api, slice_xrn, creds, call_id):
408 if Callids().already_handled(call_id): return {}
409 # attempt to use delegated credential first
410 credential = api.getDelegatedCredential(creds)
412 credential = api.getCredential()
413 threads = ThreadManager()
414 for aggregate in api.aggregates:
415 server = api.aggregates[aggregate]
416 threads.run (server.SliverStatus, slice_xrn, credential, call_id)
417 results = threads.get_results()
419 # get rid of any void result - e.g. when call_id was hit where by convention we return {}
420 results = [ result for result in results if result and result['geni_resources']]
422 # do not try to combine if there's no result
423 if not results : return {}
425 # otherwise let's merge stuff
428 # mmh, it is expected that all results carry the same urn
429 overall['geni_urn'] = results[0]['geni_urn']
431 # consolidate geni_status - simple model using max on a total order
432 states = [ 'ready', 'configuring', 'failed', 'unknown' ]
434 shash = dict ( zip ( states, range(len(states)) ) )
435 def combine_status (x,y):
436 return shash [ max (shash(x),shash(y)) ]
437 overall['geni_status'] = reduce (combine_status, [ result['geni_status'] for result in results], 'ready' )
439 # {'ready':0,'configuring':1,'failed':2,'unknown':3}
440 # append all geni_resources
441 overall['geni_resources'] = \
442 reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , [])
448 r.parseFile(sys.argv[1])
450 CreateSliver(None,'plc.princeton.tmacktestslice',rspec,'create-slice-tmacktestslice')
452 if __name__ == "__main__":