1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.debug import log
22 from sfa.util.sfalogging import logger
23 import sfa.plc.peers as peers
def delete_slice(api, xrn, origin_hrn=None):
    """Ask every aggregate known to ``api`` to delete the slice ``xrn``.

    The operation is best-effort: a failure at one aggregate is written to
    the debug log and the remaining aggregates are still contacted.

    :param api: object providing ``getCredential()`` and ``aggregates``, a
        mapping of aggregate name to a proxy with a ``delete_slice`` method.
    :param xrn: slice identifier, forwarded unchanged to every aggregate.
    :param origin_hrn: forwarded unchanged to every aggregate.
    :returns: 1, regardless of individual aggregate failures.
    """
    credential = api.getCredential()
    aggregates = api.aggregates
    for aggregate in aggregates:
        try:
            aggregates[aggregate].delete_slice(credential, xrn, origin_hrn)
        except Exception:
            # Log and move on to the next aggregate. Catching Exception
            # rather than everything lets KeyboardInterrupt/SystemExit
            # propagate.
            print >> log, traceback.format_exc()
            print >> log, "Error calling delete slice at aggregate %s" % aggregate
    return 1
def create_slice(api, xrn, rspec, origin_hrn=None):
    """Send a slice-creation request to the aggregates known to ``api``.

    The entire RSpec is sent as-is to each aggregate, except the one whose
    name equals the hrn of the caller recorded in the client credential.
    A failure at one aggregate is logged and does not stop the others.

    :param api: object providing ``getCredential()``, ``aggregates`` and
        ``auth.client_cred``.
    :param xrn: slice identifier; forwarded unchanged, its hrn is only used
        in log messages.
    :param rspec: RSpec document forwarded to every aggregate.
    :param origin_hrn: forwarded unchanged to every aggregate.
    :raises InvalidRSpec: only if schema validation is enabled and the
        RSpec fails to parse or validate (validation is currently disabled).
    :returns: True.
    """
    hrn, _ = urn_to_hrn(xrn)

    # Validate the RSpec against PlanetLab's schema --disabled for now
    # The schema used here needs to aggregate the PL and VINI schemas
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            message = str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        relaxng_doc = etree.parse(schema)
        relaxng = etree.RelaxNG(relaxng_doc)

        if not relaxng(tree):
            error = relaxng.error_log.last_error
            message = "%s (line %s)" % (error.message, error.line)
            raise InvalidRSpec(message)

    aggs = api.aggregates
    cred = api.getCredential()
    for agg in aggs:
        # Do not send the request back to the aggregate named as the caller
        # in the client credential.
        if agg == api.auth.client_cred.get_gid_caller().get_hrn():
            continue
        try:
            # Just send entire RSpec to each aggregate
            aggs[agg].create_slice(cred, xrn, rspec, origin_hrn)
        except Exception:
            print >> log, "Error creating slice %s at %s" % (hrn, agg)
            traceback.print_exc()

    return True
def get_ticket(api, xrn, rspec, origin_hrn=None):
    """Collect tickets from the relevant aggregates and merge them into one.

    The client RSpec is split into one RSpec per ``NetSpec`` element. Each
    piece is sent to the aggregate registered under that network's name,
    or, when no such aggregate is registered, to every registered peer
    that reports knowing the network. The returned tickets are merged into
    a single new ticket issued by this server.

    :param api: object providing ``getCredential()``, ``aggregates``,
        ``auth.client_gid``, ``key`` and ``hrn``.
    :param xrn: slice identifier.
    :param rspec: client RSpec as XML.
    :param origin_hrn: forwarded unchanged to the aggregates.
    :returns: the merged ticket serialized with
        ``save_to_string(save_parents=True)``.
    """
    slice_hrn, _ = urn_to_hrn(xrn)
    # get the netspecs contained within the clients rspec
    client_rspec = RSpec(xml=rspec)
    netspecs = client_rspec.getDictsByTagName('NetSpec')

    # create an rspec for each individual netspec
    rspecs = {}
    temp_rspec = RSpec()
    for netspec in netspecs:
        net_hrn = netspec['name']
        resources = {'start_time': 0, 'end_time': 0,
                     'network': {'NetSpec': netspec}}
        resourceDict = {'RSpec': resources}
        temp_rspec.parseDict(resourceDict)
        rspecs[net_hrn] = temp_rspec.toxml()

    # send the rspec to the appropriate aggregate/sm
    aggregates = api.aggregates
    credential = api.getCredential()
    tickets = {}
    for net_hrn in rspecs:
        # NOTE(review): urn_to_hrn() is unpacked as a pair elsewhere in this
        # file, so net_urn is presumably a tuple rather than a urn string;
        # confirm what get_aggregates() expects.
        net_urn = urn_to_hrn(net_hrn)
        try:
            # if we are directly connected to the aggregate then we can just
            # send them the request. if not, then we may be connected to an sm
            # thats connected to the aggregate
            if net_hrn in aggregates:
                ticket = aggregates[net_hrn].get_ticket(credential, xrn,
                                                        rspecs[net_hrn], origin_hrn)
                tickets[net_hrn] = ticket
            else:
                # lets forward this rspec to a sm that knows about the network
                for agg in aggregates:
                    network_found = aggregates[agg].get_aggregates(credential, net_urn)
                    if network_found:
                        # The loop variable is `agg`; indexing with the
                        # undefined name `aggregate` raised NameError, which
                        # the handler below swallowed, so forwarding never
                        # produced a ticket.
                        ticket = aggregates[agg].get_ticket(credential,
                                                            slice_hrn, rspecs[net_hrn], origin_hrn)
                        tickets[agg] = ticket
        except Exception:
            print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
                {'slice_hrn': slice_hrn, 'net_hrn': net_hrn}

    # create a new ticket
    new_ticket = SfaTicket(subject=slice_hrn)
    new_ticket.set_gid_caller(api.auth.client_gid)
    new_ticket.set_issuer(key=api.key, subject=api.hrn)

    tmp_rspec = RSpec()
    networks = []
    valid_data = {
        'timestamp': int(time.time()),
        'initscripts': [],
        'slivers': [],
    }
    # merge data from aggregate ticket into new ticket
    for agg_ticket in tickets.values():
        # get data from this ticket
        agg_ticket = SfaTicket(string=agg_ticket)
        attributes = agg_ticket.get_attributes()
        # A key may be absent (treated as empty) or explicitly None (skipped).
        initscripts = attributes.get('initscripts', [])
        if initscripts is not None:
            valid_data['initscripts'].extend(initscripts)
        slivers = attributes.get('slivers', [])
        if slivers is not None:
            valid_data['slivers'].extend(slivers)

        # set the object gid; with several tickets the last one wins
        object_gid = agg_ticket.get_gid_object()
        new_ticket.set_gid_object(object_gid)
        new_ticket.set_pubkey(object_gid.get_pubkey())

        # build the rspec
        tmp_rspec.parseString(agg_ticket.get_rspec())
        networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])

    #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
    new_ticket.set_attributes(valid_data)
    resources = {'networks': networks, 'start_time': 0, 'duration': 0}
    resourceDict = {'RSpec': resources}
    tmp_rspec.parseDict(resourceDict)
    new_ticket.set_rspec(tmp_rspec.toxml())
    new_ticket.encode()
    new_ticket.sign()
    return new_ticket.save_to_string(save_parents=True)
def start_slice(api, xrn):
    """Enable a slice by setting its PLC 'enabled' slice tag to "1".

    :param api: object providing ``plshell`` and ``plauth``.
    :param xrn: identifier of the slice to start.
    :raises RecordNotFound: if PLC returns no slice for the derived name.
    :returns: 1.
    """
    hrn, _ = urn_to_hrn(xrn)
    slicename = hrn_to_pl_slicename(hrn)
    slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    # Use the numeric id from the returned record, as stop_slice does.
    slice_id = slices[0]['slice_id']
    attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
    # Was misspelled `attreibutes`, which raised NameError on every call.
    attribute_id = attributes[0]['slice_attribute_id']
    api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1")

    return 1
def stop_slice(api, xrn):
    """Disable a slice by setting its PLC 'enabled' slice tag to "0".

    :param api: object providing ``plshell`` and ``plauth``.
    :param xrn: identifier of the slice to stop.
    :raises RecordNotFound: if PLC returns no slice for the derived name.
    :returns: 1.
    """
    shell, auth = api.plshell, api.plauth
    hrn = urn_to_hrn(xrn)[0]
    slicename = hrn_to_pl_slicename(hrn)

    matches = shell.GetSlices(auth, {'name': slicename}, ['slice_id'])
    if not matches:
        raise RecordNotFound(hrn)

    # Look up the id of the slice's 'enabled' tag, then overwrite its value.
    tag_filter = {'slice_id': matches[0]['slice_id'], 'name': 'enabled'}
    tags = shell.GetSliceTags(auth, tag_filter, ['slice_attribute_id'])
    shell.UpdateSliceTag(auth, tags[0]['slice_attribute_id'], "0")

    return 1
def reset_slice(api, xrn):
    """Placeholder: resetting a slice is not implemented at this interface.

    Both arguments are accepted and ignored.

    :returns: 1.
    """
    # XX not implemented at this interface
    return 1
def get_slices(api):
    """Return the slices reported by all aggregates, using the cache if set.

    When ``api.cache`` is truthy and holds a non-empty 'slices' entry, that
    entry is returned without contacting any aggregate. Otherwise every
    aggregate is queried, failures are logged and skipped, and the combined
    list is stored back in the cache.

    :param api: object providing ``getCredential()``, ``aggregates`` and
        ``cache`` (which may be falsy when caching is off).
    :returns: list of slices gathered from the aggregates.
    """
    # look in cache first
    if api.cache:
        slices = api.cache.get('slices')
        if slices:
            return slices

    # fetch from aggregates
    slices = []
    credential = api.getCredential()
    for aggregate in api.aggregates:
        try:
            tmp_slices = api.aggregates[aggregate].get_slices(credential)
            slices.extend(tmp_slices)
        except Exception:
            # Best effort: one unreachable aggregate must not hide the rest.
            print >> log, traceback.format_exc()
            print >> log, "Error calling slices at aggregate %s" % aggregate

    # cache the result
    if api.cache:
        api.cache.add('slices', slices)

    return slices
def get_rspec(api, xrn=None, origin_hrn=None):
    """Build one RSpec by merging the resources advertised by the aggregates.

    Each aggregate other than the one named as the caller in the client
    credential is asked for its resources. The first answer whose root has
    ``type="SFA"`` becomes the base document; ``network`` and ``request``
    children of later SFA answers are deep-copied into it.

    The result is cached under 'nodes' only for the ``xrn``-less query.

    :param api: object providing ``getCredential()``, ``aggregates``,
        ``cache`` and ``auth.client_cred``.
    :param xrn: optional identifier forwarded to the aggregates.
    :param origin_hrn: forwarded unchanged to the aggregates.
    :raises InvalidRSpec: if an aggregate returns XML that does not parse.
    :returns: the merged RSpec serialized by ``etree.tostring``.
    """
    # look in cache first
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    hrn, _ = urn_to_hrn(xrn)
    rspec = None
    aggs = api.aggregates
    cred = api.getCredential()
    for agg in aggs:
        print >> log, "Aggregates = %s" % aggs
        # Do not query the aggregate named as the caller in the client
        # credential.
        if agg == api.auth.client_cred.get_gid_caller().get_hrn():
            continue

        try:
            # get the rspec from the aggregate
            agg_rspec = aggs[agg].get_resources(cred, xrn, origin_hrn)
        except Exception:
            # XX print out to some error log
            print >> log, "Error getting resources at aggregate %s" % agg
            traceback.print_exc(log)
            print >> log, "%s" % (traceback.format_exc())
            continue

        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = agg + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") in ["SFA"]:
            # Compare by identity: an lxml element's truth value depends on
            # whether it has children, so it must not be truth-tested here.
            if rspec is None:
                rspec = root
            else:
                for network in root.iterfind("./network"):
                    rspec.append(deepcopy(network))
                for request in root.iterfind("./request"):
                    rspec.append(deepcopy(request))

    # NOTE(review): rspec is still None here when no aggregate contributed
    # an SFA document, and etree.tostring(None) will raise; decide what the
    # caller should get in that case.
    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
def fetch_context(slice_hrn, user_hrn, contexts):
    """Return the request context required by sfatables.

    At some point, this mechanism should be changed to refer to
    "contexts", which is the information that sfatables is requesting.
    But for now, we just return the basic information needed in a dict.

    :param slice_hrn: hrn of the slice the request concerns.
    :param user_hrn: hrn of the user making the request.
    :param contexts: currently ignored.
    :returns: ``{'sfa': {'user': {'hrn': ...}, 'slice': {'hrn': ...}}}``.
    """
    # Callers pass hrns directly; no urn-to-hrn conversion is done here.
    base_context = {'sfa': {'user': {'hrn': user_hrn}, 'slice': {'hrn': slice_hrn}}}
    return base_context
def main():
    """Manual test driver: load an RSpec file and call create_slice with it.

    The path of the RSpec file is taken from the first command-line
    argument.
    """
    if len(sys.argv) < 2:
        # Fail with a usage line instead of an IndexError on sys.argv[1].
        sys.exit("usage: %s <rspec-file>" % sys.argv[0])
    r = RSpec()
    r.parseFile(sys.argv[1])
    rspec = r.toDict()
    # NOTE(review): create_slice reads api.aggregates, so passing None as the
    # api presumably fails there, and it is handed a dict here rather than
    # the XML text its validation branch parses; confirm this driver still
    # works.
    create_slice(None,'plc.princeton.tmacktestslice',rspec)

if __name__ == "__main__":
    main()