fig bug in get_ticket(), ticket now has merged rspec
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspec import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.threadmanager import ThreadManager
22 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
23 from sfa.util.debug import log
24 import sfa.plc.peers as peers
25
26 def delete_slice(api, xrn, origin_hrn=None):
27     credential = api.getCredential()
28     threads = ThreadManager()
29     for aggregate in api.aggregates:
30         server = api.aggregates[aggregate] 
31         threads.run(server.delete_slice, credential, xrn, origin_hrn)
32     threads.get_results()
33     return 1
34
35 def create_slice(api, xrn, rspec, origin_hrn=None):
36     hrn, type = urn_to_hrn(xrn)
37
38     # Validate the RSpec against PlanetLab's schema --disabled for now
39     # The schema used here needs to aggregate the PL and VINI schemas
40     # schema = "/var/www/html/schemas/pl.rng"
41     schema = None
42     if schema:
43         try:
44             tree = etree.parse(StringIO(rspec))
45         except etree.XMLSyntaxError:
46             message = str(sys.exc_info()[1])
47             raise InvalidRSpec(message)
48
49         relaxng_doc = etree.parse(schema)
50         relaxng = etree.RelaxNG(relaxng_doc)
51         
52         if not relaxng(tree):
53             error = relaxng.error_log.last_error
54             message = "%s (line %s)" % (error.message, error.line)
55             raise InvalidRSpec(message)
56
57     cred = api.getCredential()
58     threads = ThreadManager()
59     for aggregate in api.aggregates:
60         if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
61             server = api.aggregates[aggregate]
62             # Just send entire RSpec to each aggregate
63             threads.run(server.create_slice, cred, xrn, rspec, origin_hrn)
64     threads.get_results() 
65     return 1
66
67 def get_ticket(api, xrn, rspec, origin_hrn=None):
68     slice_hrn, type = urn_to_hrn(xrn)
69     # get the netspecs contained within the clients rspec
70     aggregate_rspecs = {}
71     tree= etree.parse(StringIO(rspec))
72     elements = tree.findall('./network')
73     for element in elements:
74         aggregate_hrn = element.values()[0]
75         aggregate_rspecs[aggregate_hrn] = rspec 
76
77     # get a ticket from each aggregate 
78     credential = api.getCredential()
79     threads = ThreadManager()
80     for aggregate, aggregate_rspec in aggregate_rspecs.items():
81         server = None
82         if aggregate in api.aggregates:
83             server = api.aggregates[aggregate]
84         else:
85             net_urn = hrn_to_urn(aggregate, 'authority')     
86             # we may have a peer that knows about this aggregate
87             for agg in api.aggregates:
88                 agg_info = api.aggregates[agg].get_aggregates(credential, net_urn)
89                 if agg_info:
90                     # send the request to this address 
91                     url = 'http://%s:%s' % (agg_info['addr'], agg_info['port'])
92                     server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
93                     break   
94         if server is None:
95             continue 
96         threads.run(server.get_ticket, credential, xrn, aggregate_rspec, origin_hrn)
97     results = threads.get_results()
98     
99     # gather information from each ticket 
100     rspecs = []
101     initscripts = []
102     slivers = [] 
103     object_gid = None  
104     for result in results:
105         agg_ticket = SfaTicket(string=result)
106         attrs = agg_ticket.get_attributes()
107         if not object_gid:
108             object_gid = agg_ticket.get_gid_object()
109         print object_gid
110         rspecs.append(agg_ticket.get_rspec())
111         initscripts.extend(attrs.get('initscripts', [])) 
112         slivers.extend(attrs.get('slivers', [])) 
113     
114     # merge info
115     attributes = {'initscripts': initscripts,
116                  'slivers': slivers}
117     merged_rspec = merge_rspecs(rspecs) 
118
119     # create a new ticket
120     ticket = SfaTicket(subject = slice_hrn)
121     ticket.set_gid_caller(api.auth.client_gid)
122     ticket.set_issuer(key=api.key, subject=api.hrn)
123     ticket.set_gid_object(object_gid)
124     ticket.set_pubkey(object_gid.get_pubkey())
125     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
126     ticket.set_attributes(attributes)
127     ticket.set_rspec(merged_rspec)
128     ticket.encode()
129     ticket.sign()          
130     return ticket.save_to_string(save_parents=True)
131
132 def start_slice(api, xrn):
133     credential = api.getCredential()
134     threads = ThreadManager()
135     for aggregate in api.aggregates:
136         server = api.aggregates[aggregate]
137         threads.run(server.stop_slice, credential, xrn)
138     threads.get_results()    
139     return 1
140  
141 def stop_slice(api, xrn):
142     credential = api.getCredential()
143     threads = ThreadManager()
144     for aggregate in api.aggregates:
145         server = api.aggregates[aggregate]
146         threads.run(server.stop_slice, credential, xrn)
147     threads.get_results()    
148     return 1
149
150 def reset_slice(api, xrn):
151     # XX not implemented at this interface
152     return 1
153
154 def get_slices(api):
155     # look in cache first
156     if api.cache:
157         slices = api.cache.get('slices')
158         if slices:
159             return slices    
160
161     # fetch from aggregates
162     slices = []
163     credential = api.getCredential()
164     threads = ThreadManager()
165     for aggregate in api.aggregates:
166         server = api.aggregates[aggregate]
167         threads.run(server.get_slices, credential)
168
169     # combime results
170     results = threads.get_results()
171     slices = []
172     for result in results:
173         slices.extend(result)
174     
175     # cache the result
176     if api.cache:
177         api.cache.add('slices', slices)
178
179     return slices
180  
181 def get_rspec(api, xrn=None, origin_hrn=None):
182     # look in cache first 
183     if api.cache and not xrn:
184         rspec =  api.cache.get('nodes')
185         if rspec:
186             return rspec
187
188     hrn, type = urn_to_hrn(xrn)
189     rspec = None
190     cred = api.getCredential()
191     threads = ThreadManager()
192     for aggregate in api.aggregates:
193         if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
194             # get the rspec from the aggregate
195             server = api.aggregates[aggregate]
196             threads.run(server.get_resources, cred, xrn, origin_hrn)
197
198     results = threads.get_results()
199     # combine the rspecs into a single rspec 
200     for agg_rspec in results:
201         try:
202             tree = etree.parse(StringIO(agg_rspec))
203         except etree.XMLSyntaxError:
204             message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
205             raise InvalidRSpec(message)
206
207         root = tree.getroot()
208         if root.get("type") in ["SFA"]:
209             if rspec == None:
210                 rspec = root
211             else:
212                 for network in root.iterfind("./network"):
213                     rspec.append(deepcopy(network))
214                 for request in root.iterfind("./request"):
215                     rspec.append(deepcopy(request))
216
217     rspec =  etree.tostring(rspec, xml_declaration=True, pretty_print=True)
218     # cache the result
219     if api.cache and not xrn:
220         api.cache.add('nodes', rspec)
221  
222     return rspec
223
224 """
225 Returns the request context required by sfatables. At some point, this
226 mechanism should be changed to refer to "contexts", which is the
227 information that sfatables is requesting. But for now, we just return
228 the basic information needed in a dict.
229 """
230 def fetch_context(slice_hrn, user_hrn, contexts):
231     #slice_hrn = urn_to_hrn(slice_xrn)[0]
232     #user_hrn = urn_to_hrn(user_xrn)[0]
233     base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
234     return base_context
235
236 def main():
237     r = RSpec()
238     r.parseFile(sys.argv[1])
239     rspec = r.toDict()
240     create_slice(None,'plc.princeton.tmacktestslice',rspec)
241
242 if __name__ == "__main__":
243     main()
244