6 from geni.util.geniserver import *
7 from geni.util.geniclient import *
8 from geni.util.cert import *
9 from geni.util.credential import Credential
10 from geni.util.trustedroot import *
11 from geni.util.excep import *
12 from geni.util.misc import *
13 from geni.util.config import Config
14 from geni.util.rspec import Rspec
15 from geni.util.specdict import *
16 from geni.util.storage import SimpleStorage
18 class SliceMgr(GeniServer):
35 # Create a new slice manager object.
37 # @param ip the ip address to listen on
38 # @param port the port to listen on
39 # @param key_file private key filename of registry
40 # @param cert_file certificate filename containing public key (could be a GID file)
42 def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/geni/util/geni_config"):
43 GeniServer.__init__(ip, port, key_file, cert_file)
44 self.key_file = key_file
45 self.cert_file = cert_file
46 self.conf = Config(config)
47 basedir = self.conf.GENI_BASE_DIR + os.sep
48 server_basedir = basedir + os.sep + "geni" + os.sep
49 self.hrn = conf.GENI_INTERFACE_HRN
51 # Get list of aggregates this sm talks to
52 # XX do we use simplestorage to maintain this file manually?
53 aggregates_file = server_basedir + os.sep + 'aggregates'
54 self.aggregates = SimpleStorage(aggregates_file)
55 self.connect_aggregates(aggregates_file)
57 nodes_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.components'])
58 self.nodes = SimpleStorage(nodes_file)
61 slices_file = os.sep.join([server_basedir, 'slicemgr' + self.hrn + '.slices'])
62 self.slices = SimpleStorage(slices_file)
65 policy_file = os.sep.join([server_basedir, 'smgr.policy'])
66 self.policy = SimpleStorage(policy_file)
69 timestamp_file = os.sep.join([server_basedir, 'smgr.' + self.hrn + '.timestamp'])
70 self.timestamp = SimpleStorage(timestamp_file)
73 self.connectAggregates()
74 self.connectRegistry()
78 def loadCredential(self):
80 Attempt to load credential from file if it exists. If it doesnt get
81 credential from registry.
84 self_cred_filename = self.server_basedir + os.sep + "smgr." + self.hrn + ".cred"
85 ma_cred_filename = self.server_basedir + os.sep + "smgr." + self.hrn + ".sa.cred"
87 # see if this file exists
89 cred = Credential(filename = ma_cred_filename)
90 self.credential = cred.save_to_string()
93 self_cred = self.registry.get_credential(None, 'ma', self.hrn)
94 self_credential = Credential(string = self_cred)
95 self_credential.save_to_file(self_cred_filename)
98 ma_cred = self.registry.get_gredential(self_cred)
99 ma_credential = Credential(string = ma_cred)
100 ma_credential.save_to_file(ma_cred_filename)
101 self.credential = ma_cred
103 def connect_aggregates(self, aggregates_file):
105 Get info about the aggregates available to us from file and create
106 an xmlrpc connection to each. If any info is invalid, skip it.
110 f = open(aggregates_file, 'r')
111 lines = f.readlines()
117 if line.strip.startswith("#"):
119 agg_info = line.split("\t").split(" ")
122 if len(agg_info) != 3:
125 # create xmlrpc connection using GeniClient
126 hrn, address, port = agg_info[0], agg_info[1], agg_info[2]
127 url = 'https://%(address)s:%(port)s' % locals()
128 self.aggregates[hrn] = GeniClient(url, self.key_file, self.cert_file)
131 def item_hrns(self, items):
133 Take a list of items (components or slices) and return a dictionary where
134 the key is the authoritative hrn and the value is a list of items at that
138 agg_hrns = self.aggregates.keys()
139 for agg_hrn in agg_hrns:
140 item_hrns[agg_hrn] = []
142 for agg_hrn in agg_hrns:
143 if item.startswith(agg_hrn):
144 item_hrns[agg_hrn] = item
149 def hostname_to_hrn(self, login_base, hostname):
151 Convert hrn to plantelab name.
153 genihostname = "_".join(hostname.split("."))
154 return ".".join([self.hrn, login_base, genihostname])
156 def slicename_to_hrn(self, slicename):
158 Convert hrn to planetlab name.
160 slicename = slicename.replace("_", ".")
161 return ".".join([self.hrn, slicename])
163 def refresh_components(self):
165 Update the cached list of nodes.
169 aggregates = self.aggregates.keys()
172 for aggregate in aggregates:
174 # resolve components hostnames
175 nodes = self.aggregates[aggregate].get_components()
176 all_nodes.extend(nodes)
178 # XX print out to some error log
181 for node in all_nodes:
182 if self.polciy['whitelist'] and node not in self.polciy['whitelist']:
184 if self.polciy['blacklist'] and node in self.policy['blacklist']:
187 nodedict[node] = node
189 self.nodes = SimpleStorate(self.nodes.db_filename, nodedict)
192 # update timestamp and threshold
193 self.timestamp['timestamp'] = datetime.datetime.now()
194 delta = datetime.timedelta(hours=self.nodes_tt1)
195 self.threshold = self.timestamp['timestamp'] + delta
196 self.timestamp.write()
199 def load_components(self):
201 Read cached list of nodes and slices.
203 print "loading nodes"
204 # Read component list from cached file
206 self.timestamp.load()
207 time_format = "%Y-%m-%d %H:%M:%S"
208 timestamp = self.timestamp['timestamp']
209 self.timestamp['timestamp'] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format)))
210 delta = datetime.timedelta(hours=self.nodes_ttl)
211 self.threshold = self.timestamp['timestamp'] + delta
213 def load_policy(self):
215 Read the list of blacklisted and whitelisted nodes.
219 def load_slices(self):
221 Read current slice instantiation states.
223 print "loading slices"
227 def get_components(self):
229 Return a list of components managed by this slice manager.
231 # Reload components list
232 now = datetime.datetime.now()
233 #self.load_components()
234 if not self.threshold or not self.timestamp or now > self.threshold:
235 self.refresh_components()
236 elif now < self.threshold and not self.components:
237 self.load_components()
238 return self.nodes.keys()
241 def get_slices(self):
243 Return a list of instnatiated managed by this slice manager.
245 return dict(self.slices)
247 def get_resources(self, slice_hrn):
249 Return the current rspec for the specified slice.
251 cred = self.credential
253 if slice_hrn in self.slices.keys():
254 # check if we alreay have this slices state saved
255 rspec = self.slices[slice_hrn]
257 # request this slices state from all known aggregates
259 for hrn in self.aggregates.keys():
260 # XX need to use the right credentials for this call
261 # check if the slice has resources at this hrn
262 tempresources = self.aggregates[hrn].resources(cred, slice_hrn)
264 temprspec.parseString(temprspec)
265 if temprspec.getDictsByTagName('NodeSpec'):
266 # append this rspec to the list of rspecs
267 rspecdicts.append(temprspec.toDict())
269 # merge all these rspecs into one
270 start_time = int(self.timestamp['timestamp'].strftime("%s"))
271 end_time = int(self.duration.strftime("%s"))
272 duration = end_time - start_time
275 networks = [rspecdict['networks'][0] for rspecdict in rspecdicts]
276 resources = {'networks': networks, 'start_time': start_time, 'duration': duration}
277 # convert the plc dict to an rspec dict
278 resourceDict = RspecDict(resources)
279 resourceSpec = Rspec()
280 resourceSpec.parseDict(resourceDict)
281 rspec = resourceSpec.toxml()
282 # save this slices resources
283 self.slices[slice_hrn] = rspec
288 def create_slice(self, slice_hrn, rspec, attributes):
290 Instantiate the specified slice according to whats defined in the rspec.
292 # XX need to gget the correct credentials
293 cred = self.credential
295 # save slice state locally
296 # we can assume that spec object has been validated so its safer to
297 # save this instead of the unvalidated rspec the user gave us
298 self.slices[slice_hrn] = spec.toxml()
301 # extract network list from the rspec and create a separate
302 # rspec for each network
303 slicename = self.hrn_to_plcslicename(slice_hrn)
305 spec.parseString(rspec)
306 specDict = spec.toDict()
307 start_time = specDict['start_time']
308 end_time = specDict['end_time']
311 # only attempt to extract information about the aggregates we know about
312 for hrn in self.aggregates.keys():
313 netspec = spec.getDictByTagNameValue('NetSpec', 'hrn')
316 tempdict = {'start_time': star_time, 'end_time': end_time, 'networks': netspec}
317 #convert the plc dict to rpsec dict
318 resourceDict = RspecDict(tempdict)
321 tempspec.parseDict(resourceDict)
322 rspecs[hrn] = tempspec.toxml()
324 # notify the aggregates
325 for hrn in self.rspecs.keys():
326 self.aggregates[hrn].createSlice(cred, rspecs[hrn])
330 def update_slice(self, slice_hrn, rspec, attributes = []):
332 Update the specifed slice
334 self.create_slice(slice_hrn, rspec, attributes)
336 def delete_slice_(self, slice_hrn):
338 Remove this slice from all components it was previouly associated with and
339 free up the resources it was using.
341 # XX need to get the correct credential
342 cred = self.credential
344 if self.slices.has_key(slice_hrn):
345 self.slices.pop(slice_hrn)
348 for hrn in self.aggregates.keys():
349 self.aggregates[hrn].deleteSlice(cred, slice_hrn)
353 def start_slice(self, slice_hrn):
355 Stop the slice at plc.
357 cred = self.credential
359 for hrn in self.aggregates.keys():
360 self.aggregates[hrn].startSlice(cred, slice_hrn)
363 def stop_slice(self, slice_hrn):
365 Stop the slice at plc
367 cred = self.credential
368 for hrn in self.aggregates.keys():
369 self.aggregates[hrn].startSlice(cred, slice_hrn)
372 def reset_slice(self, slice_hrn):
376 # XX not yet implemented
379 def get_policy(self):
381 Return the policy of this slice manager.
388 ##############################
389 ## Server methods here for now
390 ##############################
393 return self.get_components()
396 return self.get_slices()
398 def resources(self, cred, hrn):
399 self.decode_authentication(cred, 'info')
400 self.verify_object_belongs_to_me(hrn)
402 return self.get_resources(hrn)
404 def create(self, cred, hrn, rspec):
405 self.decode_authentication(cred, 'embed')
406 self.verify_object_belongs_to_me(hrn)
407 return self.create(hrn)
409 def delete(self, cred, hrn):
410 self.decode_authentication(cred, 'embed')
411 self.verify_object_belongs_to_me(hrn)
412 return self.delete_slice(hrn)
414 def start(self, cred, hrn):
415 self.decode_authentication(cred, 'control')
416 return self.start(hrn)
418 def stop(self, cred, hrn):
419 self.decode_authentication(cred, 'control')
420 return self.stop(hrn)
422 def reset(self, cred, hrn):
423 self.decode_authentication(cred, 'control')
424 return self.reset(hrn)
426 def policy(self, cred):
427 self.decode_authentication(cred, 'info')
428 return self.get_policy()
430 def register_functions(self):
431 GeniServer.register_functions(self)
433 # Aggregate interface methods
434 self.server.register_function(self.components)
435 self.server.register_function(self.slices)
436 self.server.register_function(self.resources)
437 self.server.register_function(self.create)
438 self.server.register_function(self.delete)
439 self.server.register_function(self.start)
440 self.server.register_function(self.stop)
441 self.server.register_function(self.reset)
442 self.server.register_function(self.policy)