4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
24 import sfa.plc.peers as peers
25 from sfa.util.version import version_core
27 # XX FIX ME: should merge result from multiple aggregates instead of
28 # calling aggregate implementation
29 from sfa.managers.aggregate_manager_pl import slice_status
# We have specialized xmlrpclib.ServerProxy to remember the input url.
# OTOH it's not clear if we're only dealing with XMLRPCServerProxy instances.
def get_serverproxy_url(server):
    """Return the URL that an XML-RPC server proxy points at.

    Args:
        server: a server proxy object.
            NOTE(review): the specialized proxy is assumed to expose the URL
            it was created with as ``server.url`` -- confirm against
            sfa.util.xmlrpcprotocol.

    Returns:
        The remembered URL when the proxy carries one; otherwise the host and
        handler read from xmlrpclib.ServerProxy's private attributes,
        concatenated.
    """
    try:
        return server.url
    except AttributeError:
        # No remembered URL: rebuild it from the name-mangled private
        # attributes of xmlrpclib.ServerProxy and leave a trace in the log.
        sfa_logger().warning("GetVersion, falling back to xmlrpclib.ServerProxy internals")
        return server._ServerProxy__host + server._ServerProxy__handler
# NOTE(review): the `def` line owning the statements below is not visible in
# this view, and neither is the binding of `xrn`.  The warning text in
# get_serverproxy_url suggests this is the GetVersion handler -- confirm.
#
# Visible behaviour: build a {peer name: url} mapping for every entry of
# api.aggregates other than this authority's own hrn, pass it together with
# the interface name, hrn and urn to version_core(), then rewrite the local
# aggregate's URL so that 'localhost' is replaced by the reported hostname.
#
# NOTE(review): `peers` is also the alias of the module imported with
# `import sfa.plc.peers as peers`; the assignment below rebinds that name in
# whatever scope encloses it -- confirm this is intended.
41 # peers explicitly in aggregates.xml
42 peers =dict ([ (peername,get_serverproxy_url(v)) for (peername,v) in api.aggregates.items()
43 if peername != api.hrn])
# NOTE(review): the dict literal passed to version_core() is not closed in
# this view; a 'peers' entry is presumably part of it, since
# sm_version['peers'] is indexed further down -- confirm.
45 sm_version=version_core({'interface':'slicemgr',
46 'hrn' : xrn.get_hrn(),
47 'urn' : xrn.get_urn(),
50 # local aggregate if present needs to have localhost resolved
51 if api.hrn in api.aggregates:
52 local_am_url=get_serverproxy_url(api.aggregates[api.hrn])
# 'hostname' is read from the structure returned by version_core().
53 sm_version['peers'][api.hrn]=local_am_url.replace('localhost',sm_version['hostname'])
def create_slice(api, xrn, creds, rspec, users):
    """Send a CreateSliver request to the known aggregates and merge the replies.

    Args:
        api: slice-manager API object; this function reads ``api.auth``,
            ``api.aggregates`` and ``api.hrn`` and calls
            ``api.getDelegatedCredential`` / ``api.getCredential``.
        xrn: identifier of the slice, converted with ``urn_to_hrn``.
        creds: caller credentials, checked for the 'createsliver' privilege.
        rspec: RSpec as an XML string; forwarded unchanged to each aggregate.
        users: forwarded unchanged to each aggregate.

    Returns:
        The result of ``merge_rspecs`` applied to the aggregates' replies.

    Raises:
        InvalidRSpec: only if schema validation is enabled and the RSpec is
            malformed or does not validate.
    """
    hrn, _ = urn_to_hrn(xrn)

    # Validate the RSpec against PlanetLab's schema --disabled for now
    # The schema used here needs to aggregate the PL and VINI schemas
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            message = str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        relaxng_doc = etree.parse(schema)
        relaxng = etree.RelaxNG(relaxng_doc)

        if not relaxng(tree):
            error = relaxng.error_log.last_error
            message = "%s (line %s)" % (error.message, error.line)
            raise InvalidRSpec(message)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        # Just send entire RSpec to each aggregate
        server = api.aggregates[aggregate]
        threads.run(server.CreateSliver, xrn, credential, rspec, users)

    results = threads.get_results()
    return merge_rspecs(results)
def renew_slice(api, xrn, creds, expiration_time):
    """Send a RenewSliver request for the slice to the known aggregates.

    Args:
        api: slice-manager API object (``auth``, ``aggregates``, ``hrn`` and
            the credential getters are used).
        xrn: identifier of the slice, converted with ``urn_to_hrn``.
        creds: caller credentials, checked for the 'renewsliver' privilege.
        expiration_time: forwarded unchanged to each aggregate.

    Returns:
        1 once all requests have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        server = api.aggregates[aggregate]
        # NOTE(review): RenewSliver is the only call in this file that wraps
        # the credential in a list -- confirm this matches the aggregate API.
        threads.run(server.RenewSliver, xrn, [credential], expiration_time)
    threads.get_results()
    # NOTE(review): no return statement is visible for this handler; 1 is
    # returned as a success flag so the XML-RPC layer always has a
    # marshallable value -- confirm.
    return 1
def get_ticket(api, xrn, creds, rspec, users):
    """Collect tickets for the slice from the aggregates named in the RSpec
    and combine them into one ticket issued by this slice manager.

    Args:
        api: slice-manager API object (``auth``, ``aggregates``, ``hrn``,
            ``key``, ``key_file``, ``cert_file`` and the credential getters
            are used).
        xrn: identifier of the slice, converted with ``urn_to_hrn``.
        creds: caller credentials, checked for the 'getticket' privilege.
        rspec: RSpec as an XML string; its ``<network>`` elements select the
            aggregates to contact.
        users: forwarded unchanged to each aggregate.

    Returns:
        The combined ticket serialised with ``save_to_string(save_parents=True)``.
    """
    slice_hrn, _ = urn_to_hrn(xrn)

    # get the netspecs contained within the clients rspec
    aggregate_rspecs = {}
    tree = etree.parse(StringIO(rspec))
    for element in tree.findall('./network'):
        # the first attribute value of <network> is used as the aggregate hrn
        aggregate_hrn = element.values()[0]
        # every aggregate is sent the complete rspec
        aggregate_rspecs[aggregate_hrn] = rspec

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate, aggregate_rspec in aggregate_rspecs.items():
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue

        # reset per iteration so a proxy found for a previous aggregate is
        # never reused for this one
        server = None
        if aggregate in api.aggregates:
            server = api.aggregates[aggregate]
        else:
            net_urn = hrn_to_urn(aggregate, 'authority')
            # we may have a peer that knows about this aggregate
            for agg in api.aggregates:
                target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
                if not target_aggs or 'hrn' not in target_aggs[0]:
                    continue
                # send the request to this address
                url = target_aggs[0]['url']
                server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
                # aggregate found, no need to keep looping
                break

        if server is None:
            # nobody we know can reach this aggregate
            continue
        threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)

    results = threads.get_results()

    # gather information from each ticket
    rspecs = []
    initscripts = []
    slivers = []
    object_gid = None
    for result in results:
        agg_ticket = SfaTicket(string=result)
        attrs = agg_ticket.get_attributes()
        # NOTE(review): the object gid of the first ticket is kept -- confirm.
        if object_gid is None:
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # merge info
    attributes = {'initscripts': initscripts,
                  'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # create a new ticket
    # NOTE(review): object_gid is still None here when no aggregate returned
    # a ticket, in which case get_pubkey() below fails.
    ticket = SfaTicket(subject = slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    # NOTE(review): an issuer key is set above but no encode/sign step is
    # visible before serialisation; encode() and sign() are assumed to be
    # required by SfaTicket -- confirm against sfa.util.sfaticket.
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
def delete_slice(api, xrn, creds):
    """Send a DeleteSliver request for the slice to the known aggregates.

    Args:
        api: slice-manager API object (``auth``, ``aggregates``, ``hrn`` and
            the credential getters are used).
        xrn: identifier of the slice, converted with ``urn_to_hrn``.
        creds: caller credentials, checked for the 'deletesliver' privilege.

    Returns:
        1 once all requests have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.DeleteSliver, xrn, credential)
    threads.get_results()
    # NOTE(review): no return statement is visible for this handler; 1 is
    # returned as a success flag so the XML-RPC layer always has a
    # marshallable value -- confirm.
    return 1
def start_slice(api, xrn, creds):
    """Send a Start request for the slice to the known aggregates.

    Args:
        api: slice-manager API object (``auth``, ``aggregates``, ``hrn`` and
            the credential getters are used).
        xrn: identifier of the slice, converted with ``urn_to_hrn``.
        creds: caller credentials, checked for the 'startslice' privilege.

    Returns:
        1 once all requests have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.Start, xrn, credential)
    threads.get_results()
    # NOTE(review): no return statement is visible for this handler; 1 is
    # returned as a success flag so the XML-RPC layer always has a
    # marshallable value -- confirm.
    return 1
def stop_slice(api, xrn, creds):
    """Send a Stop request for the slice to the known aggregates.

    Args:
        api: slice-manager API object (``auth``, ``aggregates``, ``hrn`` and
            the credential getters are used).
        xrn: identifier of the slice, converted with ``urn_to_hrn``.
        creds: caller credentials, checked for the 'stopslice' privilege.

    Returns:
        1 once all requests have been collected.
    """
    hrn, _ = urn_to_hrn(xrn)

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.Stop, xrn, credential)
    threads.get_results()
    # NOTE(review): no return statement is visible for this handler; 1 is
    # returned as a success flag so the XML-RPC layer always has a
    # marshallable value -- confirm.
    return 1
# NOTE(review): only the signature of reset_slice is visible in this view; its
# body is not shown, so its behaviour cannot be documented from here.  Unlike
# the other slice operations in this file it takes no `creds` argument.
268 def reset_slice(api, xrn):
# NOTE(review): only the signature of shutdown is visible in this view; its
# body is not shown, so its behaviour cannot be documented from here.
274 def shutdown(api, xrn, creds):
# NOTE(review): only the signature of status is visible in this view; its body
# is not shown.  The file also imports slice_status from the aggregate manager
# with a FIXME about merging results from several aggregates -- whether this
# function uses it cannot be told from here.
280 def status(api, xrn, creds):
def get_slices(api, creds):
    """Return the slices reported by the known aggregates, using the cache
    when one is configured.

    Args:
        api: slice-manager API object (``cache``, ``auth``, ``aggregates``,
            ``hrn`` and the credential getters are used).
        creds: caller credentials, checked for the 'listslices' privilege.

    Returns:
        The cached value stored under 'slices' when present and non-empty;
        otherwise a list built by concatenating every aggregate's
        ListSlices reply.
    """
    # look in cache first
    if api.cache:
        slices = api.cache.get('slices')
        if slices:
            return slices

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    # fetch from aggregates
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        server = api.aggregates[aggregate]
        threads.run(server.ListSlices, credential)

    # combine results
    results = threads.get_results()
    slices = []
    for result in results:
        slices.extend(result)

    # cache the result
    if api.cache:
        api.cache.add('slices', slices)

    return slices
def get_rspec(api, creds, options):
    """Return the merged RSpec advertised by the known aggregates.

    Args:
        api: slice-manager API object (``cache``, ``auth``, ``aggregates``,
            ``hrn`` and the credential getters are used).
        creds: caller credentials (a single credential string or a list of
            them), checked for the 'listnodes' privilege.
        options: mapping of request options.  'geni_slice_urn' selects a
            slice (empty means none), 'origin_hrn' names the original caller;
            each aggregate receives a copy with 'geni_compressed' set to
            False.

    Returns:
        The cached value stored under 'nodes' when no slice is given and the
        cache holds one; otherwise the result of ``merge_rspecs`` applied to
        the aggregates' ListResources replies.
    """
    # get slice's hrn from options
    xrn = options.get('geni_slice_urn', '')
    hrn, _ = urn_to_hrn(xrn)

    # get hrn of the original caller
    # NOTE(review): origin_hrn is only referenced by the commented-out
    # get_resources call below; the lookup is kept because parsing the
    # credential may raise on malformed input.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        if isinstance(creds, list):
            origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
        else:
            origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()

    # look in cache first; only the slice-independent listing is cached
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    # get the callers hrn
    valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
    caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()

    # attempt to use delegated credential first, fall back to our own
    credential = api.getDelegatedCredential(creds)
    if not credential:
        credential = api.getCredential()

    threads = ThreadManager()
    for aggregate in api.aggregates:
        # prevent infinite loop. Dont send request back to caller
        # unless the caller is the aggregate's SM
        if caller_hrn == aggregate and aggregate != api.hrn:
            continue
        # get the rspec from the aggregate
        server = api.aggregates[aggregate]
        # shallow copy so the caller's options are left unmodified
        my_opts = dict(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, credential, my_opts)
        #threads.run(server.get_resources, cred, xrn, origin_hrn)

    results = threads.get_results()
    merged_rspec = merge_rspecs(results)

    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', merged_rspec)

    return merged_rspec
# NOTE(review): fragment of a command-line test driver.  The enclosing `def`
# (if any) and the bindings of `r` and `rspec` are not visible in this view.
# Visible behaviour: parse the file named by the first command-line argument
# into `r`.
378 r.parseFile(sys.argv[1])
# NOTE(review): create_slice is declared in this file as
# (api, xrn, creds, rspec, users) but is called here with three positional
# arguments, and with api=None -- this call cannot succeed as written.
380 create_slice(None,'plc.princeton.tmacktestslice',rspec)
# NOTE(review): the body of this guard is not visible in this view.
382 if __name__ == "__main__":