1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.threadmanager import ThreadManager
22 from sfa.util.debug import log
23 import sfa.plc.peers as peers
25 def delete_slice(api, xrn, origin_hrn=None):
26 credential = api.getCredential()
27 aggregates = api.aggregates
28 for aggregate in aggregates:
30 # request hash is optional so lets try the call without it
32 aggregates[aggregate].delete_slice(credential, xrn, origin_hrn)
35 print >> log, "%s" % (traceback.format_exc())
36 print >> log, "Error calling delete slice at aggregate %s" % aggregate
39 def create_slice(api, xrn, rspec, origin_hrn=None):
40 hrn, type = urn_to_hrn(xrn)
42 # Validate the RSpec against PlanetLab's schema --disabled for now
43 # The schema used here needs to aggregate the PL and VINI schemas
44 # schema = "/var/www/html/schemas/pl.rng"
48 tree = etree.parse(StringIO(rspec))
49 except etree.XMLSyntaxError:
50 message = str(sys.exc_info()[1])
51 raise InvalidRSpec(message)
53 relaxng_doc = etree.parse(schema)
54 relaxng = etree.RelaxNG(relaxng_doc)
57 error = relaxng.error_log.last_error
58 message = "%s (line %s)" % (error.message, error.line)
59 raise InvalidRSpec(message)
62 cred = api.getCredential()
64 if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
66 # Just send entire RSpec to each aggregate
67 aggs[agg].create_slice(cred, xrn, rspec, origin_hrn)
69 print >> log, "Error creating slice %s at %s" % (hrn, agg)
74 def get_ticket(api, xrn, rspec, origin_hrn=None):
75 slice_hrn, type = urn_to_hrn(xrn)
76 # get the netspecs contained within the clients rspec
77 client_rspec = RSpec(xml=rspec)
78 netspecs = client_rspec.getDictsByTagName('NetSpec')
80 # create an rspec for each individual rspec
83 for netspec in netspecs:
84 net_hrn = netspec['name']
85 resources = {'start_time': 0, 'end_time': 0 ,
86 'network': {'NetSpec' : netspec}}
87 resourceDict = {'RSpec': resources}
88 temp_rspec.parseDict(resourceDict)
89 rspecs[net_hrn] = temp_rspec.toxml()
91 # send the rspec to the appropiate aggregate/sm
92 aggregates = api.aggregates
93 credential = api.getCredential()
95 for net_hrn in rspecs:
96 net_urn = urn_to_hrn(net_hrn)
98 # if we are directly connected to the aggregate then we can just
99 # send them the request. if not, then we may be connected to an sm
100 # thats connected to the aggregate
101 if net_hrn in aggregates:
102 ticket = aggregates[net_hrn].get_ticket(credential, xrn, \
103 rspecs[net_hrn], origin_hrn)
104 tickets[net_hrn] = ticket
106 # lets forward this rspec to a sm that knows about the network
107 for agg in aggregates:
108 network_found = aggregates[agg].get_aggregates(credential, net_urn)
110 ticket = aggregates[aggregate].get_ticket(credential, \
111 slice_hrn, rspecs[net_hrn], origin_hrn)
112 tickets[aggregate] = ticket
114 print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
117 # create a new ticket
118 new_ticket = SfaTicket(subject = slice_hrn)
119 new_ticket.set_gid_caller(api.auth.client_gid)
120 new_ticket.set_issuer(key=api.key, subject=api.hrn)
125 'timestamp': int(time.time()),
129 # merge data from aggregate ticket into new ticket
130 for agg_ticket in tickets.values():
131 # get data from this ticket
132 agg_ticket = SfaTicket(string=agg_ticket)
133 attributes = agg_ticket.get_attributes()
134 if attributes.get('initscripts', []) != None:
135 valid_data['initscripts'].extend(attributes.get('initscripts', []))
136 if attributes.get('slivers', []) != None:
137 valid_data['slivers'].extend(attributes.get('slivers', []))
140 object_gid = agg_ticket.get_gid_object()
141 new_ticket.set_gid_object(object_gid)
142 new_ticket.set_pubkey(object_gid.get_pubkey())
145 tmp_rspec.parseString(agg_ticket.get_rspec())
146 networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])
148 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
149 new_ticket.set_attributes(valid_data)
150 resources = {'networks': networks, 'start_time': 0, 'duration': 0}
151 resourceDict = {'RSpec': resources}
152 tmp_rspec.parseDict(resourceDict)
153 new_ticket.set_rspec(tmp_rspec.toxml())
156 return new_ticket.save_to_string(save_parents=True)
158 def start_slice(api, xrn):
159 hrn, type = urn_to_hrn(xrn)
160 slicename = hrn_to_pl_slicename(hrn)
161 slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
163 raise RecordNotFound(hrn)
165 attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
166 attribute_id = attreibutes[0]['slice_attribute_id']
167 api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1" )
171 def stop_slice(api, xrn):
172 hrn, type = urn_to_hrn(xrn)
173 slicename = hrn_to_pl_slicename(hrn)
174 slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
176 raise RecordNotFound(hrn)
177 slice_id = slices[0]['slice_id']
178 attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
179 attribute_id = attributes[0]['slice_attribute_id']
180 api.plshell.UpdateSliceTag(api.plauth, attribute_id, "0")
183 def reset_slice(api, xrn):
184 # XX not implemented at this interface
188 # look in cache first
190 slices = api.cache.get('slices')
194 # fetch from aggregates
196 credential = api.getCredential()
197 for aggregate in api.aggregates:
199 tmp_slices = api.aggregates[aggregate].get_slices(credential)
200 slices.extend(tmp_slices)
202 print >> log, "%s" % (traceback.format_exc())
203 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
207 api.cache.add('slices', slices)
211 def get_rspec(api, xrn=None, origin_hrn=None):
212 # look in cache first
213 if api.cache and not xrn:
214 rspec = api.cache.get('nodes')
218 hrn, type = urn_to_hrn(xrn)
220 aggs = api.aggregates
221 cred = api.getCredential()
222 threads = ThreadManager()
224 if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
225 # get the rspec from the aggregate
226 threads.run(aggs[agg].get_resources, cred, xrn, origin_hrn)
228 results = threads.get_results()
229 # combine the rspecs into a single rspec
230 for agg_rspec in results:
232 tree = etree.parse(StringIO(agg_rspec))
233 except etree.XMLSyntaxError:
234 message = agg + ": " + str(sys.exc_info()[1])
235 raise InvalidRSpec(message)
237 root = tree.getroot()
238 if root.get("type") in ["SFA"]:
242 for network in root.iterfind("./network"):
243 rspec.append(deepcopy(network))
244 for request in root.iterfind("./request"):
245 rspec.append(deepcopy(request))
247 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
249 if api.cache and not xrn:
250 api.cache.add('nodes', rspec)
255 Returns the request context required by sfatables. At some point, this
256 mechanism should be changed to refer to "contexts", which is the
257 information that sfatables is requesting. But for now, we just return
258 the basic information needed in a dict.
260 def fetch_context(slice_hrn, user_hrn, contexts):
261 #slice_hrn = urn_to_hrn(slice_xrn)[0]
262 #user_hrn = urn_to_hrn(user_xrn)[0]
263 base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
268 r.parseFile(sys.argv[1])
270 create_slice(None,'plc.princeton.tmacktestslice',rspec)
272 if __name__ == "__main__":