modify accept arguments for get_rspec, preparing to switch to protogeni api spec
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspec import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
26
def get_version():
    """Return the version descriptor advertised by this manager.

    The result is a freshly built dict with a single entry,
    'geni_api', whose value is 1.
    """
    return {'geni_api': 1}
31
def delete_slice(api, xrn, origin_hrn=None):
    """Forward a delete_slice request to every aggregate known to api.

    api        -- object exposing getCredential() and an `aggregates` mapping
    xrn        -- identifier of the slice, passed through to each aggregate
    origin_hrn -- passed through unchanged to each aggregate (default None)

    Always returns 1; the per-aggregate results are collected but discarded.
    """
    cred = api.getCredential()
    pool = ThreadManager()
    # One call per aggregate is handed to the ThreadManager.
    for name in api.aggregates:
        pool.run(api.aggregates[name].delete_slice, cred, xrn, origin_hrn)
    # Results are fetched (presumably this waits for the calls to finish --
    # confirm against ThreadManager) and then ignored.
    pool.get_results()
    return 1
40
def create_slice(api, xrn, rspec, origin_hrn=None):
    """Forward a create_slice request, with the full rspec, to the aggregates.

    api        -- object exposing getCredential(), `aggregates` and `auth`
    xrn        -- identifier of the slice
    rspec      -- rspec sent unmodified to every contacted aggregate
    origin_hrn -- passed through unchanged to each aggregate (default None)

    The aggregate whose key equals the calling client's hrn is skipped.
    Always returns 1.

    Raises InvalidRSpec only if schema validation is enabled (see below)
    and the rspec fails to parse or validate.
    """
    # The parsed values are not used further in this function.
    slice_hrn, xrn_type = urn_to_hrn(xrn)

    # RelaxNG validation is currently switched off: `schema` is hard-wired
    # to None.  The schema that belongs here must aggregate the PL and VINI
    # schemas, e.g. "/var/www/html/schemas/pl.rng".
    schema = None
    if schema:
        try:
            document = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        validator = etree.RelaxNG(etree.parse(schema))
        if not validator(document):
            last = validator.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (last.message, last.line))

    credential = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        # Do not send the request to the aggregate that is itself the caller.
        if name == api.auth.client_cred.get_gid_caller().get_hrn():
            continue
        # Every aggregate simply receives the entire rspec.
        pool.run(api.aggregates[name].create_slice,
                 credential, xrn, rspec, origin_hrn)
    pool.get_results()
    return 1
72
def get_ticket(api, xrn, rspec, origin_hrn=None):
    """Collect a ticket from each aggregate named in rspec and merge them.

    api        -- object exposing getCredential(), `aggregates`, `auth`,
                  `key`, `hrn`, `key_file` and `cert_file`
    xrn        -- identifier of the slice
    rspec      -- XML string; every './network' element names one aggregate
                  through the value of its first attribute
    origin_hrn -- passed through unchanged to each aggregate (default None)

    Returns the new, signed ticket serialized with save_to_string().

    NOTE(review): if no aggregate returns a ticket, `object_gid` stays None
    and the `object_gid.get_pubkey()` call below raises AttributeError.
    """
    slice_hrn, xrn_type = urn_to_hrn(xrn)

    # Work out which aggregates the client's rspec refers to.  Each one is
    # mapped to the complete rspec, not to its own <network> fragment.
    rspec_by_aggregate = {}
    for network in etree.parse(StringIO(rspec)).findall('./network'):
        rspec_by_aggregate[network.values()[0]] = rspec

    # Request a ticket from every aggregate we can reach.
    credential = api.getCredential()
    pool = ThreadManager()
    for agg_hrn, agg_rspec in rspec_by_aggregate.items():
        if agg_hrn in api.aggregates:
            server = api.aggregates[agg_hrn]
        else:
            # Not a direct neighbour: ask each known aggregate whether it
            # knows about this one, and use the first answer we get.
            server = None
            net_urn = hrn_to_urn(agg_hrn, 'authority')
            for peer in api.aggregates:
                info = api.aggregates[peer].get_aggregates(credential, net_urn)
                if not info:
                    continue
                address = 'http://%s:%s' % (info['addr'], info['port'])
                server = xmlrpcprotocol.get_server(address, api.key_file,
                                                   api.cert_file)
                break
        # Aggregates nobody knows about are skipped.
        if server is not None:
            pool.run(server.get_ticket, credential, xrn, agg_rspec, origin_hrn)
    raw_tickets = pool.get_results()

    # Pull the interesting pieces out of every returned ticket.
    rspecs, initscripts, slivers = [], [], []
    object_gid = None
    for raw in raw_tickets:
        agg_ticket = SfaTicket(string=raw)
        attrs = agg_ticket.get_attributes()
        # The object gid of the first ticket that provides one wins.
        if not object_gid:
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # Merge what was gathered.
    attributes = {'initscripts': initscripts, 'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # Build, sign and serialize the combined ticket.
    ticket = SfaTicket(subject=slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
136
def start_slice(api, xrn):
    """Forward a start_slice request to every aggregate known to api.

    api -- object exposing getCredential() and an `aggregates` mapping
    xrn -- identifier of the slice, passed through to each aggregate

    Always returns 1; the per-aggregate results are collected but discarded.
    """
    credential = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        # Fixed: this used to dispatch server.stop_slice (copy/paste from
        # stop_slice), so "starting" a slice actually stopped it.
        threads.run(server.start_slice, credential, xrn)
    threads.get_results()
    return 1
145  
def stop_slice(api, xrn):
    """Forward a stop_slice request to every aggregate known to api.

    api -- object exposing getCredential() and an `aggregates` mapping
    xrn -- identifier of the slice, passed through to each aggregate

    Always returns 1; the per-aggregate results are collected but discarded.
    """
    cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].stop_slice, cred, xrn)
    pool.get_results()
    return 1
154
def reset_slice(api, xrn):
    """Placeholder: resetting a slice is not implemented at this interface.

    Both arguments are ignored and 1 is returned unconditionally.
    """
    return 1
158
def get_slices(api):
    """Return the combined list of slices reported by all aggregates.

    api -- object exposing getCredential(), an `aggregates` mapping and a
           `cache` attribute (falsy when caching is not in use)

    A truthy value cached under 'slices' is returned as-is.  Otherwise every
    aggregate is queried, the individual results are concatenated, and the
    combined list is stored in the cache (when there is one) before being
    returned.
    """
    # Serve from the cache whenever it holds a non-empty answer.
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # Otherwise ask each aggregate.
    cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].get_slices, cred)

    # Combine the per-aggregate results into one flat list.
    slices = []
    for partial in pool.get_results():
        slices.extend(partial)

    # Remember the answer for subsequent calls.
    if api.cache:
        api.cache.add('slices', slices)

    return slices
185  
def get_rspec(api, creds, options):
    """Return one rspec (XML string) combining the rspecs of all aggregates.

    api     -- object exposing getCredential(), `aggregates`, `auth` and a
               `cache` attribute (falsy when caching is not in use)
    creds   -- list of credential strings; only creds[0] is read, and only
               when options carries no 'origin_hrn'
    options -- dict; recognised keys are 'geni_slice_urn' (slice to report
               on, optional) and 'origin_hrn' (hrn of the original caller,
               optional)

    When no slice is requested, the result is looked up in and stored to the
    cache under the key 'nodes'.

    Raises InvalidRSpec if an aggregate returns something that is not
    parseable XML.

    NOTE(review): if no aggregate returns a document whose root has
    type="SFA", `rspec` is still None when it reaches etree.tostring(),
    which then raises.
    """
    # get the slice's urn from options
    xrn = options.get('geni_slice_urn', None)
    # The parsed hrn/type are not needed here; the call is kept so that xrn
    # still goes through urn_to_hrn exactly as it did before.
    urn_to_hrn(xrn)

    # get hrn of the original caller
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # look in cache first (only the slice-less listing is cached)
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    rspec = None
    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    cred = api.getCredential()
    threads = ThreadManager()

    for aggregate in api.aggregates:
        # do not query the aggregate that is itself the caller
        if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
            # get the rspec from the aggregate
            server = api.aggregates[aggregate]
            # XX
            # XX TODO: switch to ProtoGeni spec in next release. Give other
            # XX aggregates a chance to upgrade to this release before
            # XX switching
            # XX
            # threads.run(server.ListResources, cred, options)
            threads.run(server.get_resources, cred, xrn, origin_hrn)

    results = threads.get_results()
    # combine the rspecs into a single rspec
    for agg_rspec in results:
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        # documents whose root is not typed "SFA" are ignored
        if root.get("type") in ["SFA"]:
            if rspec is None:
                # the first SFA document becomes the base of the merge
                rspec = root
            else:
                # later documents contribute copies of their <network> and
                # <request> children
                for network in root.iterfind("./network"):
                    rspec.append(deepcopy(network))
                for request in root.iterfind("./request"):
                    rspec.append(deepcopy(request))

    # (a leftover debug `print results` used to sit here; it dumped every
    # aggregate rspec to stdout and has been removed)
    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
248
249 """
250 Returns the request context required by sfatables. At some point, this
251 mechanism should be changed to refer to "contexts", which is the
252 information that sfatables is requesting. But for now, we just return
253 the basic information needed in a dict.
254 """
def fetch_context(slice_hrn, user_hrn, contexts):
    """Build the basic request context handed to sfatables.

    slice_hrn -- hrn of the slice, stored under ['sfa']['slice']['hrn']
    user_hrn  -- hrn of the user, stored under ['sfa']['user']['hrn']
    contexts  -- currently ignored

    Both hrns are used exactly as given; no urn-to-hrn conversion is done
    here.
    """
    user_entry = {'hrn': user_hrn}
    slice_entry = {'hrn': slice_hrn}
    return {'sfa': {'user': user_entry, 'slice': slice_entry}}
260
def main():
    """Ad-hoc entry point: load the rspec file named by argv[1] and call
    create_slice for a hard-coded test slice.

    NOTE(review): api is passed as None, while create_slice calls
    api.getCredential() unconditionally, so this cannot succeed as written.
    """
    parsed = RSpec()
    parsed.parseFile(sys.argv[1])
    as_dict = parsed.toDict()
    create_slice(None, 'plc.princeton.tmacktestslice', as_dict)


# Run main() only when this file is executed as a script.
if __name__ == "__main__":
    main()
269