initial checkin of managers plugin directory
[sfa.git] / sfa / managers / slicemanager / plc.py
1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
3
4 import datetime
5 import time
6 import traceback
7 import sys
8
9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
21
22 MAXINT =  2L**31-1
23
24 class Slices(SimpleStorage):
25
26     rspec_to_slice_tag = {'max_rate':'net_max_rate'}
27
28     def __init__(self, api, ttl = .5, caller_cred=None):
29         self.api = api
30         self.ttl = ttl
31         self.threshold = None
32         path = self.api.config.SFA_DATA_DIR
33         filename = ".".join([self.api.interface, self.api.hrn, "slices"])
34         filepath = path + os.sep + filename
35         self.slices_file = filepath
36         SimpleStorage.__init__(self, self.slices_file)
37         self.policy = Policy(self.api)    
38         self.load()
39         self.caller_cred=caller_cred
40         self.aggregates = Aggregates(self.api)
41         
42     def get_slivers(self, hrn, node=None):
43         """
44         Get the slivers at each aggregate
45         """
46         slivers = []
47         for aggregate in self.aggregates:
48             slivers += aggregate.get_slivers()
49         return slivers
50  
51     def refresh(self):
52         """
53         Update the cached list of slices
54         """
55         # Reload components list
56         now = datetime.datetime.now()
57         if not self.has_key('threshold') or not self.has_key('timestamp') or \
58            now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
59             self.refresh_slices_smgr()
60
61     def refresh_slices_smgr(self):
62         slice_hrns = []
63         aggregates = Aggregates(self.api)
64         credential = self.api.getCredential()
65         for aggregate in aggregates:
66             success = False
67             # request hash is optional so lets try the call without it 
68             try:
69                 request_hash=None
70                 slices = aggregates[aggregate].get_slices(credential, request_hash, self.caller_cred)
71                 slice_hrns.extend(slices)
72                 success = True
73             except:
74                 print >> log, "%s" % (traceback.format_exc())
75                 print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
76
77             # try sending the request hash if the previous call failed 
78             if not success:
79                 arg_list = [credential]
80                 request_hash = self.api.key.compute_hash(arg_list)
81                 try:
82                     slices = aggregates[aggregate].get_slices(credential, request_hash, self.caller_cred)
83                     slice_hrns.extend(slices)
84                     success = True
85                 except:
86                     print >> log, "%s" % (traceback.format_exc())
87                     print >> log, "Error calling slices at aggregate %(aggregate)s" % locals()
88
89         # update timestamp and threshold
90         timestamp = datetime.datetime.now()
91         hr_timestamp = timestamp.strftime(self.api.time_format)
92         delta = datetime.timedelta(hours=self.ttl)
93         threshold = timestamp + delta
94         hr_threshold = threshold.strftime(self.api.time_format)
95
96         slice_details = {'hrn': slice_hrns,
97                          'timestamp': hr_timestamp,
98                          'threshold': hr_threshold
99                         }
100         self.update(slice_details)
101         self.write()
102
103
104     def delete_slice(self, hrn):
105         self.delete_slice_smgr(hrn)
106         
107     def delete_slice_smgr(self, hrn):
108         credential = self.api.getCredential()
109         caller_cred = self.caller_cred
110         aggregates = Aggregates(self.api)
111         for aggregate in aggregates:
112             success = False
113             # request hash is optional so lets try the call without it
114             try:
115                 request_hash=None       
116                 aggregates[aggregate].delete_slice(credential, hrn, request_hash, caller_cred)
117                 success = True
118             except:
119                 print >> log, "%s" % (traceback.format_exc())
120                 print >> log, "Error calling list nodes at aggregate %s" % aggregate
121             
122             # try sending the request hash if the previous call failed 
123             if not success:
124                 try:
125                     arg_list = [credential, hrn]
126                     request_hash = self.api.key.compute_hash(arg_list)
127                     aggregates[aggregate].delete_slice(credential, hrn, request_hash, caller_cred)
128                     success = True
129                 except:
130                     print >> log, "%s" % (traceback.format_exc())
131                     print >> log, "Error calling list nodes at aggregate %s" % aggregate
132                         
133     def create_slice(self, hrn, rspec):
134         
135         # check our slice policy before we procede
136         whitelist = self.policy['slice_whitelist']     
137         blacklist = self.policy['slice_blacklist']
138        
139         if whitelist and hrn not in whitelist or \
140            blacklist and hrn in blacklist:
141             policy_file = self.policy.policy_file
142             print >> log, "Slice %(hrn)s not allowed by policy %(policy_file)s" % locals()
143             return 1
144
145         self.create_slice_smgr(hrn, rspec)
146
147     def create_slice_smgr(self, hrn, rspec):
148         spec = RSpec()
149         tempspec = RSpec()
150         spec.parseString(rspec)
151         slicename = hrn_to_pl_slicename(hrn)
152         specDict = spec.toDict()
153         if specDict.has_key('RSpec'): specDict = specDict['RSpec']
154         if specDict.has_key('start_time'): start_time = specDict['start_time']
155         else: start_time = 0
156         if specDict.has_key('end_time'): end_time = specDict['end_time']
157         else: end_time = 0
158
159         rspecs = {}
160         aggregates = Aggregates(self.api)
161         credential = self.api.getCredential()
162
163         # split the netspecs into individual rspecs
164         netspecs = spec.getDictsByTagName('NetSpec')
165         for netspec in netspecs:
166             net_hrn = netspec['name']
167             resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
168             resourceDict = {'RSpec': resources}
169             tempspec.parseDict(resourceDict)
170             rspecs[net_hrn] = tempspec.toxml()
171
172         # send each rspec to the appropriate aggregate/sm
173         caller_cred = self.caller_cred 
174         for net_hrn in rspecs:
175             try:
176                 # if we are directly connected to the aggregate then we can just send them the rspec
177                 # if not, then we may be connected to an sm thats connected to the aggregate
178                 if net_hrn in aggregates:
179                     # send the whloe rspec to the local aggregate
180                     if net_hrn in [self.api.hrn]:
181                         try:
182                             request_hash = None
183                             aggregates[net_hrn].create_slice(credential, hrn, rspec, request_hash, caller_cred)
184                         except:
185                             arg_list = [credential,hrn,rspec]
186                             request_hash = self.api.key.compute_hash(arg_list)
187                             aggregates[net_hrn].create_slice(credential, hrn, rspec, request_hash, caller_cred)
188                     else:
189                         try:
190                             request_hash = None
191                             aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
192                         except:
193                             arg_list = [credential,hrn,rspecs[net_hrn]]
194                             request_hash = self.api.key.compute_hash(arg_list)
195                             aggregates[net_hrn].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
196                 else:
197                     # lets forward this rspec to a sm that knows about the network
198                     arg_list = [credential, net_hrn]
199                     request_hash = self.api.compute_hash(arg_list)    
200                     for aggregate in aggregates:
201                         try:
202                             network_found = aggregates[aggregate].get_aggregates(credential, net_hrn)
203                         except:
204                             network_found = aggregates[aggregate].get_aggregates(credential, net_hrn, request_hash)
205                         if network_networks:
206                             try:
207                                 request_hash = None
208                                 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
209                             except:
210                                 arg_list = [credential, hrn, rspecs[net_hrn]]
211                                 request_hash = self.api.key.compute_hash(arg_list) 
212                                 aggregates[aggregate].create_slice(credential, hrn, rspecs[net_hrn], request_hash, caller_cred)
213                      
214             except:
215                 print >> log, "Error creating slice %(hrn)s at aggregate %(net_hrn)s" % locals()
216                 traceback.print_exc()
217         return 1
218
219
220     def start_slice(self, hrn):
221         self.start_slice_smgr(hrn)
222
223     def start_slice_smgr(self, hrn):
224         credential = self.api.getCredential()
225         aggregates = Aggregates(self.api)
226         for aggregate in aggregates:
227             aggregates[aggregate].start_slice(credential, hrn)
228         return 1
229
230
231     def stop_slice(self, hrn):
232         self.stop_slice_smgr(hrn)
233
234     def stop_slice_smgr(self, hrn):
235         credential = self.api.getCredential()
236         aggregates = Aggregates(self.api)
237         arg_list = [credential, hrn]
238         request_hash = self.api.key.compute_hash(arg_list)
239         for aggregate in aggregates:
240             try:
241                 aggregates[aggregate].stop_slice(credential, hrn)
242             except:  
243                 aggregates[aggregate].stop_slice(credential, hrn, request_hash)
244