1 ### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
2 ### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
9 from types import StringTypes
10 from sfa.util.misc import *
11 from sfa.util.rspec import *
12 from sfa.util.specdict import *
13 from sfa.util.faults import *
14 from sfa.util.storage import *
15 from sfa.util.record import GeniRecord
16 from sfa.util.policy import Policy
17 from sfa.util.prefixTree import prefixTree
18 from sfa.util.debug import log
19 from sfa.server.aggregate import Aggregates
20 from sfa.server.registry import Registries
24 class Slices(SimpleStorage):
26 rspec_to_slice_tag = {'max_rate':'net_max_rate'}
def __init__(self, api, ttl = .5, caller_cred=None):
    """
    Set up the slice cache for one interface.

    api         -- api object; its config.SFA_DATA_DIR, interface and hrn
                   attributes decide where the cache file lives
    ttl         -- lifetime of the cached slice list, in hours
                   (refresh_slices_smgr feeds it to timedelta(hours=...))
    caller_cred -- credential passed along with every aggregate call
    """
    # Keep the constructor arguments: the other methods read self.api,
    # self.ttl and self.caller_cred.
    self.api = api
    self.ttl = ttl
    self.caller_cred = caller_cred

    # The cache file is named <interface>.<hrn>.slices inside SFA_DATA_DIR.
    path = self.api.config.SFA_DATA_DIR
    filename = ".".join([self.api.interface, self.api.hrn, "slices"])
    self.slices_file = os.path.join(path, filename)
    SimpleStorage.__init__(self, self.slices_file)
    # NOTE(review): nothing here reads an existing cache file back in;
    # confirm whether SimpleStorage.__init__ does that or whether an
    # explicit load step is required.

    self.policy = Policy(self.api)
    self.aggregates = Aggregates(self.api)
def get_slivers(self, hrn, node=None):
    """
    Get the slivers at each aggregate

    Returns a single list made of whatever each aggregate's get_slivers()
    call reported (an empty list when there are no aggregates).  hrn and
    node are accepted but are not used to filter the result.
    """
    slivers = []
    for aggregate in self.aggregates:
        # Iterating yields aggregate names; the server object is looked up
        # by name, the same way the other methods of this class do it.
        slivers += self.aggregates[aggregate].get_slivers()
    return slivers
53 Update the cached list of slices
55 # Reload components list
56 now = datetime.datetime.now()
57 if not self.has_key('threshold') or not self.has_key('timestamp') or \
58 now > datetime.datetime.fromtimestamp(time.mktime(time.strptime(self['threshold'], self.api.time_format))):
59 self.refresh_slices_smgr()
def refresh_slices_smgr(self):
    """
    Rebuild the cached slice list by asking every aggregate for its slices.

    Each aggregate is tried first without a request hash and, only if that
    call fails, a second time with one.  Failures are written to the log
    and the aggregate is skipped, so one broken aggregate does not prevent
    the cache from being refreshed.  The collected hrns are stored together
    with the refresh time and the time at which the cache expires
    (now + self.ttl hours), both formatted with self.api.time_format.
    """
    slice_hrns = []
    aggregates = Aggregates(self.api)
    credential = self.api.getCredential()
    for aggregate in aggregates:
        server = aggregates[aggregate]
        # request hash is optional so lets try the call without it, and
        # only compute and send the hash if that first attempt failed
        for with_hash in (False, True):
            request_hash = None
            if with_hash:
                request_hash = self.api.key.compute_hash([credential])
            try:
                slice_hrns.extend(
                    server.get_slices(credential, request_hash, self.caller_cred))
                break
            except Exception:
                # log.write emits the same text as "print >> log" would
                log.write("%s\n" % (traceback.format_exc()))
                log.write("Error calling slices at aggregate %(aggregate)s\n"
                          % {'aggregate': aggregate})

    # update timestamp and threshold
    timestamp = datetime.datetime.now()
    hr_timestamp = timestamp.strftime(self.api.time_format)
    delta = datetime.timedelta(hours=self.ttl)
    threshold = timestamp + delta
    hr_threshold = threshold.strftime(self.api.time_format)

    slice_details = {'hrn': slice_hrns,
                     'timestamp': hr_timestamp,
                     'threshold': hr_threshold,
                    }
    # NOTE(review): only update() is called here; confirm whether
    # SimpleStorage needs an explicit write for this to reach the file.
    self.update(slice_details)
def delete_slice(self, hrn):
    """
    Delete the slice named by hrn.

    Thin entry point: the work is delegated to delete_slice_smgr().
    """
    self.delete_slice_smgr(hrn)
def delete_slice_smgr(self, hrn):
    """
    Ask every aggregate to delete the slice named by hrn.

    Each aggregate is tried first without a request hash and, only if that
    call fails, once more with a hash computed over [credential, hrn].
    Failures are written to the log; the remaining aggregates are still
    contacted.
    """
    credential = self.api.getCredential()
    caller_cred = self.caller_cred
    aggregates = Aggregates(self.api)
    for aggregate in aggregates:
        server = aggregates[aggregate]
        # request hash is optional so lets try the call without it, and
        # only compute and send the hash if that first attempt failed
        for with_hash in (False, True):
            request_hash = None
            if with_hash:
                request_hash = self.api.key.compute_hash([credential, hrn])
            try:
                server.delete_slice(credential, hrn, request_hash, caller_cred)
                break
            except Exception:
                # log.write emits the same text as "print >> log" would
                log.write("%s\n" % (traceback.format_exc()))
                # the message used to say "list nodes", copied from elsewhere
                log.write("Error calling delete slice at aggregate %s\n" % aggregate)
def create_slice(self, hrn, rspec):
    """
    Create the slice named by hrn from rspec, subject to the slice policy.

    The slice is refused when a whitelist exists and hrn is not on it, or
    when a blacklist exists and hrn is on it.  A refusal is written to the
    log and nothing is created; otherwise the request is handed to
    create_slice_smgr().
    """
    # check our slice policy before we proceed
    whitelist = self.policy['slice_whitelist']
    blacklist = self.policy['slice_blacklist']

    rejected = (whitelist and hrn not in whitelist) or \
               (blacklist and hrn in blacklist)
    if rejected:
        policy_file = self.policy.policy_file
        log.write("Slice %(hrn)s not allowed by policy %(policy_file)s\n"
                  % {'hrn': hrn, 'policy_file': policy_file})
        # stop here: a slice refused by policy must not be created
        return

    self.create_slice_smgr(hrn, rspec)
def create_slice_smgr(self, hrn, rspec):
    """
    Create the slice named by hrn at every network mentioned in rspec.

    The rspec string is parsed and split into one rspec per NetSpec entry,
    keyed by the NetSpec 'name'.  Each piece is then delivered:
      - to the aggregate of that name when we are directly connected to it
        (the local aggregate, self.api.hrn, receives the whole rspec);
      - otherwise to each connected slice manager whose get_aggregates()
        call returns a true value for that network.
    Every remote call is tried without a request hash first and retried
    with one if that fails.  A failure for one network is logged and does
    not stop the remaining networks from being processed.
    """
    # NOTE(review): assumes sfa.util.rspec exports its parser class as
    # RSpec -- confirm against that module.
    spec = RSpec()
    spec.parseString(rspec)
    specDict = spec.toDict()
    if 'RSpec' in specDict:
        specDict = specDict['RSpec']
    # NOTE(review): 0 is used when the rspec carries no start/end time;
    # confirm that this is the intended default.
    start_time = specDict.get('start_time', 0)
    end_time = specDict.get('end_time', 0)

    aggregates = Aggregates(self.api)
    credential = self.api.getCredential()
    caller_cred = self.caller_cred

    def _send_rspec(server, payload):
        # request hash is optional so lets try the call without it and
        # fall back to a hashed request only if that attempt fails
        try:
            server.create_slice(credential, hrn, payload, None, caller_cred)
        except Exception:
            request_hash = self.api.key.compute_hash([credential, hrn, payload])
            server.create_slice(credential, hrn, payload, request_hash, caller_cred)

    # split the netspecs into individual rspecs
    rspecs = {}
    for netspec in spec.getDictsByTagName('NetSpec'):
        net_hrn = netspec['name']
        resources = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
        # a fresh parser per network so nothing carries over between them
        tempspec = RSpec()
        tempspec.parseDict({'RSpec': resources})
        rspecs[net_hrn] = tempspec.toxml()

    # send each rspec to the appropriate aggregate/sm
    for net_hrn in rspecs:
        try:
            # if we are directly connected to the aggregate then we can just
            # send them the rspec; if not, then we may be connected to an sm
            # thats connected to the aggregate
            if net_hrn in aggregates:
                if net_hrn == self.api.hrn:
                    # send the whole rspec to the local aggregate
                    payload = rspec
                else:
                    payload = rspecs[net_hrn]
                _send_rspec(aggregates[net_hrn], payload)
            else:
                # lets forward this rspec to a sm that knows about the network.
                # The lookup hash is kept in its own name so the hash used for
                # create_slice can never be sent with a later lookup.
                lookup_hash = self.api.key.compute_hash([credential, net_hrn])
                for aggregate in aggregates:
                    server = aggregates[aggregate]
                    try:
                        network_found = server.get_aggregates(credential, net_hrn)
                    except Exception:
                        network_found = server.get_aggregates(credential, net_hrn, lookup_hash)
                    if network_found:
                        _send_rspec(server, rspecs[net_hrn])
        except Exception:
            # log.write emits the same text as "print >> log" would
            log.write("Error creating slice %(hrn)s at aggregate %(net_hrn)s\n"
                      % {'hrn': hrn, 'net_hrn': net_hrn})
            traceback.print_exc()
def start_slice(self, hrn):
    """
    Start the slice named by hrn.

    Thin entry point: the work is delegated to start_slice_smgr().
    """
    self.start_slice_smgr(hrn)
def start_slice_smgr(self, hrn):
    """
    Ask every aggregate to start the slice named by hrn.

    Calls start_slice(credential, hrn) on each aggregate in turn; an error
    raised by any of them propagates to the caller.
    """
    cred = self.api.getCredential()
    servers = Aggregates(self.api)
    for name in servers:
        server = servers[name]
        server.start_slice(cred, hrn)
def stop_slice(self, hrn):
    """
    Stop the slice named by hrn.

    Thin entry point: the work is delegated to stop_slice_smgr().
    """
    self.stop_slice_smgr(hrn)
def stop_slice_smgr(self, hrn):
    """
    Ask every aggregate to stop the slice named by hrn.

    Each aggregate is called without a request hash first; only if that
    call raises is it called again with a hash computed over
    [credential, hrn].  An error from the second attempt propagates to
    the caller.
    """
    credential = self.api.getCredential()
    aggregates = Aggregates(self.api)
    arg_list = [credential, hrn]
    # the hash does not depend on the aggregate, so compute it once
    request_hash = self.api.key.compute_hash(arg_list)
    for aggregate in aggregates:
        server = aggregates[aggregate]
        try:
            server.stop_slice(credential, hrn)
        except Exception:
            # retry with the request hash, so the slice is not stopped twice
            # on aggregates that accepted the first call
            server.stop_slice(credential, hrn, request_hash)