6 from geni.util.geniserver import *
7 from geni.util.geniclient import *
8 from geni.util.cert import *
9 from geni.util.trustedroot import *
10 from geni.util.excep import *
11 from geni.util.misc import *
12 from geni.util.config import Config
13 from geni.util.rspec import Rspec
14 from geni.util.specdict import *
15 from geni.util.storage import SimpleStorage
17 class SliceMgr(GeniServer):
34 # Create a new slice manager object.
36 # @param ip the ip address to listen on
37 # @param port the port to listen on
# @param key_file private key filename of the slice manager
39 # @param cert_file certificate filename containing public key (could be a GID file)
def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"):
    """
    Create a new slice manager object.

    @param ip the ip address to listen on
    @param port the port to listen on
    @param key_file private key filename of this server
    @param cert_file certificate filename containing public key (could be a GID file)
    @param config path of the geni configuration file
    """
    # Unbound base-class call: the instance has to be passed explicitly.
    GeniServer.__init__(self, ip, port, key_file, cert_file)
    self.key_file = key_file
    self.cert_file = cert_file
    self.conf = Config(config)
    self.hrn = self.conf.GENI_INTERFACE_HRN
    server_basedir = os.path.join(self.conf.GENI_BASE_DIR, "geni")

    # Node cache freshness. threshold stays None until the first refresh or
    # load, which is what get_components() tests for.
    # NOTE(review): the one hour time-to-live is an assumed default - confirm.
    self.nodes_ttl = 1
    self.threshold = None

    # Get list of aggregates this sm talks to
    # XX do we use simplestorage to maintain this file manually?
    aggregates_file = os.path.join(server_basedir, 'aggregates')
    self.aggregates = SimpleStorage(aggregates_file)
    # This is the only aggregate connection step; the class defines no
    # connectAggregates() method, so the former second call was dropped.
    self.connect_aggregates(aggregates_file)

    nodes_file = os.path.join(server_basedir, 'smgr.' + self.hrn + '.components')
    self.nodes = SimpleStorage(nodes_file)

    # File name kept exactly as before (no '.' after 'slicemgr') so that
    # existing state files are still found.
    slices_file = os.path.join(server_basedir, 'slicemgr' + self.hrn + '.slices')
    self.slices = SimpleStorage(slices_file)

    policy_file = os.path.join(server_basedir, 'smgr.policy')
    self.policy = SimpleStorage(policy_file)

    timestamp_file = os.path.join(server_basedir, 'smgr.' + self.hrn + '.timestamp')
    self.timestamp = SimpleStorage(timestamp_file)

    # NOTE(review): connectRegistry() is not defined in this class; it is
    # presumably inherited from GeniServer - confirm.
    self.connectRegistry()
def connect_aggregates(self, aggregates_file):
    """
    Get info about the aggregates available to us from file and create
    an xmlrpc connection to each. If any info is invalid, skip it.

    A usable line holds exactly three whitespace separated fields:
    hrn, address and port. Lines whose first non-blank character is '#'
    are comments.

    @param aggregates_file path of the file listing the aggregates
    """
    # Errors opening or reading the file propagate to the caller.
    with open(aggregates_file, 'r') as f:
        lines = f.readlines()

    for line in lines:
        # skip comments
        if line.strip().startswith("#"):
            continue

        # split on any run of tabs and/or spaces
        agg_info = line.split()

        # skip invalid info (this also drops blank lines)
        if len(agg_info) != 3:
            continue

        # create xmlrpc connection using GeniClient
        hrn, address, port = agg_info
        url = 'https://%s:%s' % (address, port)
        self.aggregates[hrn] = GeniClient(url, self.key_file, self.cert_file)
def item_hrns(self, items):
    """
    Take a list of items (components or slices) and return a dictionary where
    the key is the authoritative hrn and the value is a list of items at that
    hrn.

    Every known aggregate hrn gets a key, with an empty list when no item
    matches. An item is filed under each aggregate hrn it starts with; items
    matching no aggregate are left out.

    @param items list of hrn strings
    @return dictionary mapping aggregate hrn to list of items
    """
    agg_hrns = list(self.aggregates.keys())
    grouped = dict((agg_hrn, []) for agg_hrn in agg_hrns)

    for item in items:
        for agg_hrn in agg_hrns:
            if item.startswith(agg_hrn):
                # append: assigning here would replace the list with one item
                grouped[agg_hrn].append(item)

    return grouped
def hostname_to_hrn(self, login_base, hostname):
    """
    Build an hrn from a login base and a hostname.

    The result is this manager's hrn, the login base and the hostname,
    joined with dots.

    @param login_base login base placed in the middle of the hrn
    @param hostname dotted hostname
    @return the hrn string
    """
    # dots separate hrn components, so the hostname's own dots become '_'
    flattened = hostname.replace(".", "_")
    parts = (self.hrn, login_base, flattened)
    return ".".join(parts)
def slicename_to_hrn(self, slicename):
    """
    Build an hrn from an underscore separated slice name.

    Every '_' in the slice name becomes a '.', and the result is appended
    to this manager's hrn.

    @param slicename slice name such as 'site_slice'
    @return the hrn string
    """
    dotted = ".".join(slicename.split("_"))
    return ".".join((self.hrn, dotted))
def refresh_components(self):
    """
    Update the cached list of nodes.

    Asks every aggregate for its components, applies the whitelist and
    blacklist found in self.policy, replaces self.nodes with the result and
    writes it out, then records the refresh time and the time at which the
    cache expires (self.threshold).
    """
    all_nodes = []
    for aggregate in self.aggregates.keys():
        try:
            # resolve components hostnames
            all_nodes.extend(self.aggregates[aggregate].get_components())
        except Exception:
            # Best effort: one failing aggregate must not abort the refresh.
            # XX print out to some error log
            continue

    # An absent or empty list means "no restriction".
    whitelist = self.policy.get('whitelist')
    blacklist = self.policy.get('blacklist')

    nodedict = {}
    for node in all_nodes:
        if whitelist and node not in whitelist:
            continue
        if blacklist and node in blacklist:
            continue
        nodedict[node] = node

    # Same backing file, new contents.
    self.nodes = SimpleStorage(self.nodes.db_filename, nodedict)
    self.nodes.write()

    # update timestamp and threshold
    now = datetime.datetime.now()
    self.timestamp['timestamp'] = now
    self.threshold = now + datetime.timedelta(hours=self.nodes_ttl)
    self.timestamp.write()
def load_components(self):
    """
    Read the cached list of nodes and the time it was last refreshed.

    Sets self.timestamp['timestamp'] to a datetime and self.threshold to
    that time plus self.nodes_ttl hours.
    """
    print("loading nodes")
    # Read component list from cached file
    self.nodes.load()
    self.timestamp.load()

    timestamp = self.timestamp['timestamp']
    if not isinstance(timestamp, datetime.datetime):
        # Stored as text. str(datetime) appends ".ffffff" whenever the
        # microsecond field is non-zero, so accept both layouts.
        timestamp = str(timestamp)
        time_format = "%Y-%m-%d %H:%M:%S"
        if "." in timestamp:
            time_format += ".%f"
        timestamp = datetime.datetime.strptime(timestamp, time_format)

    self.timestamp['timestamp'] = timestamp
    self.threshold = timestamp + datetime.timedelta(hours=self.nodes_ttl)
def load_policy(self):
    """
    Read the list of blacklisted and whitelisted nodes.

    Reloads self.policy from its backing file.
    """
    self.policy.load()
def load_slices(self):
    """
    Read current slice instantiation states.

    Reloads self.slices from its backing file.
    """
    print("loading slices")
    self.slices.load()
def get_components(self):
    """
    Return a list of components managed by this slice manager.

    The node cache is refreshed from the aggregates when it has never been
    filled or has expired, and reloaded from file when it is still fresh
    but empty in memory.

    @return list of the keys of self.nodes
    """
    now = datetime.datetime.now()
    # threshold does not exist before the first refresh/load
    threshold = getattr(self, 'threshold', None)

    if not threshold or not self.timestamp or now > threshold:
        self.refresh_components()
    elif not self.nodes:
        # cache is still fresh but nothing is in memory yet
        self.load_components()

    # plain list so the result can be marshalled over xmlrpc
    return list(self.nodes.keys())
def get_slices(self):
    """
    Return the instantiated slices known to this slice manager.

    @return a new plain dictionary holding the contents of self.slices
    """
    snapshot = {}
    snapshot.update(self.slices)
    return snapshot
def get_resources(self, slice_hrn):
    """
    Return the current rspec for the specified slice.

    A saved rspec is returned as is. Otherwise every known aggregate is
    asked for the slice's resources, the answers that contain a NodeSpec
    are merged into one rspec, and that rspec is saved and returned.

    @param slice_hrn hrn of the slice
    @return the rspec as an xml string
    """
    # check if we already have this slice's state saved
    if slice_hrn in self.slices:
        return self.slices[slice_hrn]

    # XX need to use the right credentials for this call
    # NOTE(review): assumes the manager's credential lives in
    # self.credential; None is passed when it is not set - confirm.
    cred = getattr(self, 'credential', None)

    # request this slice's state from all known aggregates
    rspecdicts = []
    for hrn in self.aggregates.keys():
        # check if the slice has resources at this hrn
        tempresources = self.aggregates[hrn].resources(cred, slice_hrn)
        temprspec = Rspec()
        temprspec.parseString(tempresources)
        if temprspec.getDictsByTagName('NodeSpec'):
            # append this rspec to the list of rspecs
            rspecdicts.append(temprspec.toDict())

    # merge all these rspecs into one
    # time.mktime() replaces strftime("%s"), which is not a portable directive
    # NOTE(review): self.duration is not assigned anywhere in this class;
    # it is used here as the datetime at which the slice ends - confirm.
    start_time = int(time.mktime(self.timestamp['timestamp'].timetuple()))
    end_time = int(time.mktime(self.duration.timetuple()))
    duration = end_time - start_time

    networks = [rspecdict['networks'][0] for rspecdict in rspecdicts]
    resources = {'networks': networks, 'start_time': start_time, 'duration': duration}
    # convert the plc dict to an rspec dict
    resourceDict = RspecDict(resources)
    resourceSpec = Rspec()
    resourceSpec.parseDict(resourceDict)
    rspec = resourceSpec.toxml()

    # save this slice's resources
    self.slices[slice_hrn] = rspec
    self.slices.write()

    return rspec
def create_slice(self, slice_hrn, rspec, attributes):
    """
    Instantiate the specified slice according to what is defined in the rspec.

    The rspec is parsed and saved locally, then a separate rspec is built
    for each known aggregate that has a NetSpec in it and sent to that
    aggregate.

    @param slice_hrn hrn of the slice
    @param rspec rspec as an xml string
    @param attributes currently unused
    @return 1 (xmlrpc cannot marshal None by default)
    """
    # XX need to get the correct credentials
    # NOTE(review): assumes the manager's credential lives in
    # self.credential; None is passed when it is not set - confirm.
    cred = getattr(self, 'credential', None)

    # Parse first: everything below works on the parsed spec, and nothing
    # is saved if the rspec cannot be parsed.
    spec = Rspec()
    spec.parseString(rspec)
    specDict = spec.toDict()
    start_time = specDict['start_time']
    end_time = specDict['end_time']

    # save slice state locally
    # the spec object has been through the parser, so save that instead of
    # the raw rspec the user gave us
    self.slices[slice_hrn] = spec.toxml()
    self.slices.write()

    # extract network list from the rspec and create a separate
    # rspec for each network
    rspecs = {}
    # only attempt to extract information about the aggregates we know about
    for hrn in self.aggregates.keys():
        netspec = spec.getDictByTagNameValue('NetSpec', hrn)
        if not netspec:
            continue
        tempdict = {'start_time': start_time, 'end_time': end_time, 'networks': netspec}
        # convert the plc dict to rspec dict
        resourceDict = RspecDict(tempdict)
        tempspec = Rspec()
        tempspec.parseDict(resourceDict)
        rspecs[hrn] = tempspec.toxml()

    # notify the aggregates
    for hrn in rspecs:
        self.aggregates[hrn].createSlice(cred, rspecs[hrn])

    return 1
def update_slice(self, slice_hrn, rspec, attributes = None):
    """
    Update the specified slice.

    Delegates to create_slice() with the same arguments.

    @param slice_hrn hrn of the slice
    @param rspec rspec as an xml string
    @param attributes optional list of attributes; a new empty list is used
           when omitted
    """
    # None instead of a shared [] default, so calls never see each other's list
    if attributes is None:
        attributes = []
    self.create_slice(slice_hrn, rspec, attributes)
def delete_slice_(self, slice_hrn):
    """
    Remove this slice from all components it was previously associated with
    and free up the resources it was using.

    Drops the locally saved state, if any, and asks every known aggregate
    to delete the slice.

    @param slice_hrn hrn of the slice
    @return 1 (xmlrpc cannot marshal None by default)
    """
    # XX need to get the correct credential
    # NOTE(review): assumes the manager's credential lives in
    # self.credential; None is passed when it is not set - confirm.
    cred = getattr(self, 'credential', None)

    if slice_hrn in self.slices:
        self.slices.pop(slice_hrn)
        self.slices.write()

    for hrn in self.aggregates.keys():
        self.aggregates[hrn].deleteSlice(cred, slice_hrn)

    return 1

# Also reachable without the trailing underscore, the spelling used by the
# other *_slice methods.
delete_slice = delete_slice_
def start_slice(self, slice_hrn):
    """
    Start the slice at every known aggregate.

    @param slice_hrn hrn of the slice
    @return 1 (xmlrpc cannot marshal None by default)
    """
    # XX need to get the correct credential
    # NOTE(review): assumes the manager's credential lives in
    # self.credential; None is passed when it is not set - confirm.
    cred = getattr(self, 'credential', None)

    for hrn in self.aggregates.keys():
        self.aggregates[hrn].startSlice(cred, slice_hrn)

    return 1
def stop_slice(self, slice_hrn):
    """
    Stop the slice at every known aggregate.

    @param slice_hrn hrn of the slice
    @return 1 (xmlrpc cannot marshal None by default)
    """
    # XX need to get the correct credential
    # NOTE(review): assumes the manager's credential lives in
    # self.credential; None is passed when it is not set - confirm.
    cred = getattr(self, 'credential', None)

    for hrn in self.aggregates.keys():
        # NOTE(review): assumes the client exposes stopSlice alongside
        # startSlice - confirm against GeniClient.
        self.aggregates[hrn].stopSlice(cred, slice_hrn)

    return 1
def reset_slice(self, slice_hrn):
    """
    Reset the slice.

    XX not yet implemented: nothing is reset.

    @param slice_hrn hrn of the slice (currently unused)
    @return 1 (xmlrpc cannot marshal None by default)
    """
    return 1
def get_policy(self):
    """
    Return the policy of this slice manager.

    @return a new plain dictionary holding the contents of self.policy
    """
    # plain dict, the same way get_slices() hands out self.slices
    return dict(self.policy)
360 ##############################
361 ## Server methods here for now
362 ##############################
365 return self.get_components()
368 return self.get_slices()
def resources(self, cred, hrn):
    """
    Server method: return the rspec of a slice.

    Runs decode_authentication() for the 'info' operation and
    verify_object_belongs_to_me() on the hrn before looking anything up.

    @param cred credential of the caller
    @param hrn hrn of the slice
    @return whatever get_resources() returns for hrn
    """
    self.decode_authentication(cred, 'info')
    self.verify_object_belongs_to_me(hrn)

    rspec = self.get_resources(hrn)
    return rspec
def create(self, cred, hrn, rspec):
    """
    Server method: instantiate a slice from an rspec.

    Runs decode_authentication() for the 'embed' operation and
    verify_object_belongs_to_me() on the hrn, then delegates to
    create_slice() with an empty attribute list.

    @param cred credential of the caller
    @param hrn hrn of the slice
    @param rspec rspec as an xml string
    @return whatever create_slice() returns
    """
    self.decode_authentication(cred, 'embed')
    # one argument, as in the other handlers
    self.verify_object_belongs_to_me(hrn)
    # create_slice, not create: calling ourselves here could never succeed
    return self.create_slice(hrn, rspec, [])
def delete(self, cred, hrn):
    """
    Server method: delete a slice.

    Runs decode_authentication() for the 'embed' operation and
    verify_object_belongs_to_me() on the hrn, then delegates to
    delete_slice_().

    @param cred credential of the caller
    @param hrn hrn of the slice
    @return whatever delete_slice_() returns
    """
    self.decode_authentication(cred, 'embed')
    self.verify_object_belongs_to_me(hrn)
    # the implementation is spelled with a trailing underscore in this class
    return self.delete_slice_(hrn)
def start(self, cred, hrn):
    """
    Server method: start a slice.

    Runs decode_authentication() for the 'control' operation, then
    delegates to start_slice().

    @param cred credential of the caller
    @param hrn hrn of the slice
    @return whatever start_slice() returns
    """
    self.decode_authentication(cred, 'control')
    # start_slice, not start: calling ourselves here could never succeed
    return self.start_slice(hrn)
def stop(self, cred, hrn):
    """
    Server method: stop a slice.

    Runs decode_authentication() for the 'control' operation, then
    delegates to stop_slice().

    @param cred credential of the caller
    @param hrn hrn of the slice
    @return whatever stop_slice() returns
    """
    self.decode_authentication(cred, 'control')
    # stop_slice, not stop: calling ourselves here could never succeed
    return self.stop_slice(hrn)
def reset(self, cred, hrn):
    """
    Server method: reset a slice.

    Runs decode_authentication() for the 'control' operation, then
    delegates to reset_slice().

    @param cred credential of the caller
    @param hrn hrn of the slice
    @return whatever reset_slice() returns
    """
    self.decode_authentication(cred, 'control')
    # reset_slice, not reset: calling ourselves here could never succeed
    return self.reset_slice(hrn)
def policy(self, cred):
    """
    Server method: return the policy of this slice manager.

    Runs decode_authentication() for the 'info' operation first.

    @param cred credential of the caller
    @return whatever get_policy() returns
    """
    self.decode_authentication(cred, 'info')

    current_policy = self.get_policy()
    return current_policy
def register_functions(self):
    """
    Register the functions this server exports over xmlrpc.

    Registers whatever GeniServer registers, then the aggregate interface
    methods of this class.
    """
    GeniServer.register_functions(self)

    # Aggregate interface methods
    # Handlers are looked up on the class and bound by hand: __init__ stores
    # storage objects in self.slices and self.policy, and those instance
    # attributes hide the slices() and policy() methods, so self.slices and
    # self.policy are not the handlers.
    cls = self.__class__
    handler_names = ('components', 'slices', 'resources', 'create', 'delete',
                     'start', 'stop', 'reset', 'policy')
    for name in handler_names:
        handler = getattr(cls, name).__get__(self, cls)
        self.server.register_function(handler)