slice manager sets 'geni_compressed' option to False when calling ListResources at...
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspecHelper import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
26 from copy import copy
27
def get_version():
    """
    Return the version numbers this slice manager advertises.

    The result is a dict with the keys 'geni_api' and 'sfa', both set to 1.
    """
    return {'geni_api': 1, 'sfa': 1}
33
def slice_status(api, slice_xrn, creds):
    """
    Build a placeholder status record for a slice.

    The record echoes slice_xrn under 'geni_urn', reports 'unknown' as the
    'geni_status' and carries an empty 'geni_resources' dict.  Neither api
    nor creds is consulted.
    """
    return {
        'geni_urn': slice_xrn,
        'geni_status': 'unknown',
        'geni_resources': {},
    }
40
def create_slice(api, xrn, creds, rspec, users):
    """
    Forward a CreateSliver request to the known aggregates and merge replies.

    The complete rspec is sent unchanged to every entry of api.aggregates
    whose key differs from the hrn of the calling client's GID.  The replies
    are combined with merge_rspecs and the merged result is returned.

    The creds argument is not used; the credential obtained from
    api.getCredential() is sent instead.
    """
    slice_hrn, xrn_type = urn_to_hrn(xrn)

    # RSpec validation against PlanetLab's schema is disabled for now: the
    # schema would have to aggregate the PL and VINI schemas first
    # (candidate path: "/var/www/html/schemas/pl.rng").
    schema_path = None
    if schema_path:
        try:
            rspec_tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        validator = etree.RelaxNG(etree.parse(schema_path))
        if not validator(rspec_tree):
            last_error = validator.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (last_error.message,
                                                 last_error.line))

    # XX TODO: Should try to use delegated credential first
    credential = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        # do not send the request back to the aggregate that issued it
        if name == api.auth.client_cred.get_gid_caller().get_hrn():
            continue
        # every aggregate receives the entire RSpec
        workers.run(api.aggregates[name].CreateSliver,
                    xrn, credential, rspec, users)

    return merge_rspecs(workers.get_results())
77
def renew_slice(api, xrn, creds, expiration_time):
    """
    Ask every known aggregate to renew the slivers of a slice.

    RenewSliver is invoked on each entry of api.aggregates with the
    credential from api.getCredential(); creds is not used.  The individual
    replies are collected but discarded, and 1 is always returned.
    """
    # XX TODO: Should try to use delegated credential first
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].RenewSliver,
                    xrn, cred, expiration_time)
    # wait for completion; the per-aggregate results are not inspected
    workers.get_results()
    return 1
89
def get_ticket(api, xrn, creds, rspec, users):
    """
    Collect tickets from the aggregates named in an rspec and combine them.

    Each <network> element directly under the rspec root selects one
    aggregate.  A GetTicket call carrying the full rspec is issued to every
    aggregate that can be resolved; the returned tickets are merged into a
    single newly signed SfaTicket, which is returned in string form.

    The creds argument is not used; the credential obtained from
    api.getCredential() is sent instead.
    """
    slice_hrn, xrn_type = urn_to_hrn(xrn)

    # Map each aggregate named in the client's rspec to the rspec it gets.
    # NOTE(review): the first attribute value of <network> is taken as the
    # aggregate hrn -- presumably its name attribute; confirm.
    rspec_by_aggregate = {}
    for network in etree.parse(StringIO(rspec)).findall('./network'):
        rspec_by_aggregate[network.values()[0]] = rspec

    # request a ticket from each aggregate
    cred = api.getCredential()
    workers = ThreadManager()
    for agg_hrn, agg_rspec in rspec_by_aggregate.items():
        if agg_hrn in api.aggregates:
            server = api.aggregates[agg_hrn]
        else:
            server = None
            net_urn = hrn_to_urn(agg_hrn, 'authority')
            # one of our peers may know about this aggregate
            for peer in api.aggregates:
                matches = api.aggregates[peer].get_aggregates(cred, net_urn)
                if matches and 'hrn' in matches[0]:
                    # talk directly to the address the peer reported
                    server = xmlrpcprotocol.get_server(matches[0]['url'],
                                                       api.key_file,
                                                       api.cert_file)
                    # aggregate found, no need to ask the remaining peers
                    break
        if server is not None:
            workers.run(server.GetTicket, xrn, cred, agg_rspec, users)

    # pull the interesting pieces out of every returned ticket
    ticket_rspecs = []
    all_initscripts = []
    all_slivers = []
    object_gid = None
    for ticket_string in workers.get_results():
        agg_ticket = SfaTicket(string=ticket_string)
        ticket_attrs = agg_ticket.get_attributes()
        if not object_gid:
            # the first usable object GID is kept for the combined ticket
            object_gid = agg_ticket.get_gid_object()
        ticket_rspecs.append(agg_ticket.get_rspec())
        all_initscripts.extend(ticket_attrs.get('initscripts', []))
        all_slivers.extend(ticket_attrs.get('slivers', []))

    # merge info
    attributes = {'initscripts': all_initscripts,
                  'slivers': all_slivers}
    merged_rspec = merge_rspecs(ticket_rspecs)

    # Assemble and sign the combined ticket.
    # NOTE(review): when no ticket came back object_gid is still None here,
    # so the get_pubkey() call below fails.
    ticket = SfaTicket(subject=slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    # attaching a parent ticket from the authority hierarchy is disabled
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
156
157
def delete_slice(api, xrn, origin_hrn=None):
    """
    Ask every known aggregate to delete the slivers of a slice.

    DeleteSliver is invoked on each entry of api.aggregates with the
    credential from api.getCredential(); origin_hrn is not used.  The
    replies are collected but discarded, and 1 is always returned.
    """
    # XX TODO: Should try to use delegated credential first
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].DeleteSliver, xrn, cred)
    # wait for completion; the per-aggregate results are not inspected
    workers.get_results()
    return 1
169
def start_slice(api, xrn, creds):
    """
    Ask every known aggregate to start a slice.

    Start is invoked on each entry of api.aggregates with the credential
    from api.getCredential(); creds is not used.  The replies are collected
    but discarded, and 1 is always returned.
    """
    # XX TODO: Should try to use delegated credential first
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].Start, xrn, cred)
    # wait for completion; the per-aggregate results are not inspected
    workers.get_results()
    return 1
181  
def stop_slice(api, xrn, creds):
    """
    Ask every known aggregate to stop a slice.

    Stop is invoked on each entry of api.aggregates with the credential
    from api.getCredential(); creds is not used.  The replies are collected
    but discarded, and 1 is always returned.
    """
    # XX TODO: Should try to use delegated credential first
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].Stop, xrn, cred)
    # wait for completion; the per-aggregate results are not inspected
    workers.get_results()
    return 1
193
def reset_slice(api, xrn):
    """
    Placeholder: resetting a slice is not implemented.

    Both arguments are ignored and 1 is always returned.
    """
    return 1
199
def shutdown(api, xrn, creds):
    """
    Placeholder: shutting down a slice is not implemented.

    All arguments are ignored and 1 is always returned.
    """
    return 1
205
def status(api, xrn, creds):
    """
    Placeholder: status reporting is not implemented.

    All arguments are ignored and 1 is always returned.
    """
    return 1
211
def get_slices(api, creds):
    """
    Return the slices reported by all known aggregates.

    A non-empty 'slices' entry in api.cache is returned directly.  Otherwise
    ListSlices is invoked on every entry of api.aggregates, the replies are
    concatenated into one list, and that list is stored in the cache (when a
    cache is configured) before being returned.  creds is not used.
    """
    # serve from the cache when it holds a non-empty answer
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # fan the query out to the aggregates
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].ListSlices, cred)

    # combine the per-aggregate lists into one
    all_slices = []
    for partial in workers.get_results():
        all_slices.extend(partial)

    # remember the combined list for later calls
    if api.cache:
        api.cache.add('slices', all_slices)

    return all_slices
238  
def get_rspec(api, creds, options):
    """
    Collect resource rspecs from the aggregates and merge them into one.

    options may carry 'geni_slice_urn' (restrict the listing to one slice)
    and 'origin_hrn' (hrn of the original caller).  When no slice urn is
    given, a non-empty 'nodes' entry in api.cache is returned directly and
    a freshly merged result is stored there.

    ListResources is invoked on every entry of api.aggregates whose key
    differs from the hrn of the calling client's GID, always with
    'geni_compressed' forced to False in a copy of options.  Replies whose
    root element has type "SFA" are merged: the first such root becomes the
    base document and the <network> and <request> children of later ones
    are appended to it.

    Returns the merged rspec serialized as XML.

    Raises InvalidRSpec when a reply is not well-formed XML, or when no
    reply contributed an SFA rspec (previously this case surfaced as a
    TypeError from etree.tostring(None)).
    """
    # get slice's hrn from options
    xrn = options.get('geni_slice_urn', None)
    hrn, xrn_type = urn_to_hrn(xrn)

    # get hrn of the original caller
    # NOTE(review): origin_hrn is only consumed by the disabled
    # get_resources call; the lookup is kept because it still parses
    # creds[0] -- confirm whether it can be dropped.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # look in cache first (only the slice-independent listing is cached)
    if api.cache and not xrn:
        rspec = api.cache.get('nodes')
        if rspec:
            return rspec

    # XX TODO: Should try to use delegated credential first
    cred = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        # do not query the aggregate that issued this request
        if aggregate in [api.auth.client_cred.get_gid_caller().get_hrn()]:
            continue
        server = api.aggregates[aggregate]
        # work on a copy so the caller's options dict is left untouched;
        # the replies are parsed as XML below, so compression is disabled
        my_opts = copy(options)
        my_opts['geni_compressed'] = False
        threads.run(server.ListResources, cred, my_opts)

    # combine the rspecs into a single rspec
    merged = None
    for agg_rspec in threads.get_results():
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") not in ["SFA"]:
            continue
        if merged is None:
            merged = root
        else:
            for network in root.iterfind("./network"):
                merged.append(deepcopy(network))
            for request in root.iterfind("./request"):
                merged.append(deepcopy(request))

    if merged is None:
        # nothing to serialize: no aggregate was queried, or none of the
        # replies was an SFA rspec
        raise InvalidRSpec("no aggregate returned an SFA rspec")

    rspec = etree.tostring(merged, xml_declaration=True, pretty_print=True)

    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
298
def main():
    """
    Ad-hoc command line harness: load an rspec file and pass it to
    create_slice for the hard-coded slice 'plc.princeton.tmacktestslice'.

    Usage: <script> <rspec-file>

    NOTE(review): api is passed as None, so create_slice still fails once
    it reaches api.getCredential(); a real api object has to be supplied
    before this harness can do anything useful.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s <rspec-file>\n" % sys.argv[0])
        sys.exit(1)

    r = RSpec()
    r.parseFile(sys.argv[1])
    rspec = r.toDict()
    # create_slice takes (api, xrn, creds, rspec, users); the former
    # three-argument call raised TypeError before doing any work
    create_slice(None, 'plc.princeton.tmacktestslice', None, rspec, [])


if __name__ == "__main__":
    main()
307