use ListResources instead of get_resources. Use CreateSliver instead of create_slice
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspecHelper import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.trust.credential import Credential
22 from sfa.util.threadmanager import ThreadManager
23 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
24 from sfa.util.debug import log
25 import sfa.plc.peers as peers
26
def get_version():
    """Return the version descriptor advertised by this slice manager.

    The result is a dict with a single key, 'geni_api', set to 1.
    """
    return {'geni_api': 1}
31
def delete_slice(api, xrn, origin_hrn=None):
    """Ask every known aggregate to delete the slice identified by `xrn`.

    The slice manager's own credential (api.getCredential()) is used for
    each call; `origin_hrn` is forwarded untouched.  Always returns 1.
    """
    cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        # one delete_slice call is queued per aggregate
        pool.run(api.aggregates[name].delete_slice, cred, xrn, origin_hrn)
    # collect the results (discarded) before reporting success
    pool.get_results()
    return 1
40
def create_slice(api, xrn, creds, rspec, users):
    """Forward a CreateSliver request to every aggregate except the caller.

    The complete `rspec` is sent unchanged to each aggregate, together with
    `xrn` and `users`.  The call is made with the slice manager's own
    credential (api.getCredential()); the `creds` argument is not used here.
    Always returns 1.

    Raises InvalidRSpec only if schema validation is re-enabled and fails.
    """
    # The parsed pieces are not used below; the call is kept as in the
    # original so the xrn is still run through urn_to_hrn.
    slice_hrn, xrn_type = urn_to_hrn(xrn)

    # Validate the RSpec against PlanetLab's schema --disabled for now
    # The schema used here needs to aggregate the PL and VINI schemas
    # schema = "/var/www/html/schemas/pl.rng"
    schema = None
    if schema:
        try:
            tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        relaxng = etree.RelaxNG(etree.parse(schema))
        if not relaxng(tree):
            error = relaxng.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (error.message, error.line))

    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    cred = api.getCredential()
    pool = ThreadManager()
    for aggregate in api.aggregates:
        caller_hrn = api.auth.client_cred.get_gid_caller().get_hrn()
        if aggregate == caller_hrn:
            # never send the request back to the aggregate that issued it
            continue
        # Just send entire RSpec to each aggregate
        pool.run(api.aggregates[aggregate].CreateSliver, xrn, cred, rspec, users)

    pool.get_results()
    return 1
76
def _locate_aggregate_server(api, aggregate, credential):
    """Return a server handle for `aggregate`, or None if none can be found.

    A directly registered aggregate is used as-is.  Otherwise each registered
    aggregate is asked (get_aggregates) whether it knows the authority, and
    the first positive answer is turned into a new XML-RPC connection.
    """
    if aggregate in api.aggregates:
        return api.aggregates[aggregate]

    net_urn = hrn_to_urn(aggregate, 'authority')
    # we may have a peer that knows about this aggregate
    for peer in api.aggregates:
        agg_info = api.aggregates[peer].get_aggregates(credential, net_urn)
        if agg_info:
            # send the request to this address
            url = 'http://%s:%s' % (agg_info['addr'], agg_info['port'])
            return xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
    return None


def get_ticket(api, xrn, rspec, origin_hrn=None):
    """Collect tickets from the aggregates named in `rspec` and merge them.

    Every './network' element of `rspec` names an aggregate through its first
    attribute value.  Each such aggregate is sent the full `rspec` in a
    get_ticket call; the returned tickets' rspecs, 'initscripts' and
    'slivers' attributes are combined into one new ticket that is encoded,
    signed and returned as a string (parents included).

    NOTE(review): if no ticket is returned at all, object_gid stays None and
    the get_pubkey() call below fails -- confirm whether callers rely on that.
    """
    slice_hrn, xrn_type = urn_to_hrn(xrn)

    # get the netspecs contained within the clients rspec; every aggregate
    # is handed the whole rspec
    tree = etree.parse(StringIO(rspec))
    aggregate_rspecs = {}
    for network in tree.findall('./network'):
        aggregate_rspecs[network.values()[0]] = rspec

    # get a ticket from each aggregate
    credential = api.getCredential()
    pool = ThreadManager()
    for aggregate, aggregate_rspec in aggregate_rspecs.items():
        server = _locate_aggregate_server(api, aggregate, credential)
        if server is not None:
            pool.run(server.get_ticket, credential, xrn, aggregate_rspec, origin_hrn)
    raw_tickets = pool.get_results()

    # gather information from each ticket
    rspecs = []
    initscripts = []
    slivers = []
    object_gid = None
    for raw_ticket in raw_tickets:
        agg_ticket = SfaTicket(string=raw_ticket)
        attrs = agg_ticket.get_attributes()
        if not object_gid:
            # the object gid is taken from the first ticket that has one
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # merge info
    merged_rspec = merge_rspecs(rspecs)
    attributes = {'initscripts': initscripts, 'slivers': slivers}

    # create a new ticket
    ticket = SfaTicket(subject=slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
140
def start_slice(api, xrn):
    """Ask every known aggregate to start the slice identified by `xrn`.

    The slice manager's own credential (api.getCredential()) is used for
    each call.  Always returns 1.
    """
    credential = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        # Fixed: this used to dispatch server.stop_slice (copy/paste from
        # stop_slice), so "start" actually stopped the slice everywhere.
        threads.run(server.start_slice, credential, xrn)
    threads.get_results()
    return 1
149  
def stop_slice(api, xrn):
    """Ask every known aggregate to stop the slice identified by `xrn`.

    The slice manager's own credential (api.getCredential()) is used for
    each call.  Always returns 1.
    """
    cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        # one stop_slice call is queued per aggregate
        pool.run(api.aggregates[name].stop_slice, cred, xrn)
    # collect the results (discarded) before reporting success
    pool.get_results()
    return 1
158
def reset_slice(api, xrn):
    """Placeholder for slice reset; does nothing and returns 1.

    Both arguments are accepted for interface compatibility and ignored.
    """
    # XX not implemented at this interface
    return 1
162
def get_slices(api):
    """Return the slices reported by all aggregates, as one flat list.

    A non-empty 'slices' entry in api.cache (when a cache is configured) is
    returned directly.  Otherwise every aggregate is queried with the slice
    manager's own credential, the per-aggregate results are concatenated,
    and the combined list is stored back in the cache.
    """
    # look in cache first
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # fetch from aggregates
    cred = api.getCredential()
    pool = ThreadManager()
    for name in api.aggregates:
        pool.run(api.aggregates[name].get_slices, cred)

    # combine results
    merged = []
    for partial in pool.get_results():
        merged.extend(partial)

    # cache the result
    if api.cache:
        api.cache.add('slices', merged)

    return merged
189  
def get_rspec(api, creds, options):
    """Return one merged resource RSpec (XML string) from all aggregates.

    Parameters:
      api     -- slice manager API object (credential, aggregates, cache).
      creds   -- list of credential strings; creds[0] is only consulted to
                 work out the caller's hrn when options has no 'origin_hrn'.
      options -- dict forwarded to each aggregate's ListResources call;
                 'geni_slice_urn' and 'origin_hrn' are read from it.

    When no slice urn is given and a cache is configured, the 'nodes' cache
    entry is returned if present and refreshed otherwise.

    Raises InvalidRSpec if an aggregate returns XML that does not parse, or
    if no aggregate returned an rspec whose root has type="SFA".
    """
    # get slice's hrn from options
    xrn = options.get('geni_slice_urn', None)
    # The parsed pieces are not used below; the original ran this twice,
    # once is enough to keep the xrn going through urn_to_hrn.
    slice_hrn, xrn_type = urn_to_hrn(xrn)

    # get hrn of the original caller
    # NOTE(review): origin_hrn is not passed on since the switch to
    # ListResources; it is still computed so the credential gets decoded.
    origin_hrn = options.get('origin_hrn', None)
    if not origin_hrn:
        origin_hrn = Credential(string=creds[0]).get_gid_caller().get_hrn()

    # look in cache first
    if api.cache and not xrn:
        cached = api.cache.get('nodes')
        if cached:
            return cached

    # XX
    # XX TODO: Should try to use delegated credential first
    # XX
    cred = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
            # get the rspec from the aggregate
            server = api.aggregates[aggregate]
            threads.run(server.ListResources, cred, options)

    # combine the rspecs into a single rspec
    merged = None
    for agg_rspec in threads.get_results():
        try:
            tree = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
            raise InvalidRSpec(message)

        root = tree.getroot()
        if root.get("type") != "SFA":
            # only SFA-typed rspecs take part in the merge
            continue
        # 'is None' on purpose: an lxml element with no children is falsy,
        # and comparing elements with '==' is not what is meant here.
        if merged is None:
            # the first SFA rspec becomes the document everything is added to
            merged = root
            continue
        for network in root.iterfind("./network"):
            merged.append(deepcopy(network))
        for request in root.iterfind("./request"):
            merged.append(deepcopy(request))

    if merged is None:
        # Previously this fell through to etree.tostring(None) and died with
        # an unrelated TypeError; report the real problem instead.
        raise InvalidRSpec("no aggregate returned an SFA rspec")

    rspec = etree.tostring(merged, xml_declaration=True, pretty_print=True)
    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', rspec)

    return rspec
247
def fetch_context(slice_hrn, user_hrn, contexts):
    """
    Returns the request context required by sfatables. At some point, this
    mechanism should be changed to refer to "contexts", which is the
    information that sfatables is requesting. But for now, we just return
    the basic information needed in a dict.

    `contexts` is currently ignored; the result always has the shape
    {'sfa': {'user': {'hrn': ...}, 'slice': {'hrn': ...}}}.
    """
    user_ctx = {'hrn': user_hrn}
    slice_ctx = {'hrn': slice_hrn}
    return {'sfa': {'user': user_ctx, 'slice': slice_ctx}}
259
def main():
    """Ad-hoc command-line entry point: parse an RSpec file and call create_slice.

    Usage: <script> <rspec-file>

    NOTE(review): the call is made with api=None, exactly as before, so
    create_slice cannot actually reach any aggregate from here; this entry
    point looks like leftover debugging code -- confirm whether it is still
    needed.
    """
    if len(sys.argv) < 2:
        # previously a missing argument surfaced as a bare IndexError
        sys.exit("usage: %s <rspec-file>" % sys.argv[0])

    r = RSpec()
    r.parseFile(sys.argv[1])
    rspec = r.toDict()
    # Fixed: create_slice takes (api, xrn, creds, rspec, users); the old
    # three-argument call could only raise TypeError.
    create_slice(None, 'plc.princeton.tmacktestslice', [], rspec, [])


if __name__ == "__main__":
    main()
268