1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspecHelper import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
# Records the value 1 under the 'geni_api' key of `version`.
# NOTE(review): neither the enclosing definition nor the creation of `version`
# appears in the visible lines -- presumably a dict built just above; confirm.
29 version['geni_api'] = 1
def delete_slice(api, xrn, origin_hrn=None):
    """
    Dispatch a delete_slice call to every aggregate listed in api.aggregates.

    api        -- provides getCredential() and the aggregates mapping
    xrn        -- slice identifier, forwarded unchanged to each aggregate
    origin_hrn -- forwarded unchanged to each aggregate (defaults to None)

    Each call is handed to a ThreadManager via run(); the results are not
    collected in this block.
    """
    cred = api.getCredential()
    pool = ThreadManager()
    for agg_name in api.aggregates:
        agg_server = api.aggregates[agg_name]
        pool.run(agg_server.delete_slice, cred, xrn, origin_hrn)
# create_slice: forwards a CreateSliver request, carrying the full rspec, to
# the aggregates in api.aggregates through a ThreadManager.
#   api   -- provides getCredential(), aggregates and auth.client_cred
#   xrn   -- slice identifier; converted with urn_to_hrn() and forwarded as-is
#   creds -- not referenced by any visible line of this function
#   rspec -- XML text; parsed with etree.parse() and forwarded unchanged
#   users -- forwarded unchanged to each aggregate
# Raises InvalidRSpec when the rspec fails XML parsing or RelaxNG validation.
# NOTE(review): indentation and several lines are missing from this view, so
# the nesting of the statements below cannot be confirmed from here.
41 def create_slice(api, xrn, creds, rspec, users):
# `hrn` and `type` are not used again in the visible lines; `type` shadows the builtin.
42 hrn, type = urn_to_hrn(xrn)
44 # Validate the RSpec against PlanetLab's schema --disabled for now
45 # The schema used here needs to aggregate the PL and VINI schemas
46 # schema = "/var/www/html/schemas/pl.rng"
# NOTE(review): the `try:` that pairs with the `except` below is not visible,
# and `schema` is only mentioned in the commented-out line above. `etree` and
# `sys` are not among the visible imports either -- confirm they are in scope.
50 tree = etree.parse(StringIO(rspec))
51 except etree.XMLSyntaxError:
# Re-raise the parser's message as the project's InvalidRSpec fault.
52 message = str(sys.exc_info()[1])
53 raise InvalidRSpec(message)
55 relaxng_doc = etree.parse(schema)
56 relaxng = etree.RelaxNG(relaxng_doc)
# NOTE(review): the condition guarding this error report (presumably the
# validation call on `tree`) is not visible.
59 error = relaxng.error_log.last_error
60 message = "%s (line %s)" % (error.message, error.line)
61 raise InvalidRSpec(message)
64 # XX TODO: Should try to use delegated credential first
66 cred = api.getCredential()
67 threads = ThreadManager()
68 for aggregate in api.aggregates:
# Single-element membership test: true unless `aggregate` equals the hrn of
# the caller's GID. Presumably this keeps the request from being sent back to
# the aggregate that issued it -- confirm.
69 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
70 server = api.aggregates[aggregate]
71 # Just send entire RSpec to each aggregate
72 threads.run(server.CreateSliver, xrn, cred, rspec, users)
# get_ticket: requests a ticket from each aggregate named in the client's
# rspec, then builds one combined SfaTicket and returns it serialized as a string.
#   api        -- provides getCredential(), aggregates, key_file, cert_file,
#                 key, hrn and auth.client_gid
#   xrn        -- slice identifier; its hrn becomes the combined ticket's subject
#   rspec      -- XML text listing <network> elements
#   origin_hrn -- forwarded unchanged to each aggregate (defaults to None)
# NOTE(review): the initialisation of `aggregate_rspecs`, `rspecs`,
# `initscripts` and `slivers`, plus several branch keywords, are missing from
# this view; indentation is lost, so nesting cannot be confirmed from here.
77 def get_ticket(api, xrn, rspec, origin_hrn=None):
78 slice_hrn, type = urn_to_hrn(xrn)
79 # get the netspecs contained within the clients rspec
81 tree= etree.parse(StringIO(rspec))
82 elements = tree.findall('./network')
83 for element in elements:
# Uses the first attribute value of the <network> element as the key;
# presumably this is the aggregate's hrn -- confirm against the rspec schema.
84 aggregate_hrn = element.values()[0]
# Every aggregate is mapped to the complete client rspec, not a per-network subset.
85 aggregate_rspecs[aggregate_hrn] = rspec
87 # get a ticket from each aggregate
88 credential = api.getCredential()
89 threads = ThreadManager()
90 for aggregate, aggregate_rspec in aggregate_rspecs.items():
# Known aggregate: use the server object already held in api.aggregates.
92 if aggregate in api.aggregates:
93 server = api.aggregates[aggregate]
# NOTE(review): the `else:` (or equivalent) introducing the lookup below is not visible.
95 net_urn = hrn_to_urn(aggregate, 'authority')
96 # we may have a peer that knows about this aggregate
97 for agg in api.aggregates:
98 agg_info = api.aggregates[agg].get_aggregates(credential, net_urn)
# NOTE(review): whatever test selects a usable `agg_info` is not visible here.
100 # send the request to this address
101 url = 'http://%s:%s' % (agg_info['addr'], agg_info['port'])
102 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
106 threads.run(server.get_ticket, credential, xrn, aggregate_rspec, origin_hrn)
107 results = threads.get_results()
109 # gather information from each ticket
114 for result in results:
# Each result is treated as a serialized ticket and re-parsed into an SfaTicket.
115 agg_ticket = SfaTicket(string=result)
116 attrs = agg_ticket.get_attributes()
# NOTE(review): if this assignment sits inside the loop, the GID object of the
# last ticket processed is the one used for the combined ticket -- confirm.
118 object_gid = agg_ticket.get_gid_object()
119 rspecs.append(agg_ticket.get_rspec())
120 initscripts.extend(attrs.get('initscripts', []))
121 slivers.extend(attrs.get('slivers', []))
# NOTE(review): the remainder of this dict literal is missing from this view.
124 attributes = {'initscripts': initscripts,
126 merged_rspec = merge_rspecs(rspecs)
128 # create a new ticket
129 ticket = SfaTicket(subject = slice_hrn)
130 ticket.set_gid_caller(api.auth.client_gid)
131 ticket.set_issuer(key=api.key, subject=api.hrn)
132 ticket.set_gid_object(object_gid)
133 ticket.set_pubkey(object_gid.get_pubkey())
134 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
135 ticket.set_attributes(attributes)
136 ticket.set_rspec(merged_rspec)
139 return ticket.save_to_string(save_parents=True)
def start_slice(api, xrn):
    """
    Dispatch a start_slice call to every aggregate listed in api.aggregates.

    api -- provides getCredential() and the aggregates mapping
    xrn -- slice identifier, forwarded unchanged to each aggregate

    Each call is handed to a ThreadManager; get_results() is then invoked and
    its return value discarded.
    """
    credential = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        # Fixed: this used to dispatch server.stop_slice (copied from
        # stop_slice), so "starting" a slice actually stopped it everywhere.
        threads.run(server.start_slice, credential, xrn)
    threads.get_results()
def stop_slice(api, xrn):
    """
    Dispatch a stop_slice call to every aggregate listed in api.aggregates.

    api -- provides getCredential() and the aggregates mapping
    xrn -- slice identifier, forwarded unchanged to each aggregate

    Each call is handed to a ThreadManager; get_results() is then invoked and
    its return value discarded.
    """
    cred = api.getCredential()
    pool = ThreadManager()
    for agg_name in api.aggregates:
        agg_server = api.aggregates[agg_name]
        pool.run(agg_server.stop_slice, cred, xrn)
    pool.get_results()
# reset_slice: placeholder for resetting a slice; per the comment below it is
# not implemented at this interface.
#   api, xrn -- not referenced by any visible line
# NOTE(review): no body statement is visible after the comment -- confirm
# what, if anything, this function returns.
159 def reset_slice(api, xrn):
160 # XX not implemented at this interface
# Slice-listing logic: reads 'slices' from api.cache, queries every aggregate
# with get_slices, concatenates the answers and stores them back in the cache.
# NOTE(review): the `def` header for these lines is not visible (presumably a
# get_slices(api) function -- confirm). The conditionals around the cache
# lookup/store and the initialisation of `slices` before the extend() loop
# are missing from this view as well.
164 # look in cache first
166 slices = api.cache.get('slices')
170 # fetch from aggregates
172 credential = api.getCredential()
173 threads = ThreadManager()
174 for aggregate in api.aggregates:
175 server = api.aggregates[aggregate]
176 threads.run(server.get_slices, credential)
179 results = threads.get_results()
# Each aggregate's result is an iterable of entries appended to one flat list.
181 for result in results:
182 slices.extend(result)
186 api.cache.add('slices', slices)
# get_rspec: collects resource listings (ListResources) from the aggregates
# and merges their <network> and <request> elements into one XML document,
# serialized with etree.tostring().
#   api     -- provides cache, getCredential(), aggregates and auth.client_cred
#   creds   -- sequence of credential strings; creds[0] is used to derive the
#              caller's hrn
#   options -- dict; 'geni_slice_urn' and 'origin_hrn' are read from it, and
#              the whole dict is forwarded to each aggregate
# Raises InvalidRSpec when an aggregate's answer is not well-formed XML.
# NOTE(review): several conditionals, a `try:`, the creation of the merged
# root element and the final return are missing from this view; indentation
# is lost, so nesting cannot be confirmed from here.
190 def get_rspec(api, creds, options):
191 # get slice's hrn from options
192 xrn = options.get('geni_slice_urn', None)
# NOTE(review): `xrn` may be None here (see the default above), and the same
# conversion is repeated further down -- confirm urn_to_hrn() tolerates None.
193 hrn, type = urn_to_hrn(xrn)
195 # get hrn of the original caller
196 origin_hrn = options.get('origin_hrn', None)
# NOTE(review): the condition guarding this fallback (presumably "origin_hrn
# not supplied") is not visible.
198 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
200 # look in cache first
# The cache is only consulted when no slice urn was given.
201 if api.cache and not xrn:
202 rspec = api.cache.get('nodes')
206 hrn, type = urn_to_hrn(xrn)
209 # XX TODO: Should try to use delegated credential first
211 cred = api.getCredential()
212 threads = ThreadManager()
214 for aggregate in api.aggregates:
# Single-element membership test: true unless `aggregate` equals the hrn of
# the caller's GID.
215 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
216 # get the rspec from the aggregate
217 server = api.aggregates[aggregate]
218 threads.run(server.ListResources, cred, options)
219 #threads.run(server.get_resources, cred, xrn, origin_hrn)
222 results = threads.get_results()
223 # combine the rspecs into a single rspec
224 for agg_rspec in results:
# NOTE(review): the `try:` that pairs with the `except` below is not visible.
226 tree = etree.parse(StringIO(agg_rspec))
227 except etree.XMLSyntaxError:
# The offending document text is included in the fault message.
228 message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
229 raise InvalidRSpec(message)
231 root = tree.getroot()
232 if root.get("type") in ["SFA"]:
# NOTE(review): the lines that create or select the `rspec` element receiving
# the copies below are not visible.
236 for network in root.iterfind("./network"):
# deepcopy so the element is copied rather than moved out of its source tree.
237 rspec.append(deepcopy(network))
238 for request in root.iterfind("./request"):
239 rspec.append(deepcopy(request))
241 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
# Only the slice-independent listing (no xrn) is stored in the cache.
243 if api.cache and not xrn:
244 api.cache.add('nodes', rspec)
249 Returns the request context required by sfatables. At some point, this
250 mechanism should be changed to refer to "contexts", which is the
251 information that sfatables is requesting. But for now, we just return
252 the basic information needed in a dict.
# fetch_context: builds the nested dict
#   {'sfa': {'user': {'hrn': user_hrn}, 'slice': {'hrn': slice_hrn}}}
#   slice_hrn -- stored under ['sfa']['slice']['hrn']
#   user_hrn  -- stored under ['sfa']['user']['hrn']
#   contexts  -- not referenced by any visible line
# NOTE(review): no `return` is visible after the assignment, although the
# description preceding this function says the dict is returned -- confirm.
254 def fetch_context(slice_hrn, user_hrn, contexts):
255 #slice_hrn = urn_to_hrn(slice_xrn)[0]
256 #user_hrn = urn_to_hrn(user_xrn)[0]
257 base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
# Command-line test driver: parses the file named by the first command-line
# argument, then calls create_slice with a hard-coded slice name.
# NOTE(review): the enclosing `def`, the creation of `r` and the assignment of
# `rspec` are not visible in this view.
262 r.parseFile(sys.argv[1])
# NOTE(review): create_slice is defined in this file with five parameters
# (api, xrn, creds, rspec, users), but only three arguments are passed here,
# and `rspec` lands in the `creds` position -- this call looks stale.
264 create_slice(None,'plc.princeton.tmacktestslice',rspec)
# Script entry guard; its body lies beyond the visible lines.
266 if __name__ == "__main__":