1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.rspec import *
21 from sfa.util.sfaticket import *
22 from sfa.util.debug import log
23 import sfa.plc.peers as peers
25 def delete_slice(api, xrn, origin_hrn=None):
26 credential = api.getCredential()
27 aggregates = api.aggregates
28 for aggregate in aggregates:
30 # request hash is optional so lets try the call without it
32 aggregates[aggregate].delete_slice(credential, xrn, origin_hrn)
35 print >> log, "%s" % (traceback.format_exc())
36 print >> log, "Error calling delete slice at aggregate %s" % aggregate
39 def create_slice(api, xrn, rspec, origin_hrn=None):
40 hrn, type = urn_to_hrn(xrn)
42 # Validate the RSpec against PlanetLab's schema --disabled for now
43 # The schema used here needs to aggregate the PL and VINI schemas
44 # schema = "/var/www/html/schemas/pl.rng"
48 tree = etree.parse(StringIO(rspec))
49 except etree.XMLSyntaxError:
50 message = str(sys.exc_info()[1])
51 raise InvalidRSpec(message)
53 relaxng_doc = etree.parse(schema)
54 relaxng = etree.RelaxNG(relaxng_doc)
57 error = relaxng.error_log.last_error
58 message = "%s (line %s)" % (error.message, error.line)
59 raise InvalidRSpec(message)
62 cred = api.getCredential()
64 if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
66 # Just send entire RSpec to each aggregate
67 aggs[agg].create_slice(cred, xrn, rspec, origin_hrn)
69 print >> log, "Error creating slice %s at %s" % (hrn, agg)
74 def get_ticket(api, xrn, rspec, origin_hrn=None):
75 slice_hrn, type = urn_to_hrn(xrn)
76 # get the netspecs contained within the clients rspec
77 client_rspec = RSpec(xml=rspec)
78 netspecs = client_rspec.getDictsByTagName('NetSpec')
80 # create an rspec for each individual rspec
83 for netspec in netspecs:
84 net_hrn = netspec['name']
85 resources = {'start_time': 0, 'end_time': 0 ,
86 'network': {'NetSpec' : netspec}}
87 resourceDict = {'RSpec': resources}
88 temp_rspec.parseDict(resourceDict)
89 rspecs[net_hrn] = temp_rspec.toxml()
91 # send the rspec to the appropiate aggregate/sm
92 aggregates = api.aggregates
93 credential = api.getCredential()
95 for net_hrn in rspecs:
96 net_urn = urn_to_hrn(net_hrn)
98 # if we are directly connected to the aggregate then we can just
99 # send them the request. if not, then we may be connected to an sm
100 # thats connected to the aggregate
101 if net_hrn in aggregates:
102 ticket = aggregates[net_hrn].get_ticket(credential, xrn, \
103 rspecs[net_hrn], origin_hrn)
104 tickets[net_hrn] = ticket
106 # lets forward this rspec to a sm that knows about the network
107 for agg in aggregates:
108 network_found = aggregates[agg].get_aggregates(credential, net_urn)
110 ticket = aggregates[aggregate].get_ticket(credential, \
111 slice_hrn, rspecs[net_hrn], origin_hrn)
112 tickets[aggregate] = ticket
114 print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
117 # create a new ticket
118 new_ticket = SfaTicket(subject = slice_hrn)
119 new_ticket.set_gid_caller(api.auth.client_gid)
120 new_ticket.set_issuer(key=api.key, subject=api.hrn)
125 'timestamp': int(time.time()),
129 # merge data from aggregate ticket into new ticket
130 for agg_ticket in tickets.values():
131 # get data from this ticket
132 agg_ticket = SfaTicket(string=agg_ticket)
133 attributes = agg_ticket.get_attributes()
134 if attributes.get('initscripts', []) != None:
135 valid_data['initscripts'].extend(attributes.get('initscripts', []))
136 if attributes.get('slivers', []) != None:
137 valid_data['slivers'].extend(attributes.get('slivers', []))
140 object_gid = agg_ticket.get_gid_object()
141 new_ticket.set_gid_object(object_gid)
142 new_ticket.set_pubkey(object_gid.get_pubkey())
145 tmp_rspec.parseString(agg_ticket.get_rspec())
146 networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])
148 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
149 new_ticket.set_attributes(valid_data)
150 resources = {'networks': networks, 'start_time': 0, 'duration': 0}
151 resourceDict = {'RSpec': resources}
152 tmp_rspec.parseDict(resourceDict)
153 new_ticket.set_rspec(tmp_rspec.toxml())
156 return new_ticket.save_to_string(save_parents=True)
158 def start_slice(api, xrn):
159 hrn, type = urn_to_hrn(xrn)
160 slicename = hrn_to_pl_slicename(hrn)
161 slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
163 raise RecordNotFound(hrn)
165 attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
166 attribute_id = attreibutes[0]['slice_attribute_id']
167 api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1" )
171 def stop_slice(api, xrn):
172 hrn, type = urn_to_hrn(xrn)
173 slicename = hrn_to_pl_slicename(hrn)
174 slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
176 raise RecordNotFound(hrn)
177 slice_id = slices[0]['slice_id']
178 attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
179 attribute_id = attributes[0]['slice_attribute_id']
180 api.plshell.UpdateSliceTag(api.plauth, attribute_id, "0")
183 def reset_slice(api, xrn):
184 # XX not implemented at this interface
188 # look in cache first
190 slices = api.cache.get('slices')
194 # fetch from aggregates
196 credential = api.getCredential()
197 for aggregate in api.aggregates:
199 tmp_slices = api.aggregates[aggregate].get_slices(credential)
200 slices.extend(tmp_slices)
202 print >> log, "%s" % (traceback.format_exc())
203 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
206 api.cache.add('slices', slices)
209 def get_rspec(api, xrn=None, origin_hrn=None):
210 hrn, type = urn_to_hrn(xrn)
213 aggs = api.aggregates
214 cred = api.getCredential()
216 if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
218 # get the rspec from the aggregate
219 agg_rspec = aggs[agg].get_resources(cred, xrn, origin_hrn)
221 # XX print out to some error log
222 print >> log, "Error getting resources at aggregate %s" % agg
223 traceback.print_exc(log)
224 print >> log, "%s" % (traceback.format_exc())
228 tree = etree.parse(StringIO(agg_rspec))
229 except etree.XMLSyntaxError:
230 message = agg + ": " + str(sys.exc_info()[1])
231 raise InvalidRSpec(message)
233 root = tree.getroot()
234 if root.get("type") in ["SFA"]:
238 for network in root.iterfind("./network"):
239 rspec.append(deepcopy(network))
240 for request in root.iterfind("./request"):
241 rspec.append(deepcopy(request))
243 return etree.tostring(rspec, xml_declaration=True, pretty_print=True)
246 Returns the request context required by sfatables. At some point, this
247 mechanism should be changed to refer to "contexts", which is the
248 information that sfatables is requesting. But for now, we just return
249 the basic information needed in a dict.
251 def fetch_context(slice_hrn, user_hrn, contexts):
252 #slice_hrn = urn_to_hrn(slice_xrn)[0]
253 #user_hrn = urn_to_hrn(user_xrn)[0]
254 base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
259 r.parseFile(sys.argv[1])
261 create_slice(None,'plc.princeton.tmacktestslice',rspec)
263 if __name__ == "__main__":