4 from StringIO import StringIO
5 from types import StringTypes
6 from copy import deepcopy
10 from sfa.util.sfalogging import sfa_logger
11 from sfa.util.rspecHelper import merge_rspecs
12 from sfa.util.xrn import Xrn, urn_to_hrn, hrn_to_urn
13 from sfa.util.plxrn import hrn_to_pl_slicename
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
24 import sfa.plc.peers as peers
25 from sfa.util.version import version_core
28 peers =dict ([ (peername,v._ServerProxy__host) for (peername,v) in api.aggregates.items()
29 if peername != api.hrn])
31 return version_core({'interface':'slicemgr',
32 'hrn' : xrn.get_hrn(),
33 'urn' : xrn.get_urn(),
37 def slice_status(api, slice_xrn, creds ):
38 hrn, type = urn_to_hrn(slice_xrn)
39 # find out where this slice is currently running
41 slicename = hrn_to_pl_slicename(hrn)
42 api.logger.info("Checking status for %s" % slicename)
43 slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids','person_ids','name','expires'])
45 raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
48 nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'],
49 ['hostname', 'boot_state', 'last_contact'])
50 api.logger.info(slice)
51 api.logger.info(nodes)
54 result['geni_urn'] = Xrn(slice_xrn, 'slice').get_urn()
55 result['geni_status'] = 'unknown'
56 result['pl_login'] = slice['name']
57 result['pl_expires'] = datetime.datetime.fromtimestamp(slice['expires']).ctime()
63 res['pl_hostname'] = node['hostname']
64 res['pl_boot_state'] = node['boot_state']
65 res['pl_last_contact'] = node['last_contact']
66 if not node['last_contact'] is None:
67 res['pl_last_contact'] = datetime.datetime.fromtimestamp(node['last_contact']).ctime()
69 res['geni_status'] = 'unknown'
70 res['geni_error'] = ''
74 result['geni_resources'] = resources
77 def create_slice(api, xrn, creds, rspec, users):
78 hrn, type = urn_to_hrn(xrn)
80 # Validate the RSpec against PlanetLab's schema --disabled for now
81 # The schema used here needs to aggregate the PL and VINI schemas
82 # schema = "/var/www/html/schemas/pl.rng"
86 tree = etree.parse(StringIO(rspec))
87 except etree.XMLSyntaxError:
88 message = str(sys.exc_info()[1])
89 raise InvalidRSpec(message)
91 relaxng_doc = etree.parse(schema)
92 relaxng = etree.RelaxNG(relaxng_doc)
95 error = relaxng.error_log.last_error
96 message = "%s (line %s)" % (error.message, error.line)
97 raise InvalidRSpec(message)
100 valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0]
101 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
103 # attempt to use delegated credential first
104 credential = api.getDelegatedCredential(creds)
106 credential = api.getCredential()
107 threads = ThreadManager()
108 for aggregate in api.aggregates:
109 # prevent infinite loop. Dont send request back to caller
110 # unless the caller is the aggregate's SM
111 if caller_hrn == aggregate and aggregate != api.hrn:
114 # Just send entire RSpec to each aggregate
115 server = api.aggregates[aggregate]
116 threads.run(server.CreateSliver, xrn, credential, rspec, users)
118 results = threads.get_results()
119 merged_rspec = merge_rspecs(results)
122 def renew_slice(api, xrn, creds, expiration_time):
123 hrn, type = urn_to_hrn(xrn)
125 # get the callers hrn
126 valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0]
127 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
129 # attempt to use delegated credential first
130 credential = api.getDelegatedCredential(creds)
132 credential = api.getCredential()
133 threads = ThreadManager()
134 for aggregate in api.aggregates:
135 # prevent infinite loop. Dont send request back to caller
136 # unless the caller is the aggregate's SM
137 if caller_hrn == aggregate and aggregate != api.hrn:
140 server = api.aggregates[aggregate]
141 threads.run(server.RenewSliver, xrn, [credential], expiration_time)
142 threads.get_results()
145 def get_ticket(api, xrn, creds, rspec, users):
146 slice_hrn, type = urn_to_hrn(xrn)
147 # get the netspecs contained within the clients rspec
148 aggregate_rspecs = {}
149 tree= etree.parse(StringIO(rspec))
150 elements = tree.findall('./network')
151 for element in elements:
152 aggregate_hrn = element.values()[0]
153 aggregate_rspecs[aggregate_hrn] = rspec
155 # get the callers hrn
156 valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0]
157 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
159 # attempt to use delegated credential first
160 credential = api.getDelegatedCredential(creds)
162 credential = api.getCredential()
163 threads = ThreadManager()
164 for aggregate, aggregate_rspec in aggregate_rspecs.items():
165 # prevent infinite loop. Dont send request back to caller
166 # unless the caller is the aggregate's SM
167 if caller_hrn == aggregate and aggregate != api.hrn:
170 if aggregate in api.aggregates:
171 server = api.aggregates[aggregate]
173 net_urn = hrn_to_urn(aggregate, 'authority')
174 # we may have a peer that knows about this aggregate
175 for agg in api.aggregates:
176 target_aggs = api.aggregates[agg].get_aggregates(credential, net_urn)
177 if not target_aggs or not 'hrn' in target_aggs[0]:
179 # send the request to this address
180 url = target_aggs[0]['url']
181 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
182 # aggregate found, no need to keep looping
186 threads.run(server.GetTicket, xrn, credential, aggregate_rspec, users)
188 results = threads.get_results()
190 # gather information from each ticket
195 for result in results:
196 agg_ticket = SfaTicket(string=result)
197 attrs = agg_ticket.get_attributes()
199 object_gid = agg_ticket.get_gid_object()
200 rspecs.append(agg_ticket.get_rspec())
201 initscripts.extend(attrs.get('initscripts', []))
202 slivers.extend(attrs.get('slivers', []))
205 attributes = {'initscripts': initscripts,
207 merged_rspec = merge_rspecs(rspecs)
209 # create a new ticket
210 ticket = SfaTicket(subject = slice_hrn)
211 ticket.set_gid_caller(api.auth.client_gid)
212 ticket.set_issuer(key=api.key, subject=api.hrn)
213 ticket.set_gid_object(object_gid)
214 ticket.set_pubkey(object_gid.get_pubkey())
215 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
216 ticket.set_attributes(attributes)
217 ticket.set_rspec(merged_rspec)
220 return ticket.save_to_string(save_parents=True)
223 def delete_slice(api, xrn, creds):
224 hrn, type = urn_to_hrn(xrn)
226 # get the callers hrn
227 valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0]
228 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
230 # attempt to use delegated credential first
231 credential = api.getDelegatedCredential(creds)
233 credential = api.getCredential()
234 threads = ThreadManager()
235 for aggregate in api.aggregates:
236 # prevent infinite loop. Dont send request back to caller
237 # unless the caller is the aggregate's SM
238 if caller_hrn == aggregate and aggregate != api.hrn:
240 server = api.aggregates[aggregate]
241 threads.run(server.DeleteSliver, xrn, credential)
242 threads.get_results()
245 def start_slice(api, xrn, creds):
246 hrn, type = urn_to_hrn(xrn)
248 # get the callers hrn
249 valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0]
250 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
252 # attempt to use delegated credential first
253 credential = api.getDelegatedCredential(creds)
255 credential = api.getCredential()
256 threads = ThreadManager()
257 for aggregate in api.aggregates:
258 # prevent infinite loop. Dont send request back to caller
259 # unless the caller is the aggregate's SM
260 if caller_hrn == aggregate and aggregate != api.hrn:
262 server = api.aggregates[aggregate]
263 threads.run(server.Start, xrn, credential)
264 threads.get_results()
267 def stop_slice(api, xrn, creds):
268 hrn, type = urn_to_hrn(xrn)
270 # get the callers hrn
271 valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0]
272 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
274 # attempt to use delegated credential first
275 credential = api.getDelegatedCredential(creds)
277 credential = api.getCredential()
278 threads = ThreadManager()
279 for aggregate in api.aggregates:
280 # prevent infinite loop. Dont send request back to caller
281 # unless the caller is the aggregate's SM
282 if caller_hrn == aggregate and aggregate != api.hrn:
284 server = api.aggregates[aggregate]
285 threads.run(server.Stop, xrn, credential)
286 threads.get_results()
289 def reset_slice(api, xrn):
295 def shutdown(api, xrn, creds):
301 def status(api, xrn, creds):
307 def get_slices(api, creds):
309 # look in cache first
311 slices = api.cache.get('slices')
315 # get the callers hrn
316 valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0]
317 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
319 # attempt to use delegated credential first
320 credential = api.getDelegatedCredential(creds)
322 credential = api.getCredential()
323 threads = ThreadManager()
324 # fetch from aggregates
325 for aggregate in api.aggregates:
326 # prevent infinite loop. Dont send request back to caller
327 # unless the caller is the aggregate's SM
328 if caller_hrn == aggregate and aggregate != api.hrn:
330 server = api.aggregates[aggregate]
331 threads.run(server.ListSlices, credential)
334 results = threads.get_results()
336 for result in results:
337 slices.extend(result)
341 api.cache.add('slices', slices)
345 def get_rspec(api, creds, options):
347 # get slice's hrn from options
348 xrn = options.get('geni_slice_urn', '')
349 hrn, type = urn_to_hrn(xrn)
351 # get hrn of the original caller
352 origin_hrn = options.get('origin_hrn', None)
354 if isinstance(creds, list):
355 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
357 origin_hrn = Credential(string=creds).get_gid_caller().get_hrn()
359 # look in cache first
360 if api.cache and not xrn:
361 rspec = api.cache.get('nodes')
365 hrn, type = urn_to_hrn(xrn)
367 # get the callers hrn
368 valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0]
369 caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn()
371 # attempt to use delegated credential first
372 credential = api.getDelegatedCredential(creds)
374 credential = api.getCredential()
375 threads = ThreadManager()
376 for aggregate in api.aggregates:
377 # prevent infinite loop. Dont send request back to caller
378 # unless the caller is the aggregate's SM
379 if caller_hrn == aggregate and aggregate != api.hrn:
381 # get the rspec from the aggregate
382 server = api.aggregates[aggregate]
383 my_opts = copy(options)
384 my_opts['geni_compressed'] = False
385 threads.run(server.ListResources, credential, my_opts)
386 #threads.run(server.get_resources, cred, xrn, origin_hrn)
388 results = threads.get_results()
389 merged_rspec = merge_rspecs(results)
392 if api.cache and not xrn:
393 api.cache.add('nodes', merged_rspec)
399 r.parseFile(sys.argv[1])
401 create_slice(None,'plc.princeton.tmacktestslice',rspec)
403 if __name__ == "__main__":