added get_version method
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12 from sfa.util.rspec import merge_rspecs
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.threadmanager import ThreadManager
22 import sfa.util.xmlrpcprotocol as xmlrpcprotocol     
23 from sfa.util.debug import log
24 import sfa.plc.peers as peers
25
def get_version():
    """Return the version information advertised by this manager.

    Returns:
        A freshly built dict whose only entry maps 'geni_api' to 1.
    """
    return {'geni_api': 1}
30
def delete_slice(api, xrn, origin_hrn=None):
    """Ask every aggregate in api.aggregates to delete the slice xrn.

    One delete_slice call per aggregate is handed to a ThreadManager and the
    results are collected before returning.

    Args:
        api: object providing getCredential() and the aggregates mapping.
        xrn: identifier of the slice, forwarded unchanged to each aggregate.
        origin_hrn: forwarded unchanged to each aggregate.

    Returns:
        Always 1; the per-aggregate results are not inspected.
    """
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].delete_slice, cred, xrn, origin_hrn)
    workers.get_results()
    return 1
39
def create_slice(api, xrn, rspec, origin_hrn=None):
    """Forward a slice-creation request to the aggregates.

    The complete rspec is sent, unmodified, to every aggregate in
    api.aggregates except the one whose name equals the hrn of the calling
    credential's GID.

    Args:
        api: object providing getCredential(), aggregates and auth.
        xrn: identifier of the slice to create.
        rspec: the request RSpec as an XML string.
        origin_hrn: forwarded unchanged to each aggregate.

    Returns:
        Always 1; the per-aggregate results are not inspected.

    Raises:
        InvalidRSpec: only reachable once schema validation is re-enabled,
            when the rspec is not well-formed XML or fails the schema.
    """
    hrn, _unused_type = urn_to_hrn(xrn)

    # RSpec validation is disabled for now (schema_path stays None). The
    # schema to use needs to aggregate the PL and VINI schemas; the PL one
    # used to be read from "/var/www/html/schemas/pl.rng".
    schema_path = None
    if schema_path:
        try:
            rspec_tree = etree.parse(StringIO(rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(sys.exc_info()[1]))

        validator = etree.RelaxNG(etree.parse(schema_path))
        if not validator(rspec_tree):
            failure = validator.error_log.last_error
            raise InvalidRSpec("%s (line %s)" % (failure.message, failure.line))

    credential = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        # do not send the request back to the aggregate that is calling us
        caller_hrn = api.auth.client_cred.get_gid_caller().get_hrn()
        if name == caller_hrn:
            continue
        # Just send the entire RSpec to each aggregate
        workers.run(api.aggregates[name].create_slice,
                    credential, xrn, rspec, origin_hrn)
    workers.get_results()
    return 1
71
def get_ticket(api, xrn, rspec, origin_hrn=None):
    """Collect a ticket from each aggregate named in rspec and merge them.

    Every <network> child of the rspec root names an aggregate (the value of
    the element's first attribute). Each of those aggregates is asked for a
    ticket, with the entire client rspec as the request. An aggregate that is
    not in api.aggregates is looked up by asking each known aggregate's
    get_aggregates(); if none reports it, that aggregate is skipped.

    The rspecs, 'initscripts' and 'slivers' carried by the returned tickets
    are merged into a new ticket that is issued and signed here.

    Args:
        api: object providing getCredential(), aggregates, auth, key, hrn,
            key_file and cert_file.
        xrn: identifier of the slice the ticket is for.
        rspec: the request RSpec as an XML string.
        origin_hrn: forwarded unchanged to each aggregate.

    Returns:
        The merged ticket serialized with save_to_string(save_parents=True).
    """
    slice_hrn, _unused_type = urn_to_hrn(xrn)

    # get the netspecs contained within the client's rspec; every aggregate
    # found is sent the whole rspec
    aggregate_rspecs = {}
    tree = etree.parse(StringIO(rspec))
    for element in tree.findall('./network'):
        aggregate_hrn = element.values()[0]
        aggregate_rspecs[aggregate_hrn] = rspec

    # get a ticket from each aggregate
    credential = api.getCredential()
    threads = ThreadManager()
    for aggregate, aggregate_rspec in aggregate_rspecs.items():
        server = None
        if aggregate in api.aggregates:
            server = api.aggregates[aggregate]
        else:
            net_urn = hrn_to_urn(aggregate, 'authority')
            # we may have a peer that knows about this aggregate
            for agg in api.aggregates:
                agg_info = api.aggregates[agg].get_aggregates(credential, net_urn)
                if agg_info:
                    # send the request to this address
                    url = 'http://%s:%s' % (agg_info['addr'], agg_info['port'])
                    server = xmlrpcprotocol.get_server(url, api.key_file, api.cert_file)
                    break
        if server is None:
            # nobody knows how to reach this aggregate
            continue
        threads.run(server.get_ticket, credential, xrn, aggregate_rspec, origin_hrn)
    results = threads.get_results()

    # gather information from each ticket
    rspecs = []
    initscripts = []
    slivers = []
    object_gid = None
    for result in results:
        agg_ticket = SfaTicket(string=result)
        attrs = agg_ticket.get_attributes()
        # the object GID of the first ticket that has one is reused below
        if not object_gid:
            object_gid = agg_ticket.get_gid_object()
        rspecs.append(agg_ticket.get_rspec())
        initscripts.extend(attrs.get('initscripts', []))
        slivers.extend(attrs.get('slivers', []))

    # merge info
    attributes = {'initscripts': initscripts,
                  'slivers': slivers}
    merged_rspec = merge_rspecs(rspecs)

    # create a new ticket
    # NOTE(review): when no aggregate returned a ticket, object_gid is still
    # None here and object_gid.get_pubkey() below fails -- confirm whether a
    # dedicated fault should be raised for that case.
    ticket = SfaTicket(subject=slice_hrn)
    ticket.set_gid_caller(api.auth.client_gid)
    ticket.set_issuer(key=api.key, subject=api.hrn)
    ticket.set_gid_object(object_gid)
    ticket.set_pubkey(object_gid.get_pubkey())
    ticket.set_attributes(attributes)
    ticket.set_rspec(merged_rspec)
    ticket.encode()
    ticket.sign()
    return ticket.save_to_string(save_parents=True)
136
def start_slice(api, xrn):
    """Ask every aggregate in api.aggregates to start the slice xrn.

    One start_slice call per aggregate is handed to a ThreadManager and the
    results are collected before returning.

    Args:
        api: object providing getCredential() and the aggregates mapping.
        xrn: identifier of the slice, forwarded unchanged to each aggregate.

    Returns:
        Always 1; the per-aggregate results are not inspected.
    """
    credential = api.getCredential()
    threads = ThreadManager()
    for aggregate in api.aggregates:
        server = api.aggregates[aggregate]
        # dispatch start_slice (not stop_slice) so the slice is really started
        threads.run(server.start_slice, credential, xrn)
    threads.get_results()
    return 1
145  
def stop_slice(api, xrn):
    """Ask every aggregate in api.aggregates to stop the slice xrn.

    One stop_slice call per aggregate is handed to a ThreadManager and the
    results are collected before returning.

    Args:
        api: object providing getCredential() and the aggregates mapping.
        xrn: identifier of the slice, forwarded unchanged to each aggregate.

    Returns:
        Always 1; the per-aggregate results are not inspected.
    """
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].stop_slice, cred, xrn)
    workers.get_results()
    return 1
154
def reset_slice(api, xrn):
    """Placeholder for resetting a slice.

    Neither argument is used.

    Returns:
        Always 1.
    """
    # XX not implemented at this interface
    return 1
158
def get_slices(api):
    """Return the slices known to all aggregates, using the cache if possible.

    A truthy 'slices' entry in api.cache is returned as is. Otherwise every
    aggregate in api.aggregates is asked for its slices, the answers are
    concatenated in the order the ThreadManager returns them, and the
    combined list is stored in the cache (when there is one).

    Args:
        api: object providing cache, getCredential() and aggregates.

    Returns:
        The cached value, or a list combining every aggregate's result.
    """
    # look in the cache first
    if api.cache:
        cached = api.cache.get('slices')
        if cached:
            return cached

    # fetch from the aggregates
    cred = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        workers.run(api.aggregates[name].get_slices, cred)

    # combine the results
    combined = []
    for partial in workers.get_results():
        combined.extend(partial)

    # cache the result
    if api.cache:
        api.cache.add('slices', combined)

    return combined
185  
def get_rspec(api, xrn=None, origin_hrn=None):
    """Build one RSpec out of the resources advertised by the aggregates.

    When no xrn is given, a truthy 'nodes' entry in api.cache is returned as
    is. Otherwise every aggregate in api.aggregates, except the one whose
    name equals the hrn of the calling credential's GID, is asked for its
    resources. The first answer whose root has type="SFA" becomes the base
    document; the <network> and <request> children of later SFA answers are
    copied into it. Answers of any other type are ignored.

    Args:
        api: object providing cache, getCredential(), aggregates and auth.
        xrn: optional identifier forwarded to each aggregate.
        origin_hrn: forwarded unchanged to each aggregate.

    Returns:
        The cached value, or the combined RSpec serialized by etree.tostring
        with an XML declaration and pretty printing. The serialized result
        is cached only when no xrn was given.

    Raises:
        InvalidRSpec: an aggregate returned something that is not
            well-formed XML.
    """
    # look in the cache first
    if api.cache and not xrn:
        cached = api.cache.get('nodes')
        if cached:
            return cached

    hrn, _unused_type = urn_to_hrn(xrn)
    credential = api.getCredential()
    workers = ThreadManager()
    for name in api.aggregates:
        # do not query the aggregate that is calling us
        caller_hrn = api.auth.client_cred.get_gid_caller().get_hrn()
        if name == caller_hrn:
            continue
        # get the rspec from the aggregate
        workers.run(api.aggregates[name].get_resources,
                    credential, xrn, origin_hrn)

    # combine the rspecs into a single rspec
    combined = None
    for agg_rspec in workers.get_results():
        try:
            parsed = etree.parse(StringIO(agg_rspec))
        except etree.XMLSyntaxError:
            raise InvalidRSpec(str(agg_rspec) + ": " + str(sys.exc_info()[1]))

        root = parsed.getroot()
        if root.get("type") not in ["SFA"]:
            continue
        if combined is None:
            # the first SFA document is the one everything else is merged into
            combined = root
            continue
        for network in root.iterfind("./network"):
            combined.append(deepcopy(network))
        for request in root.iterfind("./request"):
            combined.append(deepcopy(request))

    # NOTE(review): if nothing was merged, combined is still None when it is
    # handed to etree.tostring -- confirm how that case should be reported.
    serialized = etree.tostring(combined, xml_declaration=True, pretty_print=True)

    # cache the result
    if api.cache and not xrn:
        api.cache.add('nodes', serialized)

    return serialized
228
def fetch_context(slice_hrn, user_hrn, contexts):
    """Return the request context required by sfatables.

    At some point, this mechanism should be changed to refer to "contexts",
    which is the information that sfatables is requesting. But for now, we
    just return the basic information needed in a dict.

    Args:
        slice_hrn: stored as given under ['sfa']['slice']['hrn'].
        user_hrn: stored as given under ['sfa']['user']['hrn'].
        contexts: currently unused.

    Returns:
        A new nested dict holding the two hrns.
    """
    user_info = {'hrn': user_hrn}
    slice_info = {'hrn': slice_hrn}
    return {'sfa': {'user': user_info, 'slice': slice_info}}
240
def main():
    """Manual driver: parse the RSpec file named by sys.argv[1] and pass its
    dict form to create_slice under a hard-coded slice name.

    NOTE(review): api is passed as None while create_slice calls
    api.getCredential(), so this does not look runnable to completion as
    written -- confirm whether it is still needed.
    """
    parsed = RSpec()
    parsed.parseFile(sys.argv[1])
    rspec_dict = parsed.toDict()
    create_slice(None, 'plc.princeton.tmacktestslice', rspec_dict)


if __name__ == "__main__":
    main()
249