fix bug where slicemgr will skip local interface when the calling cred is delegated...
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspecHelper import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
26 from copy import copy
27
def get_version():
    """Return the API version numbers advertised by this slice manager.

    The result is a new dict with the keys 'geni_api' and 'sfa', both set
    to 1.
    """
    return {'geni_api': 1, 'sfa': 1}
33
def slice_status(api, slice_xrn, creds):
    """Return a placeholder status record for a slice.

    Neither api nor creds is consulted; no aggregate is contacted.  The
    returned dict echoes slice_xrn under 'geni_urn', reports
    'geni_status' as 'unknown' and carries an empty 'geni_resources' dict.
    """
    return {
        'geni_urn': slice_xrn,
        'geni_status': 'unknown',
        'geni_resources': {},
    }
40
def create_slice(api, xrn, creds, rspec, users):
    """Create the slice on every known aggregate and merge their answers.

    The complete rspec is forwarded unchanged to CreateSliver on each
    entry of api.aggregates, using the credential obtained from
    api.getCredential().  The caller-supplied creds are currently not
    used (see the TODO below).

    Returns whatever merge_rspecs() produces from the collected results.
    Raises InvalidRSpec only from the schema check, which is disabled
    while schema is None.
    """
    # Neither value is used afterwards; the call is retained so xrn goes
    # through urn_to_hrn() exactly as before.
    _slice_hrn, _record_type = urn_to_hrn(xrn)

    # Validation against PlanetLab's schema is disabled for now.  The
    # schema used here needs to aggregate the PL and VINI schemas, e.g.
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            document = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        validator = etree.RelaxNG(etree.parse(schema))
        if not validator(document):
            failure = validator.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (failure.message, failure.line))

    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    manager_cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        # Every aggregate receives the entire RSpec.
        pool.run(api.aggregates[name].CreateSliver, xrn, manager_cred, rspec, users)

    return merge_rspecs(pool.get_results())
76
def renew_slice(api, xrn, creds, expiration_time):
    """Ask every aggregate to renew the sliver for xrn.

    RenewSliver is dispatched through a ThreadManager for each entry of
    api.aggregates, with the credential from api.getCredential(), xrn and
    expiration_time.  The creds argument is not used yet (see TODO).

    The collected results are discarded; the function always returns 1.
    """
    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    manager_cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].RenewSliver, xrn, manager_cred, expiration_time)
    pool.get_results()
    return 1
88
def get_ticket(api, xrn, creds, rspec, users):
    """Collect tickets from the aggregates named in rspec and merge them.

    Each <network> element directly under the rspec root names an
    aggregate through its first attribute value.  For every such
    aggregate a GetTicket call is issued, either on the matching entry of
    api.aggregates or, failing that, on a server located by asking the
    known aggregates (get_aggregates) about it.  Aggregates that cannot
    be located are skipped.  The creds argument is not used.

    Returns the string form (parents included) of a new signed SfaTicket
    whose rspec is the merge of the per-aggregate rspecs and whose
    attributes hold the concatenated 'initscripts' and 'slivers' lists.
    """
    slice_hrn, _record_type = urn_to_hrn(xrn)

    # Map each aggregate hrn found in the client's rspec to the rspec that
    # will be sent to it (the whole rspec, in every case).
    document = etree.parse(StringIO(rspec))
    rspec_by_aggregate = {}
    for network in document.findall('./network'):
        rspec_by_aggregate[network.values()[0]] = rspec

    # Request a ticket from each aggregate.
    credential = api.getCredential()
    pool = ThreadManager()
    for agg_hrn, agg_rspec in rspec_by_aggregate.items():
        if agg_hrn in api.aggregates:
            server = api.aggregates[agg_hrn]
        else:
            server = None
            net_urn = hrn_to_urn(agg_hrn, 'authority')
            # One of our peers may know about this aggregate.
            for peer in api.aggregates:
                known = api.aggregates[peer].get_aggregates(credential, net_urn)
                if known and 'hrn' in known[0]:
                    # Send the request to the advertised address; the first
                    # peer that answers usefully ends the search.
                    server = xmlrpcprotocol.get_server(
                        known[0]['url'], api.key_file, api.cert_file)
                    break
        if server is not None:
            pool.run(server.GetTicket, xrn, credential, agg_rspec, users)

    # Pull the interesting pieces out of every returned ticket.
    rspecs, initscripts, slivers = [], [], []
    object_gid = None
    for raw_ticket in pool.get_results():
        agg_ticket = SfaTicket(string=raw_ticket)
        attrs = agg_ticket.get_attributes()
        if not object_gid:
            # The first usable object GID is reused for the merged ticket.
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # Merge the collected information.
    attributes = {'initscripts': initscripts, 'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # Build, sign and serialize the combined ticket.
    # NOTE(review): object_gid is still None when no ticket came back, so
    # object_gid.get_pubkey() below fails in that case -- confirm intent.
    ticket = SfaTicket(subject=slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
155
156
def delete_slice(api, xrn, origin_hrn=None):
    """Ask every aggregate to delete the sliver for xrn.

    DeleteSliver is dispatched through a ThreadManager for each entry of
    api.aggregates, with xrn and the credential from api.getCredential().
    origin_hrn is accepted but not used.

    The collected results are discarded; the function always returns 1.
    """
    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    manager_cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].DeleteSliver, xrn, manager_cred)
    pool.get_results()
    return 1
168
def start_slice(api, xrn, creds):
    """Ask every aggregate to start the slice xrn.

    Start is dispatched through a ThreadManager for each entry of
    api.aggregates, with xrn and the credential from api.getCredential().
    The creds argument is not used yet (see TODO).

    The collected results are discarded; the function always returns 1.
    """
    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    manager_cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].Start, xrn, manager_cred)
    pool.get_results()
    return 1
180  
def stop_slice(api, xrn, creds):
    """Ask every aggregate to stop the slice xrn.

    Stop is dispatched through a ThreadManager for each entry of
    api.aggregates, with xrn and the credential from api.getCredential().
    The creds argument is not used yet (see TODO).

    The collected results are discarded; the function always returns 1.
    """
    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    manager_cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].Stop, xrn, manager_cred)
    pool.get_results()
    return 1
192
def reset_slice(api, xrn):
    """Placeholder: resetting a slice is not implemented.

    Both arguments are ignored and 1 is returned unconditionally.
    """
    return 1
198
def shutdown(api, xrn, creds):
    """Placeholder: shutting a slice down is not implemented.

    All arguments are ignored and 1 is returned unconditionally.
    """
    return 1
204
def status(api, xrn, creds):
    """Placeholder: status reporting is not implemented.

    All arguments are ignored and 1 is returned unconditionally.
    """
    return 1
210
def get_slices(api, creds):
    """Return the slices reported by all aggregates, using the cache if set.

    When api.cache is truthy and holds a non-empty 'slices' entry, that
    entry is returned as is.  Otherwise ListSlices is dispatched through
    a ThreadManager for each entry of api.aggregates with the credential
    from api.getCredential(), the per-aggregate results are concatenated
    into one list, and that list is stored in the cache (when there is
    one) under 'slices'.  The creds argument is not used.
    """
    # Serve from the cache when it has something for us.
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # Otherwise query every aggregate.
    manager_cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].ListSlices, manager_cred)

    # Combine the per-aggregate results into one flat list.
    slices = [entry for result in pool.get_results() for entry in result]

    # Remember the answer for next time.
    if api.cache:
        api.cache.add('slices', slices)

    return slices
237  
def get_rspec(api, creds, options):
    """Return one RSpec document combining the resources of all aggregates.

    options may carry 'geni_slice_urn' (the slice to report on) and
    'origin_hrn'.  When no slice urn is given and api.cache holds a
    'nodes' entry, that cached document is returned.  Otherwise
    ListResources is dispatched through a ThreadManager for each entry of
    api.aggregates, with the credential from api.getCredential() and a
    shallow copy of options in which 'geni_compressed' is forced to
    False.

    Every answer is parsed as XML.  The first document whose root has
    type="SFA" becomes the base; the <network> and <request> children of
    later SFA documents are deep-copied into it.  Non-SFA documents are
    ignored.

    Returns the combined document serialized with an XML declaration and
    pretty printing; it is also stored in the cache under 'nodes' when no
    slice urn was given.

    Raises InvalidRSpec if an aggregate's answer is not well-formed XML.
    """
    # Slice urn requested by the caller, if any.
    xrn = options.get('geni_slice_urn', None)
    # The results are not used below; the call is kept so xrn goes through
    # urn_to_hrn() as before (once is enough -- it used to be done twice).
    _slice_hrn, _record_type = urn_to_hrn(xrn)

    # hrn of the original caller.  The value is only needed by the
    # disabled get_resources() call further down, but it is still computed
    # so creds[0] is parsed as it always was.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # Only the slice-independent listing is cached.
    if api.cache and not xrn:
        cached = api.cache.get('nodes')
        if cached:
            return cached

    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    cred = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        # Each call gets its own options copy with compression disabled.
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, cred, my_opts)
        #threads.run(server.get_resources, cred, xrn, origin_hrn)

    # Combine the per-aggregate rspecs into a single rspec.
    rspec = None
    for agg_rspec in threads.get_results():
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") not in ["SFA"]:
            continue
        if rspec is None:
            # The first SFA document is the base everything is merged into.
            rspec = root
            continue
        for network in root.iterfind("./network"):
            rspec.append(deepcopy(network))
        for request in root.iterfind("./request"):
            rspec.append(deepcopy(request))

    # NOTE(review): rspec is still None here when no aggregate returned an
    # SFA document, so etree.tostring() is handed None -- confirm the
    # desired behaviour for that case.
    rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)

    # Cache the slice-independent result.
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
295
def main():
    """Manual smoke test: load the RSpec file named by the first
    command-line argument and pass it to create_slice().

    NOTE(review): api is passed as None, so create_slice() cannot get as
    far as contacting any aggregate; a real API object is needed for this
    to do useful work.
    """
    r = RSpec()
    r.parseFile(sys.argv[1])
    rspec = r.toDict()
    # create_slice() requires creds and users in addition to the rspec;
    # none are available here, so empty lists are supplied.
    create_slice(None, 'plc.princeton.tmacktestslice', [], rspec, [])

if __name__ == "__main__":
    main()
304