1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspec import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.threadmanager import ThreadManager
22 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
23 from sfa.util.debug import log
24 import sfa.plc.peers as peers
26 def delete_slice(api, xrn, origin_hrn=None):
27 credential = api.getCredential()
28 threads = ThreadManager()
29 for aggregate in api.aggregates:
30 server = api.aggregates[aggregate]
31 threads.run(server.delete_slice, credential, xrn, origin_hrn)
35 def create_slice(api, xrn, rspec, origin_hrn=None):
36 hrn, type = urn_to_hrn(xrn)
38 # Validate the RSpec against PlanetLab's schema --disabled for now
39 # The schema used here needs to aggregate the PL and VINI schemas
40 # schema = "/var/www/html/schemas/pl.rng"
44 tree = etree.parse(StringIO(rspec))
45 except etree.XMLSyntaxError:
46 message = str(sys.exc_info()[1])
47 raise InvalidRSpec(message)
49 relaxng_doc = etree.parse(schema)
50 relaxng = etree.RelaxNG(relaxng_doc)
53 error = relaxng.error_log.last_error
54 message = "%s (line %s)" % (error.message, error.line)
55 raise InvalidRSpec(message)
57 cred = api.getCredential()
58 threads = ThreadManager()
59 for aggregate in api.aggregates:
60 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
61 server = api.aggregates[aggregate]
62 # Just send entire RSpec to each aggregate
63 threads.run(server.create_slice, cred, xrn, rspec, origin_hrn)
67 def get_ticket(api, xrn, rspec, origin_hrn=None):
68 slice_hrn, type = urn_to_hrn(xrn)
69 # get the netspecs contained within the clients rspec
71 tree= etree.parse(StringIO(rspec))
72 elements = tree.findall('./network')
73 for element in elements:
74 aggregate_hrn = element.values()[0]
75 aggregate_rspecs[aggregate_hrn] = rspec
77 # get a ticket from each aggregate
78 credential = api.getCredential()
79 threads = ThreadManager()
80 for aggregate, aggregate_rspec in aggregate_rspecs.items():
82 if aggregate in api.aggregates:
83 server = api.aggregates[aggregate]
85 net_urn = hrn_to_urn(aggregate, 'authority')
86 # we may have a peer that knows about this aggregate
87 for agg in api.aggregates:
88 agg_info = api.aggregates[agg].get_aggregates(credential, net_urn)
90 # send the request to this address
91 url = 'http://%s:%s' % (agg_info['addr'], agg_info['port'])
92 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
96 threads.run(server.get_ticket, credential, xrn, aggregate_rspec, origin_hrn)
97 results = threads.get_results()
99 # gather information from each ticket
104 for result in results:
105 agg_ticket = SfaTicket(string=result)
106 attrs = agg_ticket.get_attributes()
108 object_gid = agg_ticket.get_gid_object()
110 rspecs.append(agg_ticket.get_rspec())
111 initscripts.extend(attrs.get('initscripts', []))
112 slivers.extend(attrs.get('slivers', []))
115 attributes = {'initscripts': initscripts,
117 merged_rspec = merge_rspecs(rspecs)
119 # create a new ticket
120 ticket = SfaTicket(subject = slice_hrn)
121 ticket.set_gid_caller(api.auth.client_gid)
122 ticket.set_issuer(key=api.key, subject=api.hrn)
123 ticket.set_gid_object(object_gid)
124 ticket.set_pubkey(object_gid.get_pubkey())
125 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
126 ticket.set_attributes(attributes)
127 ticket.set_rspec(merged_rspec)
130 return ticket.save_to_string(save_parents=True)
132 def start_slice(api, xrn):
133 credential = api.getCredential()
134 threads = ThreadManager()
135 for aggregate in api.aggregates:
136 server = api.aggregates[aggregate]
137 threads.run(server.stop_slice, credential, xrn)
138 threads.get_results()
141 def stop_slice(api, xrn):
142 credential = api.getCredential()
143 threads = ThreadManager()
144 for aggregate in api.aggregates:
145 server = api.aggregates[aggregate]
146 threads.run(server.stop_slice, credential, xrn)
147 threads.get_results()
150 def reset_slice(api, xrn):
151 # XX not implemented at this interface
155 # look in cache first
157 slices = api.cache.get('slices')
161 # fetch from aggregates
163 credential = api.getCredential()
164 threads = ThreadManager()
165 for aggregate in api.aggregates:
166 server = api.aggregates[aggregate]
167 threads.run(server.get_slices, credential)
170 results = threads.get_results()
172 for result in results:
173 slices.extend(result)
177 api.cache.add('slices', slices)
181 def get_rspec(api, xrn=None, origin_hrn=None):
182 # look in cache first
183 if api.cache and not xrn:
184 rspec = api.cache.get('nodes')
188 hrn, type = urn_to_hrn(xrn)
190 cred = api.getCredential()
191 threads = ThreadManager()
192 for aggregate in api.aggregates:
193 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
194 # get the rspec from the aggregate
195 server = api.aggregates[aggregate]
196 threads.run(server.get_resources, cred, xrn, origin_hrn)
198 results = threads.get_results()
199 # combine the rspecs into a single rspec
200 for agg_rspec in results:
202 tree = etree.parse(StringIO(agg_rspec))
203 except etree.XMLSyntaxError:
204 message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
205 raise InvalidRSpec(message)
207 root = tree.getroot()
208 if root.get("type") in ["SFA"]:
212 for network in root.iterfind("./network"):
213 rspec.append(deepcopy(network))
214 for request in root.iterfind("./request"):
215 rspec.append(deepcopy(request))
217 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
219 if api.cache and not xrn:
220 api.cache.add('nodes', rspec)
225 Returns the request context required by sfatables. At some point, this
226 mechanism should be changed to refer to "contexts", which is the
227 information that sfatables is requesting. But for now, we just return
228 the basic information needed in a dict.
230 def fetch_context(slice_hrn, user_hrn, contexts):
231 #slice_hrn = urn_to_hrn(slice_xrn)[0]
232 #user_hrn = urn_to_hrn(user_xrn)[0]
233 base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
238 r.parseFile(sys.argv[1])
240 create_slice(None,'plc.princeton.tmacktestslice',rspec)
242 if __name__ == "__main__":