7 from geni.util.geniserver import *
8 from geni.util.geniclient import *
9 from geni.util.cert import *
10 from geni.util.trustedroot import *
11 from geni.util.excep import *
12 from geni.util.misc import *
13 from geni.util.config import Config
14 from geni.util.rspec import Rspec
16 class Aggregate(GeniServer):
31 # Create a new aggregate object.
33 # @param ip the ip address to listen on
34 # @param port the port to listen on
35 # @param key_file private key filename of registry
36 # @param cert_file certificate filename containing public key (could be a GID file)
38 def __init__(self, ip, port, key_file, cert_file, config = "/usr/share/geniwrapper/util/geni_config"):
39 GeniServer.__init__(self, ip, port, key_file, cert_file)
41 basedir = conf.GENI_BASE_DIR + os.sep
42 server_basedir = basedir + os.sep + "plc" + os.sep
43 self.hrn = conf.GENI_INTERFACE_HRN
44 self.nodes_file = os.sep.join([server_basedir, 'components', self.hrn + '.comp'])
45 self.whitelist_file = os.sep.join([server_basedir, 'policy', 'whitelist'])
46 self.blacklist_file = os.sep.join([server_basedir, 'policy', 'blacklist'])
47 self.timestamp_file = os.sep.join([server_basedir, 'components', self.hrn + '.timestamp'])
49 self.policy['whitelist'] = []
50 self.policy['blacklist'] = []
52 self.connectRegistry()
54 def connectRegistry(self):
56 Connect to the registry
62 Connect to the plc api interface. First attempt to impor thte shell, if that fails
63 try to connect to the xmlrpc server.
65 self.auth = {'Username': conf.GENI_PLC_USER,
66 'AuthMethod': 'password',
67 'AuthString': conf.GENI_PLC_PASSWORD}
70 # try to import PLC.Shell directly
71 sys.path.append(conf.GENI_PLC_SHELL_PATH)
73 self.shell = PLC.Shell.Shell(globals())
74 self.shell.AuthCheck()
76 # connect to plc api via xmlrpc
77 plc_host = conf.GENI_PLC_HOST
78 plc_port = conf.GENI_PLC_PORT
79 plc_api_path = conf.GENI_PLC_API_PATH
80 url = "https://%(plc_host)s:%(plc_port)s/%(plc_api_path)s/" % locals()
81 self.auth = {'Username': conf.GENI_PLC_USER,
82 'AuthMethod': 'password',
83 'AuthString': conf.GENI_PLC_PASSWORD}
85 self.shell = xmlrpclib.Server(url, verbose = 0, allow_none = True)
86 self.shell.AuthCheck(self.auth)
88 def hostname_to_hrn(self, login_base, hostname):
90 Convert hrn to plantelab name.
92 genihostname = "_".join(hostname.split("."))
93 return ".".join([self.hrn, login_base, genihostname])
95 def slicename_to_hrn(self, slicename):
97 Convert hrn to planetlab name.
99 slicename = slicename.replace("_", ".")
100 return ".".join([self.hrn, slicename])
102 def refresh_components(self):
104 Update the cached list of nodes.
106 # resolve component hostnames
107 nodes = self.shell.GetNodes(self.auth, {}, ['hostname', 'site_id'])
109 # resolve site login_bases
110 site_ids = [node['site_id'] for node in nodes]
111 sites = self.shell.GetSites(self.auth, site_ids, ['site_id', 'login_base'])
114 site_dict[site['site_id']] = site['login_base']
116 # convert plc names to geni hrn
117 self.nodes = [self.hostname_to_hrn(site_dict[node['site_id']], node['hostname']) for node in nodes]
119 # apply policy. Do not allow nodes found in blacklist, only allow nodes found in whitelist
120 whitelist_policy = lambda node: node in self.policy['whitelist']
121 blacklist_policy = lambda node: node not in self.policy['blacklist']
123 if self.policy['blacklist']:
124 self.nodes = blacklist_policy(self.nodes)
125 if self.policy['whitelist']:
126 self.nodes = whitelist_policy(self.nodes)
128 # update timestamp and threshold
129 self.timestamp = datetime.datetime.now()
130 delta = datetime.timedelta(hours=self.nodes_ttl)
131 self.threshold = self.timestamp + delta
133 f = open(self.nodes_file, 'w')
134 f.write(str(self.nodes))
136 f = open(self.timestamp_file, 'w')
137 f.write(str(self.threshold))
140 def load_components(self):
142 Read cached list of nodes.
144 # Read component list from cached file
145 if os.path.exists(self.nodes_file):
146 f = open(self.nodes_file, 'r')
147 self.nodes = eval(f.read())
150 time_format = "%Y-%m-%d %H:%M:%S"
151 if os.path.exists(self.timestamp_file):
152 f = open(self.timestamp_file, 'r')
153 timestamp = str(f.read()).split(".")[0]
154 self.timestamp = datetime.datetime.fromtimestamp(time.mktime(time.strptime(timestamp, time_format)))
155 delta = datetime.timedelta(hours=self.nodes_ttl)
156 self.threshold = self.timestamp + delta
159 def load_policy(self):
161 Read the list of blacklisted and whitelisted nodes.
165 if os.path.exists(self.whitelist_file):
166 f = open(self.whitelist_file, 'r')
167 lines = f.readlines()
170 line = line.strip().replace(" ", "").replace("\n", "")
171 whitelist.extend(line.split(","))
174 if os.path.exists(self.blacklist_file):
175 f = open(self.blacklist_file, 'r')
176 lines = f.readlines()
179 line = line.strip().replace(" ", "").replace("\n", "")
180 blacklist.extend(line.split(","))
182 self.policy['whitelist'] = whitelist
183 self.policy['blacklist'] = blacklist
185 def get_components(self):
187 Return a list of components at this aggregate.
189 # Reload components list
190 now = datetime.datetime.now()
191 #self.load_components()
192 if not self.threshold or not self.timestamp or now > self.threshold:
193 self.refresh_components()
194 elif now < self.threshold and not self.nodes:
195 self.load_components()
198 def get_rspec(self, hrn, type):
200 rspec['nodespec'] = {'name': self.conf.GENI_INTERFACE_HRN}
201 rsepc['nodespec']['nodes'] = []
203 nodes = self.shell.GetNodes(self.auth)
204 elif type in ['slice']:
205 slicename = hrn_to_pl_slicename(hrn)
206 slices = self.shell.GetSlices(self.auth, [slicename])
207 node_ids = slices[0]['node_ids']
208 nodes = self.shell.GetNodes(self.auth, node_ids)
210 nodespec = {'name': node['hostname'], 'type': 'std'}
211 elif type in ['aggregate']:
216 def get_resources(self, slice_hrn):
218 Return the current rspec for the specified slice.
220 slicename = hrn_to_plcslicename(slice_hrn)
221 rspec = self.get_rspec(slicenamem, 'slice')
225 def create_slice(self, slice_hrn, rspec, attributes = []):
227 Instantiate the specified slice according to whats defined in the rspec.
229 slicename = self.hrn_to_plcslicename(slice_hrn)
231 nodespecs = spec.getDictsByTagName('NodeSpec')
232 nodes = [nodespec['name'] for nodespec in nodespecs]
233 self.shell.AddSliceToNodes(self.auth, slicename, nodes)
234 for attribute in attributes:
235 type, value, node, nodegroup = attribute['type'], attribute['value'], attribute['node'], attribute['nodegroup']
236 shell.AddSliceAttribute(self.auth, slicename, type, value, node, nodegroup)
238 # XX contact the registry to get the list of users on this slice and
240 #slice_record = self.registry.resolve(slice_hrn)
241 #person_records = slice_record['users']
242 # for person in person_record:
243 # email = person['email']
244 # self.shell.AddPersonToSlice(self.auth, email, slicename)
249 def update_slice(self, slice_hrn, rspec, attributes = []):
251 Update the specified slice.
254 slicename = self.hrn_to_plcslicename(slice_hrn)
255 slices = self.shell.GetSlices(self.auth, [slicename], ['node_ids'])
257 raise RecordNotFound(slice_hrn)
260 # find out where this slice is currently running
261 nodes = self.shell.GetNodes(self.auth, slice['node_ids'], ['hostname'])
262 hostnames = [node['hostname'] for node in nodes]
264 # get netspec details
266 nodespecs = spec.getDictsByTagName('NodeSpec')
267 nodes = [nodespec['name'] for nodespec in nodespecs]
268 # remove nodes not in rspec
269 delete_nodes = set(hostnames).difference(nodes)
270 # add nodes from rspec
271 added_nodes = set(nodes).difference(hostnames)
273 shell.AddSliceToNodes(self.auth, slicename, added_nodes)
274 shell.DeleteSliceFromNodes(self.auth, slicename, deleted_nodes)
276 for attribute in attributes:
277 type, value, node, nodegroup = attribute['type'], attribute['value'], attribute['node'], attribute['nodegroup']
278 shell.AddSliceAttribute(self.auth, slicename, type, value, node, nodegroup)
280 # contact registry to get slice users and add them to the slice
281 # slice_record = self.registry.resolve(slice_hrn)
282 # persons = slice_record['users']
284 #for person in persons:
285 # shell.AddPersonToSlice(person['email'], slice_name)
286 def delete_slice_(self, slice_hrn):
288 Remove this slice from all components it was previouly associated with and
289 free up the resources it was using.
291 slicename = self.hrn_to_plcslicename(slice_hrn)
292 slices = shell.GetSlices(self.auth, [slicename])
294 raise RecordNotFound(slice_hrn)
297 shell.DeleteSliceFromNodes(self.auth, slicename, slice['node_ids'])
300 def start_slice(self, slice_hrn):
302 Stop the slice at plc.
304 slicename = hrn_to_plcslicename(slice_hrn)
305 slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id'])
307 raise RecordNotFound(slice_hrn)
309 atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
310 attribute_id = attreibutes[0]
311 self.shell.UpdateSliceAttribute(self.auth, attribute_id, "1" )
314 def stop_slice(self, slice_hrn):
316 Stop the slice at plc
318 slicename = hrn_to_plcslicename(slice_hrn)
319 slices = self.shell.GetSlices(self.auth, {'name': slicename}, ['slice_id'])
321 raise RecordNotFound(slice_hrn)
323 atrribtes = self.shell.GetSliceAttributes({'slice_id': slice_id, 'name': 'enabled'}, ['slice_attribute_id'])
324 attribute_id = attreibutes[0]
325 self.shell.UpdateSliceAttribute(self.auth, attribute_id, "0")
329 def reset_slice(self, slice_hrn):
333 slicename = self.hrn_to_plcslicename(slice_hrn)
336 def get_policy(self):
338 Return this aggregates policy.
345 ##############################
346 ## Server methods here for now
347 ##############################
349 def components(self):
350 return self.get_components()
353 # return self.get_slices()
355 def resources(self, cred, hrn):
356 self.decode_authentication(cred, 'info')
357 self.verify_object_belongs_to_me(hrn)
359 return self.get_resources(hrn)
361 def create(self, cred, hrn, rspec):
362 self.decode_authentication(cred, 'embed')
363 self.verify_object_belongs_to_me(hrn)
364 return self.create(hrn)
366 def update(self, cred, hrn, rspec):
367 self.decode_authentication(cred, 'embed')
368 self.verify_object_belongs_to_me(hrn)
369 return self.update(hrn)
371 def delete(self, cred, hrn):
372 self.decode_authentication(cred, 'embed')
373 self.verify_object_belongs_to_me(hrn)
374 return self.delete_slice(hrn)
376 def start(self, cred, hrn):
377 self.decode_authentication(cred, 'control')
378 return self.start(hrn)
380 def stop(self, cred, hrn):
381 self.decode_authentication(cred, 'control')
382 return self.stop(hrn)
384 def reset(self, cred, hrn):
385 self.decode_authentication(cred, 'control')
386 return self.reset(hrn)
388 def policy(self, cred):
389 self.decode_authentication(cred, 'info')
390 return self.get_policy()
392 def register_functions(self):
393 GeniServer.register_functions(self)
395 # Aggregate interface methods
396 self.server.register_function(self.components)
397 #self.server.register_function(self.slices)
398 self.server.register_function(self.resources)
399 self.server.register_function(self.create)
400 self.server.register_function(self.delete)
401 self.server.register_function(self.start)
402 self.server.register_function(self.stop)
403 self.server.register_function(self.reset)
404 self.server.register_function(self.policy)