extend threading functionality to other SM methods
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.threadmanager import ThreadManager
22 from sfa.util.debug import log
23 import sfa.plc.peers as peers
24
25 def delete_slice(api, xrn, origin_hrn=None):
26     credential = api.getCredential()
27     threads = ThreadManager()
28     for aggregate in api.aggregates:
29         server = api.aggregates[aggregate] 
30         threads.run(server.delete_slice, credential, xrn, origin_hrn)
31     threads.get_results()
32     return 1
33
34 def create_slice(api, xrn, rspec, origin_hrn=None):
35     hrn, type = urn_to_hrn(xrn)
36
37     # Validate the RSpec against PlanetLab's schema --disabled for now
38     # The schema used here needs to aggregate the PL and VINI schemas
39     # schema = "/var/www/html/schemas/pl.rng"
40     schema = None
41     if schema:
42         try:
43             tree = etree.parse(StringIO(rspec))
44         except etree.XMLSyntaxError:
45             message = str(sys.exc_info()[1])
46             raise InvalidRSpec(message)
47
48         relaxng_doc = etree.parse(schema)
49         relaxng = etree.RelaxNG(relaxng_doc)
50         
51         if not relaxng(tree):
52             error = relaxng.error_log.last_error
53             message = "%s (line %s)" % (error.message, error.line)
54             raise InvalidRSpec(message)
55
56     cred = api.getCredential()
57     threads = ThreadManager()
58     for aggregate in api.aggregates:
59         if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:
60             server = api.aggregates[aggregate]
61             # Just send entire RSpec to each aggregate
62             threads.run(server.create_slice, cred, xrn, rspec, origin_hrn)
63     threads.get_results() 
64     return 1
65
66 def get_ticket(api, xrn, rspec, origin_hrn=None):
67     slice_hrn, type = urn_to_hrn(xrn)
68     # get the netspecs contained within the clients rspec
69     client_rspec = RSpec(xml=rspec)
70     netspecs = client_rspec.getDictsByTagName('NetSpec')
71     
72     # create an rspec for each individual rspec 
73     rspecs = {}
74     temp_rspec = RSpec()
75     for netspec in netspecs:
76         net_hrn = netspec['name']
77         resources = {'start_time': 0, 'end_time': 0 , 
78                      'network': {'NetSpec' : netspec}}
79         resourceDict = {'RSpec': resources}
80         temp_rspec.parseDict(resourceDict)
81         rspecs[net_hrn] = temp_rspec.toxml() 
82     
83     # send the rspec to the appropiate aggregate/sm
84     aggregates = api.aggregates
85     credential = api.getCredential()
86     tickets = {}
87     for net_hrn in rspecs:
88         net_urn = urn_to_hrn(net_hrn)     
89         try:
90             # if we are directly connected to the aggregate then we can just
91             # send them the request. if not, then we may be connected to an sm
92             # thats connected to the aggregate
93             if net_hrn in aggregates:
94                 ticket = aggregates[net_hrn].get_ticket(credential, xrn, \
95                             rspecs[net_hrn], origin_hrn)
96                 tickets[net_hrn] = ticket
97             else:
98                 # lets forward this rspec to a sm that knows about the network
99                 for agg in aggregates:
100                     network_found = aggregates[agg].get_aggregates(credential, net_urn)
101                     if network_found:
102                         ticket = aggregates[aggregate].get_ticket(credential, \
103                                         slice_hrn, rspecs[net_hrn], origin_hrn)
104                         tickets[aggregate] = ticket
105         except:
106             print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
107                            locals()
108             
109     # create a new ticket
110     new_ticket = SfaTicket(subject = slice_hrn)
111     new_ticket.set_gid_caller(api.auth.client_gid)
112     new_ticket.set_issuer(key=api.key, subject=api.hrn)
113    
114     tmp_rspec = RSpec()
115     networks = []
116     valid_data = {
117         'timestamp': int(time.time()),
118         'initscripts': [],
119         'slivers': [] 
120     } 
121     # merge data from aggregate ticket into new ticket 
122     for agg_ticket in tickets.values():
123         # get data from this ticket
124         agg_ticket = SfaTicket(string=agg_ticket)
125         attributes = agg_ticket.get_attributes()
126         if attributes.get('initscripts', []) != None:
127             valid_data['initscripts'].extend(attributes.get('initscripts', []))
128         if attributes.get('slivers', []) != None:
129             valid_data['slivers'].extend(attributes.get('slivers', []))
130  
131         # set the object gid
132         object_gid = agg_ticket.get_gid_object()
133         new_ticket.set_gid_object(object_gid)
134         new_ticket.set_pubkey(object_gid.get_pubkey())
135
136         # build the rspec
137         tmp_rspec.parseString(agg_ticket.get_rspec())
138         networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])
139     
140     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
141     new_ticket.set_attributes(valid_data)
142     resources = {'networks': networks, 'start_time': 0, 'duration': 0}
143     resourceDict = {'RSpec': resources}
144     tmp_rspec.parseDict(resourceDict)
145     new_ticket.set_rspec(tmp_rspec.toxml())
146     new_ticket.encode()
147     new_ticket.sign()          
148     return new_ticket.save_to_string(save_parents=True)
149
150 def start_slice(api, xrn):
151     credential = api.getCredential()
152     threads = ThreadManager()
153     for aggregate in api.aggregates:
154         server = api.aggregates[aggregate]
155         threads.run(server.stop_slice, credential, xrn)
156     return 1
157  
158 def stop_slice(api, xrn):
159     credential = api.getCredential()
160     threads = ThreadManager()
161     for aggregate in api.aggregates:
162         server = api.aggregates[aggregate]
163         threads.run(server.stop_slice, credential, xrn)
164     return 1
165
166 def reset_slice(api, xrn):
167     # XX not implemented at this interface
168     return 1
169
170 def get_slices(api):
171     # look in cache first
172     if api.cache:
173         slices = api.cache.get('slices')
174         if slices:
175             return slices    
176
177     # fetch from aggregates
178     slices = []
179     credential = api.getCredential()
180     threads = Threadmanager()
181     for aggregate in api.aggregates:
182         server = api.aggregates[aggregate]
183         threads.run(server.get_slices, credential)
184
185     # combime results
186     results = threads.get_results()
187     slices = []
188     for result in results:
189         slices.extend(result)
190     
191     # cache the result
192     if api.cache:
193         api.cache.add('slices', slices)
194
195     return slices
196  
197 def get_rspec(api, xrn=None, origin_hrn=None):
198     # look in cache first 
199     if api.cache and not xrn:
200         rspec =  api.cache.get('nodes')
201         if rspec:
202             return rspec
203
204     hrn, type = urn_to_hrn(xrn)
205     rspec = None
206     cred = api.getCredential()
207     threads = ThreadManager()
208     for aggregate in api.aggregates:
209         if aggregate not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
210             # get the rspec from the aggregate
211             server = api.aggregates[aggregate]
212             threads.run(server.get_resources, cred, xrn, origin_hrn)
213
214     results = threads.get_results()
215     # combine the rspecs into a single rspec 
216     for agg_rspec in results:
217         try:
218             tree = etree.parse(StringIO(agg_rspec))
219         except etree.XMLSyntaxError:
220             message = str(agg_rspec) + ": " + str(sys.exc_info()[1])
221             raise InvalidRSpec(message)
222
223         root = tree.getroot()
224         if root.get("type") in ["SFA"]:
225             if rspec == None:
226                 rspec = root
227             else:
228                 for network in root.iterfind("./network"):
229                     rspec.append(deepcopy(network))
230                 for request in root.iterfind("./request"):
231                     rspec.append(deepcopy(request))
232
233     rspec =  etree.tostring(rspec, xml_declaration=True, pretty_print=True)
234     # cache the result
235     if api.cache and not xrn:
236         api.cache.add('nodes', rspec)
237  
238     return rspec
239
240 """
241 Returns the request context required by sfatables. At some point, this
242 mechanism should be changed to refer to "contexts", which is the
243 information that sfatables is requesting. But for now, we just return
244 the basic information needed in a dict.
245 """
246 def fetch_context(slice_hrn, user_hrn, contexts):
247     #slice_hrn = urn_to_hrn(slice_xrn)[0]
248     #user_hrn = urn_to_hrn(user_xrn)[0]
249     base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
250     return base_context
251
252 def main():
253     r = RSpec()
254     r.parseFile(sys.argv[1])
255     rspec = r.toDict()
256     create_slice(None,'plc.princeton.tmacktestslice',rspec)
257
258 if __name__ == "__main__":
259     main()
260