merged in from trunk -r 17776:17849
[sfa.git] / sfa / managers / slice_manager_pl.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8 from copy import deepcopy
9 from lxml import etree
10 from StringIO import StringIO
11 from types import StringTypes
12
13 from sfa.util.namespace import *
14 from sfa.util.rspec import *
15 from sfa.util.specdict import *
16 from sfa.util.faults import *
17 from sfa.util.record import SfaRecord
18 from sfa.util.policy import Policy
19 from sfa.util.prefixTree import prefixTree
20 from sfa.util.sfaticket import *
21 from sfa.util.debug import log
22 from sfa.util.sfalogging import logger
23 import sfa.plc.peers as peers
24
25
26 def delete_slice(api, xrn, origin_hrn=None):
27     credential = api.getCredential()
28     aggregates = api.aggregates
29     for aggregate in aggregates:
30         success = False
31         # request hash is optional so lets try the call without it
32         try:
33             aggregates[aggregate].delete_slice(credential, xrn, origin_hrn)
34             success = True
35         except:
36             print >> log, "%s" % (traceback.format_exc())
37             print >> log, "Error calling delete slice at aggregate %s" % aggregate
38     return 1
39
40 def create_slice(api, xrn, rspec, origin_hrn=None):
41     hrn, type = urn_to_hrn(xrn)
42
43     # Validate the RSpec against PlanetLab's schema --disabled for now
44     # The schema used here needs to aggregate the PL and VINI schemas
45     # schema = "/var/www/html/schemas/pl.rng"
46     schema = None
47     if schema:
48         try:
49             tree = etree.parse(StringIO(rspec))
50         except etree.XMLSyntaxError:
51             message = str(sys.exc_info()[1])
52             raise InvalidRSpec(message)
53
54         relaxng_doc = etree.parse(schema)
55         relaxng = etree.RelaxNG(relaxng_doc)
56         
57         if not relaxng(tree):
58             error = relaxng.error_log.last_error
59             message = "%s (line %s)" % (error.message, error.line)
60             raise InvalidRSpec(message)
61
62     aggs = api.aggregates
63     cred = api.getCredential()                                                 
64     for agg in aggs:
65         if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
66             try:
67                 # Just send entire RSpec to each aggregate
68                 aggs[agg].create_slice(cred, xrn, rspec, origin_hrn)
69             except:
70                 print >> log, "Error creating slice %s at %s" % (hrn, agg)
71                 traceback.print_exc()
72
73     return True
74
75 def get_ticket(api, xrn, rspec, origin_hrn=None):
76     slice_hrn, type = urn_to_hrn(xrn)
77     # get the netspecs contained within the clients rspec
78     client_rspec = RSpec(xml=rspec)
79     netspecs = client_rspec.getDictsByTagName('NetSpec')
80     
81     # create an rspec for each individual rspec 
82     rspecs = {}
83     temp_rspec = RSpec()
84     for netspec in netspecs:
85         net_hrn = netspec['name']
86         resources = {'start_time': 0, 'end_time': 0 , 
87                      'network': {'NetSpec' : netspec}}
88         resourceDict = {'RSpec': resources}
89         temp_rspec.parseDict(resourceDict)
90         rspecs[net_hrn] = temp_rspec.toxml() 
91     
92     # send the rspec to the appropiate aggregate/sm
93     aggregates = api.aggregates
94     credential = api.getCredential()
95     tickets = {}
96     for net_hrn in rspecs:
97         net_urn = urn_to_hrn(net_hrn)     
98         try:
99             # if we are directly connected to the aggregate then we can just
100             # send them the request. if not, then we may be connected to an sm
101             # thats connected to the aggregate
102             if net_hrn in aggregates:
103                 ticket = aggregates[net_hrn].get_ticket(credential, xrn, \
104                             rspecs[net_hrn], origin_hrn)
105                 tickets[net_hrn] = ticket
106             else:
107                 # lets forward this rspec to a sm that knows about the network
108                 for agg in aggregates:
109                     network_found = aggregates[agg].get_aggregates(credential, net_urn)
110                     if network_found:
111                         ticket = aggregates[aggregate].get_ticket(credential, \
112                                         slice_hrn, rspecs[net_hrn], origin_hrn)
113                         tickets[aggregate] = ticket
114         except:
115             print >> log, "Error getting ticket for %(slice_hrn)s at aggregate %(net_hrn)s" % \
116                            locals()
117             
118     # create a new ticket
119     new_ticket = SfaTicket(subject = slice_hrn)
120     new_ticket.set_gid_caller(api.auth.client_gid)
121     new_ticket.set_issuer(key=api.key, subject=api.hrn)
122    
123     tmp_rspec = RSpec()
124     networks = []
125     valid_data = {
126         'timestamp': int(time.time()),
127         'initscripts': [],
128         'slivers': [] 
129     } 
130     # merge data from aggregate ticket into new ticket 
131     for agg_ticket in tickets.values():
132         # get data from this ticket
133         agg_ticket = SfaTicket(string=agg_ticket)
134         attributes = agg_ticket.get_attributes()
135         if attributes.get('initscripts', []) != None:
136             valid_data['initscripts'].extend(attributes.get('initscripts', []))
137         if attributes.get('slivers', []) != None:
138             valid_data['slivers'].extend(attributes.get('slivers', []))
139  
140         # set the object gid
141         object_gid = agg_ticket.get_gid_object()
142         new_ticket.set_gid_object(object_gid)
143         new_ticket.set_pubkey(object_gid.get_pubkey())
144
145         # build the rspec
146         tmp_rspec.parseString(agg_ticket.get_rspec())
147         networks.extend([{'NetSpec': tmp_rspec.getDictsByTagName('NetSpec')}])
148     
149     #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn))
150     new_ticket.set_attributes(valid_data)
151     resources = {'networks': networks, 'start_time': 0, 'duration': 0}
152     resourceDict = {'RSpec': resources}
153     tmp_rspec.parseDict(resourceDict)
154     new_ticket.set_rspec(tmp_rspec.toxml())
155     new_ticket.encode()
156     new_ticket.sign()          
157     return new_ticket.save_to_string(save_parents=True)
158
159 def start_slice(api, xrn):
160     hrn, type = urn_to_hrn(xrn)
161     slicename = hrn_to_pl_slicename(hrn)
162     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
163     if not slices:
164         raise RecordNotFound(hrn)
165     slice_id = slices[0]
166     attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
167     attribute_id = attreibutes[0]['slice_attribute_id']
168     api.plshell.UpdateSliceTag(api.plauth, attribute_id, "1" )
169
170     return 1
171  
172 def stop_slice(api, xrn):
173     hrn, type = urn_to_hrn(xrn)
174     slicename = hrn_to_pl_slicename(hrn)
175     slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
176     if not slices:
177         raise RecordNotFound(hrn)
178     slice_id = slices[0]['slice_id']
179     attributes = api.plshell.GetSliceTags(api.plauth, {'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
180     attribute_id = attributes[0]['slice_attribute_id']
181     api.plshell.UpdateSliceTag(api.plauth, attribute_id, "0")
182     return 1
183
184 def reset_slice(api, xrn):
185     # XX not implemented at this interface
186     return 1
187
188 def get_slices(api):
189     # look in cache first
190     if api.cache:
191         slices = api.cache.get('slices')
192         if slices:
193             return slices    
194
195     # fetch from aggregates
196     slices = []
197     credential = api.getCredential()
198     for aggregate in api.aggregates:
199         try:
200             tmp_slices = api.aggregates[aggregate].get_slices(credential)
201             slices.extend(tmp_slices)
202         except:
203             print >> log, "%s" % (traceback.format_exc())
204             print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
205
206     # cache the result
207     if api.cache:
208         api.cache.add('slices', slices)
209
210     return slices
211  
212 def get_rspec(api, xrn=None, origin_hrn=None):
213     # look in cache first 
214     if api.cache and not xrn:
215         rspec =  api.cache.get('nodes')
216         if rspec:
217             return rspec
218
219     hrn, type = urn_to_hrn(xrn)
220     rspec = None
221     aggs = api.aggregates
222     cred = api.getCredential()                                                 
223
224     print >> log, "Aggregates = %s" % aggs
225     for agg in aggs:
226         if agg not in [api.auth.client_cred.get_gid_caller().get_hrn()]:      
227             try:
228                 # get the rspec from the aggregate
229                 agg_rspec = aggs[agg].get_resources(cred, xrn, origin_hrn)
230             except:
231                 
232                 # XX print out to some error log
233                 print >> log, "Error getting resources at aggregate %s" % agg
234                 traceback.print_exc(log)
235                 print >> log, "%s" % (traceback.format_exc())
236                 continue
237                 
238             try:
239                 tree = etree.parse(StringIO(agg_rspec))
240             except etree.XMLSyntaxError:
241                 message = agg + ": " + str(sys.exc_info()[1])
242                 raise InvalidRSpec(message)
243
244             root = tree.getroot()
245             if root.get("type") in ["SFA"]:
246                 if rspec == None:
247                     rspec = root
248                 else:
249                     for network in root.iterfind("./network"):
250                         rspec.append(deepcopy(network))
251                     for request in root.iterfind("./request"):
252                         rspec.append(deepcopy(request))
253
254     rspec =  etree.tostring(rspec, xml_declaration=True, pretty_print=True)
255     if api.cache and not xrn:
256         api.cache.add('nodes', rspec)
257  
258     return rspec
259
260 """
261 Returns the request context required by sfatables. At some point, this
262 mechanism should be changed to refer to "contexts", which is the
263 information that sfatables is requesting. But for now, we just return
264 the basic information needed in a dict.
265 """
266 def fetch_context(slice_hrn, user_hrn, contexts):
267     #slice_hrn = urn_to_hrn(slice_xrn)[0]
268     #user_hrn = urn_to_hrn(user_xrn)[0]
269     base_context = {'sfa':{'user':{'hrn':user_hrn}, 'slice':{'hrn':slice_hrn}}}
270     return base_context
271
272 def main():
273     r = RSpec()
274     r.parseFile(sys.argv[1])
275     rspec = r.toDict()
276     create_slice(None,'plc.princeton.tmacktestslice',rspec)
277
278 if __name__ == "__main__":
279     main()
280