1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.threadmanager import ThreadManager
22 from sfa.util.debug import log
23 import sfa.plc.peers as peers
def delete_slice(api, xrn, origin_hrn=None):
    """
    Ask every aggregate registered on ``api`` to delete the slice ``xrn``.

    api        -- supplies getCredential() and the ``aggregates`` mapping
    xrn        -- slice identifier, handed unchanged to each aggregate
    origin_hrn -- handed unchanged to each aggregate (default None)

    One ``delete_slice`` call per aggregate is scheduled on a ThreadManager,
    each with this server's own credential.
    """
    cred = api.getCredential()
    manager = ThreadManager()
    for name in api.aggregates:
        manager.run(api.aggregates[name].delete_slice, cred, xrn, origin_hrn)
    # NOTE(review): no statement after the loop is visible in this listing
    # (e.g. collecting the thread results or returning a status) -- confirm
    # against the full file.
# Forward a slice-creation request to the aggregates registered on `api`.
#
# Parameters:
#   api        -- supplies getCredential(), the `aggregates` mapping and
#                 `auth.client_cred`
#   xrn        -- slice identifier; converted with urn_to_hrn() and also
#                 passed on unchanged to each aggregate
#   rspec      -- RSpec XML text, sent in full to every contacted aggregate
#   origin_hrn -- passed through unchanged to each aggregate (default None)
# Raises:
#   InvalidRSpec -- from the XML parsing / RelaxNG validation statements below
#
# NOTE(review): `schema` is read below but its assignment, and the `try:`
# matching the `except`, are not visible in this listing -- confirm against
# the full file before touching the validation section.
34 def create_slice(api, xrn, rspec, origin_hrn=None):
# NOTE(review): `type` shadows the builtin; neither `hrn` nor `type` is
# referenced by the statements visible below.
35 hrn, type = urn_to_hrn(xrn)
37 # Validate the RSpec against PlanetLab's schema --disabled for now
38 # The schema used here needs to aggregate the PL and VINI schemas
39 # schema = "/var/www/html/schemas/pl.rng"
# Parse the RSpec text; a syntax error is re-raised as InvalidRSpec carrying
# the parser's message.
43 tree = etree.parse(StringIO(rspec))
44 except etree.XMLSyntaxError:
45 message = str(sys.exc_info()[1])
46 raise InvalidRSpec(message)
# Build a RelaxNG validator from the document parsed from `schema`.
48 relaxng_doc = etree.parse(schema)
49 relaxng = etree.RelaxNG(relaxng_doc)
# Report the validator's most recent error, including its line number, as
# InvalidRSpec (the condition guarding this is not visible in this listing).
52 error = relaxng.error_log.last_error
53 message = "%s (line %s)" % (error.message, error.line)
54 raise InvalidRSpec(message)
# Fan the request out using this server's own credential.
56 cred = api.getCredential()
57 threads = ThreadManager()
58 for aggregate in api.aggregates:
# Skip the aggregate whose key equals the hrn of the caller's GID --
# presumably so the request is not sent back to its sender; confirm.
59 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
60 server = api.aggregates[aggregate]
61 # Just send entire RSpec to each aggregate
62 threads.run(server.create_slice, cred, xrn, rspec, origin_hrn)
# Obtain a ticket for slice `xrn` from each network named in the client's
# rspec and merge them into one new SfaTicket, returned in string form.
#
# Parameters:
#   api        -- supplies getCredential(), `aggregates`, `auth.client_gid`,
#                 `key` and `hrn`
#   xrn        -- slice identifier; converted with urn_to_hrn()
#   rspec      -- client RSpec XML text containing NetSpec elements
#   origin_hrn -- passed through unchanged to each aggregate (default None)
# Returns:
#   new_ticket.save_to_string(save_parents=True)
#
# NOTE(review): the initialisation of `rspecs`, `tickets`, `temp_rspec`,
# `tmp_rspec`, `networks` and `valid_data`, and several branch headers, are
# not visible in this listing -- confirm against the full file.
66 def get_ticket(api, xrn, rspec, origin_hrn=None):
# NOTE(review): `type` shadows the builtin and is not used by the statements
# visible below.
67 slice_hrn, type = urn_to_hrn(xrn)
68 # get the netspecs contained within the client's rspec
69 client_rspec = RSpec(xml=rspec)
70 netspecs = client_rspec.getDictsByTagName('NetSpec')
72 # build a separate rspec for each NetSpec, keyed by the NetSpec's name
75 for netspec in netspecs:
76 net_hrn = netspec['name']
77 resources = {'start_time': 0, 'end_time': 0 ,
78 'network': {'NetSpec' : netspec}}
79 resourceDict = {'RSpec': resources}
80 temp_rspec.parseDict(resourceDict)
81 rspecs[net_hrn] = temp_rspec.toxml()
83 # send the rspec to the appropriate aggregate/sm
84 aggregates = api.aggregates
85 credential = api.getCredential()
87 for net_hrn in rspecs:
# NOTE(review): elsewhere in this file urn_to_hrn()'s result is unpacked into
# two names; here the whole result is bound to `net_urn` -- confirm this is
# the value get_aggregates() expects.
88 net_urn = urn_to_hrn(net_hrn)
90 # if we are directly connected to the aggregate then we can just
91 # send them the request. if not, then we may be connected to an sm
92 # that's connected to the aggregate
93 if net_hrn in aggregates:
94 ticket = aggregates[net_hrn].get_ticket(credential, xrn, \
95 rspecs[net_hrn], origin_hrn)
96 tickets[net_hrn] = ticket
98 # let's forward this rspec to an sm that knows about the network
99 for agg in aggregates:
100 network_found = aggregates[agg].get_aggregates(credential, net_urn)
# NOTE(review): the loop variable is `agg`, but the next statements index
# with `aggregate`, which no visible statement in this function assigns --
# looks like it should be `agg`; confirm.
102 ticket = aggregates[aggregate].get_ticket(credential, \
103 slice_hrn, rspecs[net_hrn], origin_hrn)
104 tickets[aggregate] = ticket
106 print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
109 # create a new ticket
110 new_ticket = SfaTicket(subject = slice_hrn)
111 new_ticket.set_gid_caller(api.auth.client_gid)
112 new_ticket.set_issuer(key=api.key, subject=api.hrn)
117 'timestamp': int(time.time()),
121 # merge data from aggregate ticket into new ticket
122 for agg_ticket in tickets.values():
123 # get data from this ticket
# Each stored ticket is a string; rebuild an SfaTicket object from it.
124 agg_ticket = SfaTicket(string=agg_ticket)
125 attributes = agg_ticket.get_attributes()
# A missing key yields the [] default, so these guards only skip entries
# whose stored value is None.
126 if attributes.get('initscripts', []) != None:
127 valid_data['initscripts'].extend(attributes.get('initscripts', []))
128 if attributes.get('slivers', []) != None:
129 valid_data['slivers'].extend(attributes.get('slivers', []))
# Copy the object GID and its public key from the aggregate ticket onto the
# new ticket.
132 object_gid = agg_ticket.get_gid_object()
133 new_ticket.set_gid_object(object_gid)
134 new_ticket.set_pubkey(object_gid.get_pubkey())
# Collect the NetSpec dicts of the aggregate ticket's rspec.
137 tmp_rspec.parseString(agg_ticket.get_rspec())
138 networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])
140 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
141 new_ticket.set_attributes(valid_data)
# Build the merged rspec from all collected networks and attach it.
142 resources = {'networks': networks, 'start_time': 0, 'duration': 0}
143 resourceDict = {'RSpec': resources}
144 tmp_rspec.parseDict(resourceDict)
145 new_ticket.set_rspec(tmp_rspec.toxml())
148 return new_ticket.save_to_string(save_parents=True)
def start_slice(api, xrn):
    """
    Ask every aggregate registered on ``api`` to start the slice ``xrn``.

    api -- supplies getCredential() and the ``aggregates`` mapping
    xrn -- slice identifier, passed unchanged to each aggregate

    One ``start_slice`` call per aggregate is scheduled on a ThreadManager,
    each with this server's own credential.
    """
    credential = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        # Schedule the aggregate's start_slice. This used to schedule
        # server.stop_slice, so a start request stopped the slice instead.
        threads.run(server.start_slice, credential, xrn)
    # NOTE(review): no statement after the loop is visible in this listing
    # (e.g. collecting the thread results or returning a status) -- confirm
    # against the full file.
def stop_slice(api, xrn):
    """
    Ask every aggregate registered on ``api`` to stop the slice ``xrn``.

    api -- supplies getCredential() and the ``aggregates`` mapping
    xrn -- slice identifier, handed unchanged to each aggregate

    One ``stop_slice`` call per aggregate is scheduled on a ThreadManager,
    each with this server's own credential.
    """
    cred = api.getCredential()
    manager = ThreadManager()
    for name in api.aggregates:
        manager.run(api.aggregates[name].stop_slice, cred, xrn)
    # NOTE(review): no statement after the loop is visible in this listing
    # (e.g. collecting the thread results or returning a status) -- confirm
    # against the full file.
# Reset the slice `xrn`. Per the comment below this is not implemented at
# this interface; no body statement is visible in this listing.
166 def reset_slice(api, xrn):
167 # XX not implemented at this interface
# NOTE(review): no `def` line for the statements below is visible in this
# listing; they read and fill the 'slices' cache entry and call
# server.get_slices, so they presumably form a separate slice-listing
# function -- confirm against the full file.
171 # look in cache first
173 slices = api.cache.get('slices')
177 # fetch from aggregates
# Schedule one get_slices call per registered aggregate, using this server's
# own credential.
179 credential = api.getCredential()
180 threads = ThreadManager()
181 for aggregate in api.aggregates:
182 server = api.aggregates[aggregate]
183 threads.run(server.get_slices, credential)
# Concatenate the results of the scheduled calls into `slices`.
186 results = threads.get_results()
188 for result in results:
189 slices.extend(result)
# Store the combined list under the 'slices' cache key.
193 api.cache.add('slices', slices)
# Collect resource RSpecs from the aggregates registered on `api` and combine
# them into a single RSpec document.
#
# Parameters:
#   api        -- supplies `cache`, getCredential(), `aggregates` and
#                 `auth.client_cred`
#   xrn        -- optional identifier passed to each aggregate; the 'nodes'
#                 cache entry is only read and written when it is not given
#   origin_hrn -- passed through unchanged to each aggregate (default None)
# Raises:
#   InvalidRSpec -- when an aggregate's answer is not parseable XML
#
# NOTE(review): the `try:` matching the `except`, the handling of a cache
# hit, the initial value of the combined `rspec` and the return statement
# are not visible in this listing -- confirm against the full file.
197 def get_rspec(api, xrn=None, origin_hrn=None):
198 # look in cache first
199 if api.cache and not xrn:
200 rspec = api.cache.get('nodes')
# NOTE(review): `type` shadows the builtin; neither `hrn` nor `type` is
# referenced by the statements visible below.
204 hrn, type = urn_to_hrn(xrn)
206 cred = api.getCredential()
207 threads = ThreadManager()
208 for aggregate in api.aggregates:
# Skip the aggregate whose key equals the hrn of the caller's GID.
209 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
210 # get the rspec from the aggregate
211 server = api.aggregates[aggregate]
212 threads.run(server.get_resources, cred, xrn, origin_hrn)
214 results = threads.get_results()
215 # combine the rspecs into a single rspec
216 for agg_rspec in results:
218 tree = etree.parse(StringIO(agg_rspec))
219 except etree.XMLSyntaxError:
# Include the offending text itself in the error message.
220 message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
221 raise InvalidRSpec(message)
223 root = tree.getroot()
# Only documents whose root has type="SFA" are considered.
224 if root.get("type") in ["SFA"]:
# Append copies of the direct <network> and <request> children to the
# combined document; deepcopy leaves the parsed tree untouched.
228 for network in root.iterfind("./network"):
229 rspec.append(deepcopy(network))
230 for request in root.iterfind("./request"):
231 rspec.append(deepcopy(request))
# Serialise the combined element tree, with XML declaration, pretty-printed.
233 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
# Cache the serialised result only for requests made without an xrn.
235 if api.cache and not xrn:
236 api.cache.add('nodes', rspec)
241 Returns the request context required by sfatables. At some point, this
242 mechanism should be changed to refer to "contexts", which is the
243 information that sfatables is requesting. But for now, we just return
244 the basic information needed in a dict.
# Build the basic request context dict from the given slice and user hrns.
#   slice_hrn -- stored under ['sfa']['slice']['hrn']
#   user_hrn  -- stored under ['sfa']['user']['hrn']
#   contexts  -- not referenced by any statement visible in this listing
# NOTE(review): the return statement is not visible here; presumably
# `base_context` is returned -- confirm against the full file.
246 def fetch_context(slice_hrn, user_hrn, contexts):
247 #slice_hrn = urn_to_hrn(slice_xrn)[0]
248 #user_hrn = urn_to_hrn(user_xrn)[0]
249 base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
# Command-line test driver: parse the file named by the first command-line
# argument, then call create_slice() with no api object and a fixed slice
# name.
# NOTE(review): the enclosing `def`, the creation of `r`, the assignment of
# `rspec` and the body of the __main__ guard are not visible in this listing
# -- confirm against the full file.
254 r.parseFile(sys.argv[1])
256 create_slice(None,'plc.princeton.tmacktestslice',rspec)
258 if __name__ == "__main__":