1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
8 from copy import deepcopy
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspec import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
29 version['geni_api'] = 1
def delete_slice(api, xrn, origin_hrn=None):
    """
    Forward a delete_slice request for xrn to every aggregate listed in
    api.aggregates.

    Each request is handed to a ThreadManager together with the credential
    obtained from api.getCredential(), the slice xrn and origin_hrn.

    @param api object providing getCredential() and the aggregates table
    @param xrn identifier of the slice to delete
    @param origin_hrn passed through unchanged to each aggregate
        (presumably the hrn of the original caller -- confirm)
    """
    cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        # one delete_slice call per aggregate, queued on the thread manager
        pool.run(api.aggregates[name].delete_slice, cred, xrn, origin_hrn)
41 def create_slice(api, xrn, rspec, origin_hrn=None):
42 hrn, type = urn_to_hrn(xrn)
44 # Validate the RSpec against PlanetLab's schema --disabled for now
45 # The schema used here needs to aggregate the PL and VINI schemas
46 # schema = "/var/www/html/schemas/pl.rng"
50 tree = etree.parse(StringIO(rspec))
51 except etree.XMLSyntaxError:
52 message = str(sys.exc_info()[1])
53 raise InvalidRSpec(message)
55 relaxng_doc = etree.parse(schema)
56 relaxng = etree.RelaxNG(relaxng_doc)
59 error = relaxng.error_log.last_error
60 message = "%s (line %s)" % (error.message, error.line)
61 raise InvalidRSpec(message)
63 cred = api.getCredential()
64 threads = ThreadManager()
65 for aggregate in api.aggregates:
66 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
67 server = api.aggregates[aggregate]
68 # Just send entire RSpec to each aggregate
69 threads.run(server.create_slice, cred, xrn, rspec, origin_hrn)
73 def get_ticket(api, xrn, rspec, origin_hrn=None):
74 slice_hrn, type = urn_to_hrn(xrn)
# get the netspecs contained within the client's rspec
77 tree= etree.parse(StringIO(rspec))
78 elements = tree.findall('./network')
79 for element in elements:
80 aggregate_hrn = element.values()[0]
81 aggregate_rspecs[aggregate_hrn] = rspec
83 # get a ticket from each aggregate
84 credential = api.getCredential()
85 threads = ThreadManager()
86 for aggregate, aggregate_rspec in aggregate_rspecs.items():
88 if aggregate in api.aggregates:
89 server = api.aggregates[aggregate]
91 net_urn = hrn_to_urn(aggregate, 'authority')
92 # we may have a peer that knows about this aggregate
93 for agg in api.aggregates:
94 agg_info = api.aggregates[agg].get_aggregates(credential, net_urn)
96 # send the request to this address
97 url = 'http://%s:%s' % (agg_info['addr'], agg_info['port'])
98 server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
102 threads.run(server.get_ticket, credential, xrn, aggregate_rspec, origin_hrn)
103 results = threads.get_results()
105 # gather information from each ticket
110 for result in results:
111 agg_ticket = SfaTicket(string=result)
112 attrs = agg_ticket.get_attributes()
114 object_gid = agg_ticket.get_gid_object()
115 rspecs.append(agg_ticket.get_rspec())
116 initscripts.extend(attrs.get('initscripts', []))
117 slivers.extend(attrs.get('slivers', []))
120 attributes = {'initscripts': initscripts,
122 merged_rspec = merge_rspecs(rspecs)
124 # create a new ticket
125 ticket = SfaTicket(subject = slice_hrn)
126 ticket.set_gid_caller(api.auth.client_gid)
127 ticket.set_issuer(key=api.key, subject=api.hrn)
128 ticket.set_gid_object(object_gid)
129 ticket.set_pubkey(object_gid.get_pubkey())
130 #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
131 ticket.set_attributes(attributes)
132 ticket.set_rspec(merged_rspec)
135 return ticket.save_to_string(save_parents=True)
def start_slice(api, xrn):
    """
    Forward a start_slice request for xrn to every aggregate listed in
    api.aggregates.

    Each request is handed to a ThreadManager together with the credential
    obtained from api.getCredential(); get_results() is then called on the
    thread manager once all requests have been queued.

    @param api object providing getCredential() and the aggregates table
    @param xrn identifier of the slice to start
    """
    credential = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        # Dispatch start_slice. This used to dispatch stop_slice (copied from
        # the stop path), so "starting" a slice stopped it on every aggregate.
        threads.run(server.start_slice, credential, xrn)
    threads.get_results()
def stop_slice(api, xrn):
    """
    Forward a stop_slice request for xrn to every aggregate listed in
    api.aggregates.

    Each request is handed to a ThreadManager together with the credential
    obtained from api.getCredential(); get_results() is then called on the
    thread manager once all requests have been queued.

    @param api object providing getCredential() and the aggregates table
    @param xrn identifier of the slice to stop
    """
    cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        # one stop_slice call per aggregate, queued on the thread manager
        pool.run(api.aggregates[name].stop_slice, cred, xrn)
    pool.get_results()
155 def reset_slice(api, xrn):
156 # XX not implemented at this interface
160 # look in cache first
162 slices = api.cache.get('slices')
166 # fetch from aggregates
168 credential = api.getCredential()
169 threads = ThreadManager()
170 for aggregate in api.aggregates:
171 server = api.aggregates[aggregate]
172 threads.run(server.get_slices, credential)
175 results = threads.get_results()
177 for result in results:
178 slices.extend(result)
182 api.cache.add('slices', slices)
186 def get_rspec(api, creds, options):
187 # get slice's hrn from options
188 xrn = options.get('geni_slice_urn', None)
189 hrn, type = urn_to_hrn(xrn)
191 # get hrn of the original caller
192 origin_hrn = options.get('origin_hrn', None)
194 origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()
196 # look in cache first
197 if api.cache and not xrn:
198 rspec = api.cache.get('nodes')
202 hrn, type = urn_to_hrn(xrn)
205 # XX TODO: Should try to use delegated credential first
207 cred = api.getCredential()
208 threads = ThreadManager()
210 for aggregate in api.aggregates:
211 if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
212 # get the rspec from the aggregate
213 server = api.aggregates[aggregate]
215 # XX TODO: switch to ProtoGeni spec in next release. Give other
# XX aggregates a chance to upgrade to this release before switching
218 # threads.run(server.ListResources, cred, options)
219 threads.run(server.get_resources, cred, xrn, origin_hrn)
222 results = threads.get_results()
223 # combine the rspecs into a single rspec
224 for agg_rspec in results:
226 tree = etree.parse(StringIO(agg_rspec))
227 except etree.XMLSyntaxError:
228 message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
229 raise InvalidRSpec(message)
231 root = tree.getroot()
232 if root.get("type") in ["SFA"]:
236 for network in root.iterfind("./network"):
237 rspec.append(deepcopy(network))
238 for request in root.iterfind("./request"):
239 rspec.append(deepcopy(request))
242 rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
244 if api.cache and not xrn:
245 api.cache.add('nodes', rspec)
250 Returns the request context required by sfatables. At some point, this
251 mechanism should be changed to refer to "contexts", which is the
252 information that sfatables is requesting. But for now, we just return
253 the basic information needed in a dict.
255 def fetch_context(slice_hrn, user_hrn, contexts):
256 #slice_hrn = urn_to_hrn(slice_xrn)[0]
257 #user_hrn = urn_to_hrn(user_xrn)[0]
258 base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
263 r.parseFile(sys.argv[1])
265 create_slice(None,'plc.princeton.tmacktestslice',rspec)
267 if __name__ == "__main__":