Using sfa.util.cache to cache result of get_resources
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.debug import log
22 import sfa.plc.peers as peers
23
24 def delete_slice(api, xrn, origin_hrn=None):
25     credential = api.getCredential()
26     aggregates = api.aggregates
27     for aggregate in aggregates:
28         success = False
29         # request hash is optional so lets try the call without it
30         try:
31             aggregates[aggregate].delete_slice(credential, xrn, origin_hrn)
32             success = True
33         except:
34             print >> log, "%s" % (traceback.format_exc())
35             print >> log, "Error calling delete slice at aggregate %s" % aggregate
36     return 1
37
38 def create_slice(api, xrn, rspec, origin_hrn=None):
39     hrn, type = urn_to_hrn(xrn)
40
41     # Validate the RSpec against PlanetLab's schema --disabled for now
42     # The schema used here needs to aggregate the PL and VINI schemas
43     # schema = "/var/www/html/schemas/pl.rng"
44     schema = None
45     if schema:
46         try:
47             tree = etree.parse(StringIO(rspec))
48         except etree.XMLSyntaxError:
49             message = str(sys.exc_info()[1])
50             raise InvalidRSpec(message)
51
52         relaxng_doc = etree.parse(schema)
53         relaxng = etree.RelaxNG(relaxng_doc)
54         
55         if not relaxng(tree):
56             error = relaxng.error_log.last_error
57             message = "%s (line %s)" % (error.message, error.line)
58             raise InvalidRSpec(message)
59
60     aggs = api.aggregates
61     cred = api.getCredential()                                                 
62     for agg in aggs:
63         if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
64             try:
65                 # Just send entire RSpec to each aggregate
66                 aggs[agg].create_slice(cred, xrn, rspec, origin_hrn)
67             except:
68                 print >> log, "Error creating slice %s at %s" % (hrn, agg)
69                 traceback.print_exc()
70
71     return True
72
73 def get_ticket(api, xrn, rspec, origin_hrn=None):
74     slice_hrn, type = urn_to_hrn(xrn)
75     # get the netspecs contained within the clients rspec
76     client_rspec = RSpec(xml=rspec)
77     netspecs = client_rspec.getDictsByTagName('NetSpec')
78     
79     # create an rspec for each individual rspec 
80     rspecs = {}
81     temp_rspec = RSpec()
82     for netspec in netspecs:
83         net_hrn = netspec['name']
84         resources = {'start_time': 0, 'end_time': 0 , 
85                      'network': {'NetSpec' : netspec}}
86         resourceDict = {'RSpec': resources}
87         temp_rspec.parseDict(resourceDict)
88         rspecs[net_hrn] = temp_rspec.toxml() 
89     
90     # send the rspec to the appropiate aggregate/sm
91     aggregates = api.aggregates
92     credential = api.getCredential()
93     tickets = {}
94     for net_hrn in rspecs:
95         net_urn = urn_to_hrn(net_hrn)     
96         try:
97             # if we are directly connected to the aggregate then we can just
98             # send them the request. if not, then we may be connected to an sm
99             # thats connected to the aggregate
100             if net_hrn in aggregates:
101                 ticket = aggregates[net_hrn].get_ticket(credential, xrn, \
102                             rspecs[net_hrn], origin_hrn)
103                 tickets[net_hrn] = ticket
104             else:
105                 # lets forward this rspec to a sm that knows about the network
106                 for agg in aggregates:
107                     network_found = aggregates[agg].get_aggregates(credential, net_urn)
108                     if network_found:
109                         ticket = aggregates[aggregate].get_ticket(credential, \
110                                         slice_hrn, rspecs[net_hrn], origin_hrn)
111                         tickets[aggregate] = ticket
112         except:
113             print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
114                            locals()
115             
116     # create a new ticket
117     new_ticket = SfaTicket(subject = slice_hrn)
118     new_ticket.set_gid_caller(api.auth.client_gid)
119     new_ticket.set_issuer(key=api.key, subject=api.hrn)
120    
121     tmp_rspec = RSpec()
122     networks = []
123     valid_data = {
124         'timestamp': int(time.time()),
125         'initscripts': [],
126         'slivers': [] 
127     } 
128     # merge data from aggregate ticket into new ticket 
129     for agg_ticket in tickets.values():
130         # get data from this ticket
131         agg_ticket = SfaTicket(string=agg_ticket)
132         attributes = agg_ticket.get_attributes()
133         if attributes.get('initscripts', []) != None:
134             valid_data['initscripts'].extend(attributes.get('initscripts', []))
135         if attributes.get('slivers', []) != None:
136             valid_data['slivers'].extend(attributes.get('slivers', []))
137  
138         # set the object gid
139         object_gid = agg_ticket.get_gid_object()
140         new_ticket.set_gid_object(object_gid)
141         new_ticket.set_pubkey(object_gid.get_pubkey())
142
143         # build the rspec
144         tmp_rspec.parseString(agg_ticket.get_rspec())
145         networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])
146     
147     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
148     new_ticket.set_attributes(valid_data)
149     resources = {'networks': networks, 'start_time': 0, 'duration': 0}
150     resourceDict = {'RSpec': resources}
151     tmp_rspec.parseDict(resourceDict)
152     new_ticket.set_rspec(tmp_rspec.toxml())
153     new_ticket.encode()
154     new_ticket.sign()          
155     return new_ticket.save_to_string(save_parents=True)
156
157 def start_slice(api, xrn):
158     hrn, type = urn_to_hrn(xrn)
159     slicename = hrn_to_pl_slicename(hrn)
160     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
161     if not slices:
162         raise RecordNotFound(hrn)
163     slice_id = slices[0]
164     attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
165     attribute_id = attreibutes[0]['slice_attribute_id']
166     api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1" )
167
168     return 1
169  
170 def stop_slice(api, xrn):
171     hrn, type = urn_to_hrn(xrn)
172     slicename = hrn_to_pl_slicename(hrn)
173     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
174     if not slices:
175         raise RecordNotFound(hrn)
176     slice_id = slices[0]['slice_id']
177     attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
178     attribute_id = attributes[0]['slice_attribute_id']
179     api.plshell.UpdateSliceTag(api.plauth, attribute_id, "0")
180     return 1
181
182 def reset_slice(api, xrn):
183     # XX not implemented at this interface
184     return 1
185
186 def get_slices(api):
187     # look in cache first
188     if api.cache:
189         slices = api.cache.get('slices')
190         if slices:
191             return slices    
192
193     # fetch from aggregates
194     slices = []
195     credential = api.getCredential()
196     for aggregate in api.aggregates:
197         try:
198             tmp_slices = api.aggregates[aggregate].get_slices(credential)
199             slices.extend(tmp_slices)
200         except:
201             print >> log, "%s" % (traceback.format_exc())
202             print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
203
204     # cache the result
205     if api.cache:
206         api.cache.add('slices', slices)
207
208     return slices
209  
210 def get_rspec(api, xrn=None, origin_hrn=None):
211     # look in cache first 
212     if api.cache and not xrn:
213         rspec =  api.cache.get('nodes')
214         if rspec:
215             return rspec
216
217     hrn, type = urn_to_hrn(xrn)
218     rspec = None
219     aggs = api.aggregates
220     cred = api.getCredential()                                                 
221     for agg in aggs:
222         if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
223             try:
224                 # get the rspec from the aggregate
225                 agg_rspec = aggs[agg].get_resources(cred, xrn, origin_hrn)
226             except:
227                 # XX print out to some error log
228                 print >> log, "Error getting resources at aggregate %s" % agg
229                 traceback.print_exc(log)
230                 print >> log, "%s" % (traceback.format_exc())
231                 continue
232                 
233             try:
234                 tree = etree.parse(StringIO(agg_rspec))
235             except etree.XMLSyntaxError:
236                 message = agg + ": " + str(sys.exc_info()[1])
237                 raise InvalidRSpec(message)
238
239             root = tree.getroot()
240             if root.get("type") in ["SFA"]:
241                 if rspec == None:
242                     rspec = root
243                 else:
244                     for network in root.iterfind("./network"):
245                         rspec.append(deepcopy(network))
246                     for request in root.iterfind("./request"):
247                         rspec.append(deepcopy(request))
248
249     rspec =  etree.tostring(rspec, xml_declaration=True, pretty_print=True)
250     if api.cache and not xrn:
251         api.cache.add('nodes', rspec)
252  
253     return rspec
254
255 """
256 Returns the request context required by sfatables. At some point, this
257 mechanism should be changed to refer to "contexts", which is the
258 information that sfatables is requesting. But for now, we just return
259 the basic information needed in a dict.
260 """
261 def fetch_context(slice_hrn, user_hrn, contexts):
262     #slice_hrn = urn_to_hrn(slice_xrn)[0]
263     #user_hrn = urn_to_hrn(user_xrn)[0]
264     base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
265     return base_context
266
267 def main():
268     r = RSpec()
269     r.parseFile(sys.argv[1])
270     rspec = r.toDict()
271     create_slice(None,'plc.princeton.tmacktestslice',rspec)
272
273 if __name__ == "__main__":
274     main()
275