1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.debug import log
22 import sfa.plc.peers as peers
def delete_slice(api, xrn, origin_hrn=None):
    """
    Ask every aggregate known to this interface to delete the slice.

    api        -- interface object; must provide getCredential() and an
                  `aggregates` mapping of name -> remote aggregate proxy
    xrn        -- identifier of the slice, forwarded untouched
    origin_hrn -- hrn of the original caller, forwarded untouched

    A failure at one aggregate is logged and does not stop the remaining
    aggregates from being contacted (best effort).

    Returns 1.
    NOTE(review): no return statement is visible in this function; 1 is
    returned so the result is never None -- confirm against callers.
    """
    credential = api.getCredential()
    aggregates = api.aggregates
    for aggregate in aggregates:
        # request hash is optional so lets try the call without it
        try:
            aggregates[aggregate].delete_slice(credential, xrn, origin_hrn)
        except Exception:
            # keep going: one unreachable aggregate must not abort the rest
            print >> log, "%s" % (traceback.format_exc())
            print >> log, "Error calling delete slice at aggregate %s" % aggregate
    return 1
def create_slice(api, xrn, rspec, origin_hrn=None):
    """
    Forward a slice creation request to every aggregate except the caller.

    api        -- interface object; must provide getCredential(), an
                  `aggregates` mapping and `auth.client_cred`
    xrn        -- identifier of the slice
    rspec      -- the RSpec as an XML string; sent unchanged to each aggregate
    origin_hrn -- hrn of the original caller, forwarded untouched

    Raises InvalidRSpec when schema validation is enabled and the RSpec is
    not well-formed XML or does not satisfy the schema.

    Returns True.
    NOTE(review): no return statement is visible in this function; a truthy
    value is returned so the result is never None -- confirm against callers.
    """
    hrn, _type = urn_to_hrn(xrn)

    # Validate the RSpec against PlanetLab's schema --disabled for now
    # The schema used here needs to aggregate the PL and VINI schemas
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            message = str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        relaxng_doc = etree.parse(schema)
        relaxng = etree.RelaxNG(relaxng_doc)

        if not relaxng(tree):
            error = relaxng.error_log.last_error
            message = "%s (line %s)" % (error.message, error.line)
            raise InvalidRSpec(message)

    aggs = api.aggregates
    cred = api.getCredential()
    for agg in aggs:
        # never send the request back to the aggregate that issued it
        if agg != api.auth.client_cred.get_gid_caller().get_hrn():
            try:
                # Just send entire RSpec to each aggregate
                aggs[agg].create_slice(cred, xrn, rspec, origin_hrn)
            except Exception:
                # best effort: log and carry on with the other aggregates
                print >> log, "Error creating slice %s at %s" % (hrn, agg)
                print >> log, "%s" % (traceback.format_exc())

    return True
def get_ticket(api, xrn, rspec, origin_hrn=None):
    """
    Obtain a ticket from each network named in the client's rspec and merge
    them into a single new ticket issued by this interface.

    api        -- interface object; must provide getCredential(), an
                  `aggregates` mapping, `auth.client_gid`, `key` and `hrn`
    xrn        -- identifier of the slice
    rspec      -- the client's RSpec as an XML string
    origin_hrn -- hrn of the original caller, forwarded untouched

    Returns the merged ticket serialised with save_to_string().

    A failure while contacting the aggregate(s) for one network is logged
    and that network is simply left out of the merged ticket.
    """
    slice_hrn, _type = urn_to_hrn(xrn)

    # get the netspecs contained within the clients rspec
    client_rspec = RSpec(xml=rspec)
    netspecs = client_rspec.getDictsByTagName('NetSpec')

    # create an rspec for each individual network
    rspecs = {}
    temp_rspec = RSpec()
    for netspec in netspecs:
        net_hrn = netspec['name']
        resources = {'start_time': 0, 'end_time': 0,
                     'network': {'NetSpec': netspec}}
        resourceDict = {'RSpec': resources}
        temp_rspec.parseDict(resourceDict)
        rspecs[net_hrn] = temp_rspec.toxml()

    # send the rspec to the appropriate aggregate/sm
    aggregates = api.aggregates
    credential = api.getCredential()
    tickets = {}
    for net_hrn in rspecs:
        # NOTE(review): urn_to_hrn() is unpacked as a pair elsewhere in this
        # file, so this is a tuple rather than a urn -- confirm what
        # get_aggregates() expects.
        net_urn = urn_to_hrn(net_hrn)
        try:
            # if we are directly connected to the aggregate then we can just
            # send them the request. if not, then we may be connected to an sm
            # thats connected to the aggregate
            if net_hrn in aggregates:
                tickets[net_hrn] = aggregates[net_hrn].get_ticket(
                    credential, xrn, rspecs[net_hrn], origin_hrn)
            else:
                # lets forward this rspec to a sm that knows about the network
                for agg in aggregates:
                    network_found = aggregates[agg].get_aggregates(credential, net_urn)
                    if network_found:
                        # The loop variable is `agg`; the previous code used
                        # an undefined name here, so this path always failed.
                        # NOTE(review): this call passes slice_hrn where the
                        # direct call above passes xrn -- kept as found.
                        # NOTE(review): two networks reached through the same
                        # sm share one key, so the later ticket replaces the
                        # earlier one.
                        tickets[agg] = aggregates[agg].get_ticket(
                            credential, slice_hrn, rspecs[net_hrn], origin_hrn)
        except Exception:
            print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
                {'slice_hrn': slice_hrn, 'net_hrn': net_hrn}

    # create a new ticket
    new_ticket = SfaTicket(subject = slice_hrn)
    new_ticket.set_gid_caller(api.auth.client_gid)
    new_ticket.set_issuer(key=api.key, subject=api.hrn)

    tmp_rspec = RSpec()
    networks = []
    valid_data = {
        'timestamp': int(time.time()),
        'initscripts': [],
        'slivers': [],
    }

    # merge data from aggregate ticket into new ticket
    for agg_ticket in tickets.values():
        # get data from this ticket
        agg_ticket = SfaTicket(string=agg_ticket)
        attributes = agg_ticket.get_attributes()
        # an attribute may be present but explicitly None; skip it then
        initscripts = attributes.get('initscripts', [])
        if initscripts is not None:
            valid_data['initscripts'].extend(initscripts)
        slivers = attributes.get('slivers', [])
        if slivers is not None:
            valid_data['slivers'].extend(slivers)

        # set the object gid (the last aggregate ticket processed wins)
        object_gid = agg_ticket.get_gid_object()
        new_ticket.set_gid_object(object_gid)
        new_ticket.set_pubkey(object_gid.get_pubkey())

        # build the rspec
        tmp_rspec.parseString(agg_ticket.get_rspec())
        networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])

    #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
    new_ticket.set_attributes(valid_data)
    resources = {'networks': networks, 'start_time': 0, 'duration': 0}
    resourceDict = {'RSpec': resources}
    tmp_rspec.parseDict(resourceDict)
    new_ticket.set_rspec(tmp_rspec.toxml())

    # NOTE(review): the signing step is assumed -- the issuer key set above
    # is not used anywhere else in this function. Confirm SfaTicket.sign().
    new_ticket.sign()
    return new_ticket.save_to_string(save_parents=True)
def start_slice(api, xrn):
    """
    Enable a slice by setting its 'enabled' slice tag to "1".

    api -- interface object; must provide `plshell` and `plauth`
    xrn -- identifier of the slice

    Raises RecordNotFound when no slice with the derived name exists.

    Returns 1.
    NOTE(review): no return statement is visible in this function; 1 is
    returned so the result is never None -- confirm against callers.
    """
    hrn, _type = urn_to_hrn(xrn)
    slicename = hrn_to_pl_slicename(hrn)
    slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    # same lookup as stop_slice: the tag filter needs the numeric id
    slice_id = slices[0]['slice_id']
    attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
    # NOTE(review): raises IndexError when the slice has no 'enabled' tag;
    # kept as found because the intended fallback is not visible here.
    attribute_id = attributes[0]['slice_attribute_id']
    api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1")

    return 1
def stop_slice(api, xrn):
    """
    Disable a slice by setting its 'enabled' slice tag to "0".

    api -- interface object; must provide `plshell` and `plauth`
    xrn -- identifier of the slice

    Raises RecordNotFound when no slice with the derived name exists.

    Returns 1.
    NOTE(review): no return statement is visible in this function; 1 is
    returned so the result is never None -- confirm against callers.
    """
    hrn, _type = urn_to_hrn(xrn)
    slicename = hrn_to_pl_slicename(hrn)
    slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
    if not slices:
        raise RecordNotFound(hrn)
    slice_id = slices[0]['slice_id']
    attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
    # NOTE(review): raises IndexError when the slice has no 'enabled' tag;
    # kept as found because the intended fallback is not visible here.
    attribute_id = attributes[0]['slice_attribute_id']
    api.plshell.UpdateSliceTag(api.plauth, attribute_id, "0")

    return 1
def reset_slice(api, xrn):
    """
    Placeholder: resetting a slice is not implemented at this interface.

    Both arguments are accepted and ignored.

    Returns 1.
    NOTE(review): no function body is visible; 1 is returned so the result
    is never None -- confirm against callers.
    """
    # XX not implemented at this interface
    return 1
# NOTE(review): the def line for this body is not visible; the name and the
# single `api` parameter are inferred from the remote get_slices(credential)
# call and from the body only ever reading `api` -- confirm.
def get_slices(api):
    """
    Return the slices known to all aggregates, using the cache when possible.

    api -- interface object; must provide getCredential(), an `aggregates`
           mapping and a `cache` attribute (falsy when caching is off)

    A failure at one aggregate is logged and its slices are left out of the
    result; the remaining aggregates are still queried.

    Returns a list built by concatenating each aggregate's answer.
    """
    # look in cache first
    if api.cache:
        slices = api.cache.get('slices')
        if slices:
            return slices

    # fetch from aggregates
    slices = []
    credential = api.getCredential()
    for aggregate in api.aggregates:
        try:
            tmp_slices = api.aggregates[aggregate].get_slices(credential)
            slices.extend(tmp_slices)
        except Exception:
            print >> log, "%s" % (traceback.format_exc())
            print >> log, "Error calling slices at aggregate %(aggregate)s" % \
                {'aggregate': aggregate}

    # cache the result
    if api.cache:
        api.cache.add('slices', slices)

    return slices
def get_rspec(api, xrn=None, origin_hrn=None):
    """
    Build one RSpec document from the resources advertised by the aggregates.

    api        -- interface object; must provide getCredential(), an
                  `aggregates` mapping, `auth.client_cred` and a `cache`
                  attribute (falsy when caching is off)
    xrn        -- optional identifier; when absent the result is cached
                  under the key 'nodes'
    origin_hrn -- hrn of the original caller, forwarded untouched

    Raises InvalidRSpec when an aggregate answers with XML that cannot be
    parsed.

    Returns the merged document serialised by etree.tostring().
    """
    # look in cache first
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    # NOTE(review): hrn is not used below and xrn may be None here; the call
    # is kept in case urn_to_hrn() validates its argument -- confirm.
    hrn, _type = urn_to_hrn(xrn)
    rspec = None
    aggs = api.aggregates
    cred = api.getCredential()
    for agg in aggs:
        # never ask the aggregate that issued this request
        if agg != api.auth.client_cred.get_gid_caller().get_hrn():
            try:
                # get the rspec from the aggregate
                agg_rspec = aggs[agg].get_resources(cred, xrn, origin_hrn)
            except Exception:
                # XX print out to some error log
                # The traceback is logged once; the previous code also called
                # traceback.print_exc(log), which passes the stream as the
                # `limit` argument rather than as `file`.
                print >> log, "Error getting resources at aggregate %s" % agg
                print >> log, "%s" % (traceback.format_exc())
                continue

            try:
                tree = etree.parse(StringIO(agg_rspec))
            except etree.XMLSyntaxError:
                message = agg + ": " + str(sys.exc_info()[1])
                raise InvalidRSpec(message)

            root = tree.getroot()
            if root.get("type") in ["SFA"]:
                if rspec is None:
                    # the first usable answer becomes the base document
                    rspec = root
                else:
                    # copy, so the source tree keeps its own children
                    for network in root.iterfind("./network"):
                        rspec.append(deepcopy(network))
                    for request in root.iterfind("./request"):
                        rspec.append(deepcopy(request))

    # NOTE(review): rspec is still None here when no aggregate produced an
    # "SFA" document; the intended result for that case is not visible.
    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
def fetch_context(slice_hrn, user_hrn, contexts):
    """
    Returns the request context required by sfatables. At some point, this
    mechanism should be changed to refer to "contexts", which is the
    information that sfatables is requesting. But for now, we just return
    the basic information needed in a dict.

    slice_hrn -- hrn of the slice the request is about
    user_hrn  -- hrn of the user making the request
    contexts  -- currently ignored

    Returns {'sfa': {'user': {'hrn': user_hrn}, 'slice': {'hrn': slice_hrn}}}.
    """
    #slice_hrn = urn_to_hrn(slice_xrn)[0]
    #user_hrn = urn_to_hrn(user_xrn)[0]
    base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
    return base_context
269 r.parseFile(sys.argv[1])
271 create_slice(None,'plc.princeton.tmacktestslice',rspec)
273 if __name__ == "__main__":