4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
24 import sfa.plc.peers as peers
25 from sfa.util.version import version_core
27 # XXX FIXME: should merge the results from multiple aggregates instead of
28 # calling the aggregate manager's implementation directly
29 from sfa.managers.aggregate_manager_pl import slice_status
# NOTE(review): these statements read like the body of a version-reporting
# function: they use `api` and `xrn`, neither of which is bound in the visible
# lines. The enclosing `def` and several statements are not visible in this
# excerpt -- confirm against the full file.
32 # peers explicitly in aggregates.xml
# Map every aggregate name other than this interface's own hrn to the value of
# its proxy's name-mangled private attribute `_ServerProxy__host`.
# NOTE(review): the name `peers` is also bound by `import sfa.plc.peers as peers`
# in the import section; confirm this assignment is function-local.
33 peers =dict ([ (peername,v._ServerProxy__host) for (peername,v) in api.aggregates.items()
34 if peername != api.hrn])
# Start of the version_core() call; the rest of the dict literal and its
# closing brackets are not visible in this excerpt.
36 sm_version=version_core({'interface':'slicemgr',
37 'hrn' : xrn.get_hrn(),
38 'urn' : xrn.get_urn(),
41 # local aggregate if present needs to have localhost resolved
42 if api.hrn in api.aggregates:
43 local_am_url=api.aggregates[api.hrn]._ServerProxy__host
# Replace the literal 'localhost' in the local aggregate's URL with
# sm_version['hostname'] and store it under this interface's own hrn.
44 sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
# Fan a CreateSliver request for slice `xrn` out to the aggregates listed in
# api.aggregates and merge the RSpecs they return with merge_rspecs().
#   api   - interface object; the visible code uses api.auth, api.aggregates,
#           api.hrn, api.getDelegatedCredential() and api.getCredential()
#   xrn   - slice identifier, converted with urn_to_hrn()
#   creds - caller credentials, passed to api.auth.checkCredentials() with
#           the operation name 'createsliver'
#   rspec - RSpec text, forwarded unchanged to every aggregate
#   users - forwarded unchanged to every aggregate
# Raises InvalidRSpec from the (apparently disabled) schema-validation section.
# NOTE(review): several statements (try:/if/else:/continue/return lines) appear
# to be missing from this excerpt; confirm control flow against the full file.
47 def create_slice(api, xrn, creds, rspec, users):
48 hrn, type = urn_to_hrn(xrn)
50 # Validate the RSpec against PlanetLab's schema --disabled for now
51 # The schema used here needs to aggregate the PL and VINI schemas
52 # schema = "/var/www/html/schemas/pl.rng"
# Parse the RSpec text; an XML syntax error is re-raised as InvalidRSpec
# carrying the parser's message. The guarding try:/if lines are not visible.
56 tree = etree.parse(StringIO(rspec))
57 except etree.XMLSyntaxError:
58 message = str(sys.exc_info()[1])
59 raise InvalidRSpec(message)
# Build a RelaxNG validator from the schema file named by `schema`.
61 relaxng_doc = etree.parse(schema)
62 relaxng = etree.RelaxNG(relaxng_doc)
# Report the validator's last logged error (message and line) as InvalidRSpec;
# the condition that triggers this is not visible in this excerpt.
65 error = relaxng.error_log.last_error
66 message = "%s (line %s)" % (error.message, error.line)
67 raise InvalidRSpec(message)
# Take the first element of the checkCredentials() result and read the
# caller's hrn from that credential's caller GID.
70 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
71 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
73 # attempt to use delegated credential first
74 credential = api.getDelegatedCredential(creds)
# Presumably a fallback to the interface's own credential when no delegated
# one was found; the guarding condition is not visible -- TODO confirm.
76 credential = api.getCredential()
77 threads = ThreadManager()
78 for aggregate in api.aggregates:
79 # prevent infinite loop. Dont send request back to caller
80 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
81 if caller_hrn == aggregate and aggregate != api.hrn:
84 # Just send entire RSpec to each aggregate
85 server = api.aggregates[aggregate]
# Hand the remote CreateSliver call and its arguments to the ThreadManager.
86 threads.run(server.CreateSliver, xrn, credential, rspec, users)
# Collect the per-aggregate results and merge them into a single RSpec.
88 results = threads.get_results()
89 merged_rspec = merge_rspecs(results)
# Fan a RenewSliver request for slice `xrn` out to the aggregates listed in
# api.aggregates.
#   api             - interface object (uses api.auth, api.aggregates, api.hrn,
#                     api.getDelegatedCredential(), api.getCredential())
#   xrn             - slice identifier, converted with urn_to_hrn()
#   creds           - caller credentials, passed to checkCredentials() with
#                     the operation name 'renewsliver'
#   expiration_time - forwarded unchanged to every aggregate
# NOTE(review): some statements (if/continue/return lines) appear to be missing
# from this excerpt; the return value is not visible here.
92 def renew_slice(api, xrn, creds, expiration_time):
93 hrn, type = urn_to_hrn(xrn)
# Take the first element of the checkCredentials() result and read the
# caller's hrn from that credential's caller GID.
96 valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
97 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
99 # attempt to use delegated credential first
100 credential = api.getDelegatedCredential(creds)
# Presumably a fallback when no delegated credential was found; the guarding
# condition is not visible -- TODO confirm.
102 credential = api.getCredential()
103 threads = ThreadManager()
104 for aggregate in api.aggregates:
105 # prevent infinite loop. Dont send request back to caller
106 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
107 if caller_hrn == aggregate and aggregate != api.hrn:
110 server = api.aggregates[aggregate]
# NOTE(review): the credential is wrapped in a list here, whereas the other
# fan-out functions in this file pass the bare credential -- confirm which
# form the remote RenewSliver expects.
111 threads.run(server.RenewSliver, xrn, [credential], expiration_time)
# Collect the queued calls' results; they are not bound to a name here.
112 threads.get_results()
# Request a ticket for slice `xrn` from each aggregate named in the RSpec,
# then build and return one combined SfaTicket serialised as a string.
#   api   - interface object (uses api.auth, api.aggregates, api.hrn, api.key,
#           api.key_file, api.cert_file, api.getDelegatedCredential(),
#           api.getCredential())
#   xrn   - slice identifier, converted with urn_to_hrn()
#   creds - caller credentials, passed to checkCredentials() with the
#           operation name 'getticket'
#   rspec - RSpec text; its ./network elements select the target aggregates
#   users - forwarded unchanged to every aggregate
# Returns ticket.save_to_string(save_parents=True).
# NOTE(review): several statements (initialisations, if/else:/continue/break
# lines) appear to be missing from this excerpt; confirm against the full file.
115 def get_ticket(api, xrn, creds, rspec, users):
116 slice_hrn, type = urn_to_hrn(xrn)
117 # get the netspecs contained within the clients rspec
118 aggregate_rspecs = {}
119 tree= etree.parse(StringIO(rspec))
120 elements = tree.findall('./network')
121 for element in elements:
# The first attribute value of each <network> element is used as the
# aggregate hrn; the complete rspec (not a per-network part) is stored for it.
122 aggregate_hrn = element.values()[0]
123 aggregate_rspecs[aggregate_hrn] = rspec
125 # get the callers hrn
126 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
127 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
129 # attempt to use delegated credential first
130 credential = api.getDelegatedCredential(creds)
# Presumably a fallback when no delegated credential was found; the guarding
# condition is not visible -- TODO confirm.
132 credential = api.getCredential()
133 threads = ThreadManager()
134 for aggregate, aggregate_rspec in aggregate_rspecs.items():
135 # prevent infinite loop. Dont send request back to caller
136 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
137 if caller_hrn == aggregate and aggregate != api.hrn:
# Use the directly configured proxy when this aggregate is known locally.
140 if aggregate in api.aggregates:
141 server = api.aggregates[aggregate]
# Presumably the branch for aggregates not configured locally (the `else:`
# line is not visible): ask each known aggregate whether it knows the target.
143 net_urn = hrn_to_urn(aggregate, 'authority')
144 # we may have a peer that knows about this aggregate
145 for agg in api.aggregates:
146 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
# An empty answer, or a first entry without an 'hrn' key, is not usable; the
# statement taken in that case is not visible here.
147 if not target_aggs or not 'hrn' in target_aggs[0]:
149 # send the request to this address
150 url = target_aggs[0]['url']
151 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
152 # aggregate found, no need to keep looping
# Hand the remote GetTicket call and its arguments to the ThreadManager.
156 threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
158 results = threads.get_results()
160 # gather information from each ticket
# NOTE(review): the initialisation of rspecs, initscripts, slivers and
# object_gid is not visible in this excerpt.
165 for result in results:
166 agg_ticket = SfaTicket(string=result)
167 attrs = agg_ticket.get_attributes()
# Any condition guarding this assignment is not visible here.
169 object_gid = agg_ticket.get_gid_object()
170 rspecs.append(agg_ticket.get_rspec())
171 initscripts.extend(attrs.get('initscripts', []))
172 slivers.extend(attrs.get('slivers', []))
# Start of the combined attributes dict; the rest of the literal is not visible.
175 attributes = {'initscripts': initscripts,
177 merged_rspec = merge_rspecs(rspecs)
179 # create a new ticket
180 ticket = SfaTicket(subject = slice_hrn)
181 ticket.set_gid_caller(api.auth.client_gid)
182 ticket.set_issuer(key=api.key, subject=api.hrn)
183 ticket.set_gid_object(object_gid)
184 ticket.set_pubkey(object_gid.get_pubkey())
185 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
186 ticket.set_attributes(attributes)
187 ticket.set_rspec(merged_rspec)
# NOTE(review): lines between set_rspec() and this return are not visible.
190 return ticket.save_to_string(save_parents=True)
# Fan a DeleteSliver request for slice `xrn` out to the aggregates listed in
# api.aggregates.
#   api   - interface object (uses api.auth, api.aggregates, api.hrn,
#           api.getDelegatedCredential(), api.getCredential())
#   xrn   - slice identifier, converted with urn_to_hrn()
#   creds - caller credentials, passed to checkCredentials() with the
#           operation name 'deletesliver'
# NOTE(review): some statements (if/continue/return lines) appear to be missing
# from this excerpt; the return value is not visible here.
193 def delete_slice(api, xrn, creds):
194 hrn, type = urn_to_hrn(xrn)
196 # get the callers hrn
197 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
198 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
200 # attempt to use delegated credential first
201 credential = api.getDelegatedCredential(creds)
# Presumably a fallback when no delegated credential was found; the guarding
# condition is not visible -- TODO confirm.
203 credential = api.getCredential()
204 threads = ThreadManager()
205 for aggregate in api.aggregates:
206 # prevent infinite loop. Dont send request back to caller
207 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
208 if caller_hrn == aggregate and aggregate != api.hrn:
210 server = api.aggregates[aggregate]
# Hand the remote DeleteSliver call and its arguments to the ThreadManager.
211 threads.run(server.DeleteSliver, xrn, credential)
# Collect the queued calls' results; they are not bound to a name here.
212 threads.get_results()
# Fan a Start request for slice `xrn` out to the aggregates listed in
# api.aggregates.
#   api   - interface object (uses api.auth, api.aggregates, api.hrn,
#           api.getDelegatedCredential(), api.getCredential())
#   xrn   - slice identifier, converted with urn_to_hrn()
#   creds - caller credentials, passed to checkCredentials() with the
#           operation name 'startslice'
# NOTE(review): some statements (if/continue/return lines) appear to be missing
# from this excerpt; the return value is not visible here.
215 def start_slice(api, xrn, creds):
216 hrn, type = urn_to_hrn(xrn)
218 # get the callers hrn
219 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
220 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
222 # attempt to use delegated credential first
223 credential = api.getDelegatedCredential(creds)
# Presumably a fallback when no delegated credential was found; the guarding
# condition is not visible -- TODO confirm.
225 credential = api.getCredential()
226 threads = ThreadManager()
227 for aggregate in api.aggregates:
228 # prevent infinite loop. Dont send request back to caller
229 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
230 if caller_hrn == aggregate and aggregate != api.hrn:
232 server = api.aggregates[aggregate]
# Hand the remote Start call and its arguments to the ThreadManager.
233 threads.run(server.Start, xrn, credential)
# Collect the queued calls' results; they are not bound to a name here.
234 threads.get_results()
# Fan a Stop request for slice `xrn` out to the aggregates listed in
# api.aggregates.
#   api   - interface object (uses api.auth, api.aggregates, api.hrn,
#           api.getDelegatedCredential(), api.getCredential())
#   xrn   - slice identifier, converted with urn_to_hrn()
#   creds - caller credentials, passed to checkCredentials() with the
#           operation name 'stopslice'
# NOTE(review): some statements (if/continue/return lines) appear to be missing
# from this excerpt; the return value is not visible here.
237 def stop_slice(api, xrn, creds):
238 hrn, type = urn_to_hrn(xrn)
240 # get the callers hrn
241 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
242 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
244 # attempt to use delegated credential first
245 credential = api.getDelegatedCredential(creds)
# Presumably a fallback when no delegated credential was found; the guarding
# condition is not visible -- TODO confirm.
247 credential = api.getCredential()
248 threads = ThreadManager()
249 for aggregate in api.aggregates:
250 # prevent infinite loop. Dont send request back to caller
251 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
252 if caller_hrn == aggregate and aggregate != api.hrn:
254 server = api.aggregates[aggregate]
# Hand the remote Stop call and its arguments to the ThreadManager.
255 threads.run(server.Stop, xrn, credential)
# Collect the queued calls' results; they are not bound to a name here.
256 threads.get_results()
# NOTE(review): only the signature of reset_slice is visible in this excerpt;
# its body is not shown, so its behaviour cannot be documented from here.
# Unlike the neighbouring slice operations it takes no `creds` parameter.
259 def reset_slice(api, xrn):
# NOTE(review): only the signature of shutdown is visible in this excerpt;
# its body is not shown, so its behaviour cannot be documented from here.
265 def shutdown(api, xrn, creds):
# NOTE(review): only the signature of status is visible in this excerpt; its
# body is not shown. The file imports slice_status from
# sfa.managers.aggregate_manager_pl, which is presumably used here -- confirm.
271 def status(api, xrn, creds):
# Collect the slice lists reported by the aggregates in api.aggregates,
# consulting and updating the 'slices' entry of api.cache.
#   api   - interface object (uses api.cache, api.auth, api.aggregates,
#           api.hrn, api.getDelegatedCredential(), api.getCredential())
#   creds - caller credentials, passed to checkCredentials() with the
#           operation name 'listslices' and no hrn (None)
# NOTE(review): several statements (cache guards, initialisation of `slices`
# before the merge loop, continue/return lines) appear to be missing from this
# excerpt; the return value is not visible here.
277 def get_slices(api, creds):
279 # look in cache first
# Any guard on api.cache, and the early return on a cache hit, are not visible.
281 slices = api.cache.get('slices')
285 # get the callers hrn
286 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
287 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
289 # attempt to use delegated credential first
290 credential = api.getDelegatedCredential(creds)
# Presumably a fallback when no delegated credential was found; the guarding
# condition is not visible -- TODO confirm.
292 credential = api.getCredential()
293 threads = ThreadManager()
294 # fetch from aggregates
295 for aggregate in api.aggregates:
296 # prevent infinite loop. Dont send request back to caller
297 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
298 if caller_hrn == aggregate and aggregate != api.hrn:
300 server = api.aggregates[aggregate]
# Hand the remote ListSlices call and its argument to the ThreadManager.
301 threads.run(server.ListSlices, credential)
304 results = threads.get_results()
# Concatenate every aggregate's result into the single `slices` list.
306 for result in results:
307 slices.extend(result)
# Store the combined list in the cache; any guard on api.cache is not visible.
311 api.cache.add('slices', slices)
# Ask the aggregates in api.aggregates for their resources (ListResources)
# and merge the returned RSpecs with merge_rspecs(). When no slice urn is
# given, the 'nodes' entry of api.cache is consulted and updated.
#   api     - interface object (uses api.cache, api.auth, api.aggregates,
#             api.hrn, api.getDelegatedCredential(), api.getCredential())
#   creds   - a credential string or a list of them (both forms are handled)
#   options - mapping; 'geni_slice_urn' and 'origin_hrn' are read from it
# NOTE(review): several statements (if/else:/continue/return lines) appear to be
# missing from this excerpt; the return value is not visible here.
315 def get_rspec(api, creds, options):
317 # get slice's hrn from options
318 xrn = options.get('geni_slice_urn', '')
319 hrn, type = urn_to_hrn(xrn)
321 # get hrn of the original caller
322 origin_hrn = options.get('origin_hrn', None)
# Derive origin_hrn from the caller GID of the first (or only) credential;
# presumably only when the option was absent -- the guard is not visible.
# NOTE(review): within the visible lines origin_hrn is referenced only in the
# commented-out get_resources call further down.
324 if isinstance(creds, list):
325 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
327 origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()
329 # look in cache first
# The cache is consulted only for requests without a slice urn.
330 if api.cache and not xrn:
331 rspec = api.cache.get('nodes')
# NOTE(review): repeats the urn_to_hrn(xrn) call made near the top of the
# function.
335 hrn, type = urn_to_hrn(xrn)
337 # get the callers hrn
338 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
339 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
341 # attempt to use delegated credential first
342 credential = api.getDelegatedCredential(creds)
# Presumably a fallback when no delegated credential was found; the guarding
# condition is not visible -- TODO confirm.
344 credential = api.getCredential()
345 threads = ThreadManager()
346 for aggregate in api.aggregates:
347 # prevent infinite loop. Dont send request back to caller
348 # unless the caller is the aggregate's SM
# The body of this test is not visible; presumably it skips the aggregate.
349 if caller_hrn == aggregate and aggregate != api.hrn:
351 # get the rspec from the aggregate
352 server = api.aggregates[aggregate]
# Work on a copy so the caller's options are not modified, and set
# 'geni_compressed' to False in the options sent to the aggregate.
# NOTE(review): the visible imports bring in only `deepcopy` from the copy
# module; confirm that `copy` is bound to a callable in the full file.
353 my_opts = copy(options)
354 my_opts['geni_compressed'] = False
355 threads.run(server.ListResources, credential, my_opts)
356 #threads.run(server.get_resources, cred, xrn, origin_hrn)
358 results = threads.get_results()
359 merged_rspec = merge_rspecs(results)
# Only results of requests without a slice urn are stored in the cache.
362 if api.cache and not xrn:
363 api.cache.add('nodes', merged_rspec)
# NOTE(review): fragment of what appears to be a command-line test driver;
# `r` and `rspec` are not bound in the visible lines, and the enclosing `def`
# and the body of the __main__ guard are not visible in this excerpt.
369 r.parseFile(sys.argv[1])
# NOTE(review): create_slice is declared in this file with five parameters
# (api, xrn, creds, rspec, users) but is called here with three positional
# arguments -- confirm whether this driver is stale.
371 create_slice(None,'plc.princeton.tmacktestslice',rspec)
373 if __name__ == "__main__":