2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
24 from nepi.util.constants import TESTBED_STATUS_CONFIGURED
# Exception type whose name suggests failures around the temporary deployment
# key. Its body and raise sites are not visible in this listing -- TODO confirm.
26 class TempKeyError(Exception):
# PlanetLab testbed controller built on the generic
# testbed_impl.TestbedController.
29 class TestbedController(testbed_impl.TestbedController):
# Register with the base controller under this testbed's TESTBED_ID.
30 def __init__(self, testbed_version):
31 super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
# Filled from the "homeDirectory" testbed attribute during setup.
32 self._home_directory = None
# Element modules are imported here rather than at module level and kept on
# self, so the rest of the class reaches their classes through attributes
# (e.g. self._app.Dependency, self._interfaces.NodeIface).
# NOTE(review): self._node is used throughout the class, but its assignment
# is not visible in this listing.
36 import node, interfaces, application
38 self._interfaces = interfaces
39 self._app = application
# Node ids excluded from resource discovery; unresponsive nodes are added to
# it while waiting for provisioning.
41 self._blacklist = set()
def home_directory(self):
    """Return the controller's home directory.

    The value comes from the ``homeDirectory`` testbed attribute and is
    ``None`` until setup has stored it.
    """
    directory = self._home_directory
    return directory
# Body of the PLC API client accessor (its def line is not visible here).
# The client is built once and cached in self._plapi. It is created either
# with the slice credentials plus PLC host/URL, or anonymously. The condition
# selecting between the two, and the plcapi import, are not visible here --
# TODO confirm.
49 if not hasattr(self, '_plapi'):
53 self._plapi = plcapi.PLCAPI(
54 username = self.authUser,
55 password = self.authString,
56 hostname = self.plcHost,
57 urlpattern = self.plcUrl
60 # anonymous access - may not be enough for much
61 self._plapi = plcapi.PLCAPI()
# Body of the slice id accessor (its def line is not visible here).
# Looks up the id of self.slicename through the PLC API and caches it in
# self._slice_id.
# NOTE(review): the guard around slices[0] is not visible. If GetSlices
# returns an empty list, the indexing below raises IndexError unless such a
# guard exists.
66 if not hasattr(self, '_slice_id'):
67 slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
69 self._slice_id = slices[0]['slice_id']
71 # If it wasn't found, don't remember this failure, keep trying
# Body of the setup step (its def line is not visible here): copy the testbed
# attributes into plain instance fields, then run the base-class setup.
76 self._home_directory = self._attributes.\
77 get_attribute_value("homeDirectory")
78 self.slicename = self._attributes.\
79 get_attribute_value("slice")
80 self.authUser = self._attributes.\
81 get_attribute_value("authUser")
82 self.authString = self._attributes.\
83 get_attribute_value("authPass")
84 self.sliceSSHKey = self._attributes.\
85 get_attribute_value("sliceSSHKey")
# The key passphrase is not a testbed attribute. It is asked for later, only
# when a temporary deployment key has to be made.
86 self.sliceSSHKeyPass = None
87 self.plcHost = self._attributes.\
88 get_attribute_value("plcHost")
89 self.plcUrl = self._attributes.\
90 get_attribute_value("plcUrl")
# Delegate to the generic controller once every field above is set.
91 super(TestbedController, self).do_setup()
def do_post_asynclaunch(self, guid):
    """Block until the element ``guid`` finishes its asynchronous setup.

    Only ``Dependency`` elements are launched asynchronously, so any other
    kind of element is left untouched.
    """
    element = self._elements[guid]
    if not isinstance(element, self._app.Dependency):
        return
    element.async_setup_wait()

# Both configuration post-steps reuse the same wait hook. Wrapping it in
# staticmethod keeps it unbound, so callers pass the controller explicitly
# as the first argument.
do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
do_poststep_configure = staticmethod(do_post_asynclaunch)
# Pre-configuration pipeline, in order:
#   1. pick nodes;
#   2. add them to the slice;
#   3. wait for them;
#   4. plan application deployment;
#   5. run the generic preconfigure.
104 def do_preconfigure(self):
106 # Perform resource discovery if we don't have
107 # specific resources assigned yet
108 self.do_resource_discovery()
110 # Create PlanetLab slivers
111 self.do_provisioning()
114 # Wait for provisioning
# NOTE(review): the wait/retry statements of this step are not visible in
# this listing; only their `except ...UnresponsiveNodeError:` clause remains.
119 except self._node.UnresponsiveNodeError:
123 # Plan application deployment
124 self.do_spanning_deployment_plan()
126 # Configure elements per XML data
127 super(TestbedController, self).do_preconfigure()
# Assign a node id to every Node element that still lacks one. Ids that must
# be newly added to the slice are recorded in self._to_provision
# (do_provisioning consumes that set).
129 def do_resource_discovery(self):
130 to_provision = self._to_provision = set()
# Ids that must not be chosen: blacklisted nodes plus ids already assigned.
132 reserved = set(self._blacklist)
133 for guid, node in self._elements.iteritems():
134 if isinstance(node, self._node.Node) and node._node_id is not None:
135 reserved.add(node._node_id)
138 # look for perfectly defined nodes
139 # (ie: those with only one candidate)
140 for guid, node in self._elements.iteritems():
141 if isinstance(node, self._node.Node) and node._node_id is None:
142 # Try existing nodes first
143 # If we have only one candidate, simply use it
144 candidates = node.find_candidates(
145 filter_slice_id = self.slice_id)
146 candidates -= reserved
147 if len(candidates) == 1:
148 node_id = iter(candidates).next()
149 node.assign_node_id(node_id)
150 reserved.add(node_id)
152 # Try again including unassigned nodes
# NOTE(review): unlike the first pass, these candidates are not reduced by
# `reserved` in the visible lines. A blacklisted or already-assigned id could
# therefore be picked -- confirm.
153 candidates = node.find_candidates()
154 if len(candidates) > 1:
156 if len(candidates) == 1:
157 node_id = iter(candidates).next()
158 node.assign_node_id(node_id)
159 to_provision.add(node_id)
160 reserved.add(node_id)
# NOTE(review): 'sith' in this error message looks like a typo for 'with'.
162 raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
163 node.make_filter_description())
165 # Now do the backtracking search for a suitable solution
166 # First with existing slice nodes
169 for guid, node in self._elements.iteritems():
170 if isinstance(node, self._node.Node) and node._node_id is None:
171 # Try existing nodes first
172 # If we have only one candidate, simply use it
173 candidates = node.find_candidates(
174 filter_slice_id = self.slice_id)
175 candidates -= reserved
176 reqs.append(candidates)
# NOTE(review): the initialisation of `reqs`/`nodes` and the `try:` around
# this first allocation attempt are not visible in this listing.
181 solution = resourcealloc.alloc(reqs)
182 except resourcealloc.ResourceAllocationError:
183 # Failed, try again with all nodes
186 candidates = node.find_candidates()
187 reqs.append(candidates)
189 solution = resourcealloc.alloc(reqs)
190 to_provision.update(solution)
# Apply the allocation: `solution` is paired positionally with `nodes`.
193 for node, node_id in zip(nodes, solution):
194 node.assign_node_id(node_id)
# Add the node ids chosen during resource discovery (self._to_provision) to
# the slice through the PLC API, then discard the pending set.
196 def do_provisioning(self):
197 if self._to_provision:
198 # Add new nodes to the slice
# NOTE(review): GetSlices(...)[0] raises IndexError if the slice is not found.
199 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
# Union with the nodes already in the slice, so existing slivers are kept.
200 new_nodes = list(set(cur_nodes) | self._to_provision)
201 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
204 del self._to_provision
# Inject per-node configuration and report where each node lives, then wait
# for every Node element to finish provisioning. On UnresponsiveNodeError,
# nodes that are not alive are added to self._blacklist. The `try:` and the
# rest of that handler are not visible in this listing.
206 def do_wait_nodes(self):
207 for guid, node in self._elements.iteritems():
208 if isinstance(node, self._node.Node):
209 # Just inject configuration stuff
210 node.home_path = "nepi-node-%s" % (guid,)
211 node.ident_path = self.sliceSSHKey
212 node.slicename = self.slicename
215 print "PlanetLab Node", guid, "configured at", node.hostname
218 for guid, node in self._elements.iteritems():
219 if isinstance(node, self._node.Node):
# Trailing comma: the Python 2 print statement omits the final newline here.
220 print "Waiting for Node", guid, "configured at", node.hostname,
223 node.wait_provisioning()
226 except self._node.UnresponsiveNodeError:
230 # Mark all dead nodes (which are unresponsive) on the blacklist
232 for guid, node in self._elements.iteritems():
233 if isinstance(node, self._node.Node):
234 if not node.is_alive():
235 print "Blacklisting", node.hostname, "for unresponsiveness"
236 self._blacklist.add(node._node_id)
# Group Dependency elements by a hash of their build definition and target
# platform. For each group:
#   - pick a root;
#   - compute a deployment plan weighted by numeric IPv4 distance (the call
#     that builds `plan` is only partly visible, presumably graphtools.mst);
#   - give each slave a master and install a temporary key pair.
240 def do_spanning_deployment_plan(self):
241 # Create application groups by collecting all applications
242 # based on their hash - the hash should contain everything that
243 # defines them and the platform they're built on
247 frozenset((app.depends or "").split(' ')),
248 frozenset((app.sources or "").split(' ')),
251 app.node.architecture,
252 app.node.operatingSystem,
256 depgroups = collections.defaultdict(list)
258 for element in self._elements.itervalues():
259 if isinstance(element, self._app.Dependency):
260 depgroups[dephash(element)].append(element)
262 # Set up spanning deployment for those applications that
263 # have been deployed in several nodes.
264 for dh, group in depgroups.iteritems():
266 # Pick root (deterministically)
267 root = min(group, key=lambda app:app.node.hostname)
269 # Obtain all IPs in numeric format
270 # (which means faster distance computations)
# inet_aton + '!L' yield a 32-bit integer, so only IPv4 addresses work here.
272 dep._ip = socket.gethostbyname(dep.node.hostname)
273 dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
276 # NOTE: the plan is an iterator
279 lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
283 # Re-sign private key
285 tempprk, temppuk, tmppass = self._make_temp_private_key()
# Each (slave, master) pair: the slave deploys from its master, using the
# temporary key pair and passphrase created above.
291 for slave, master in plan:
292 slave.set_master(master)
293 slave.install_keys(tempprk, temppuk, tmppass)
295 # We don't need the user's passphrase anymore
296 self.sliceSSHKeyPass = None
# Build a temporary copy of the slice SSH key pair for chain deployment:
#   - copy both key files into temporary files under self.root_directory;
#   - re-cipher the private copy with a fresh random passphrase (via a
#     subprocess whose full argv is not visible, presumably ssh-keygen);
#   - tag the public key with a NEPI comment.
# Returns (temporary private key file, temporary public key file, passphrase).
298 def _make_temp_private_key(self):
299 # Get the user's key's passphrase
300 if not self.sliceSSHKeyPass:
301 if 'SSH_ASKPASS' in os.environ:
# NOTE(review): the /dev/null handle passed as stdin below is never closed
# in the visible code.
302 proc = subprocess.Popen(
303 [ os.environ['SSH_ASKPASS'],
304 "Please type the passphrase for the %s SSH identity file. "
305 "The passphrase will be used to re-cipher the identity file with "
306 "a random 256-bit key for automated chain deployment on the "
307 "%s PlanetLab slice" % (
308 os.path.basename(self.sliceSSHKey),
311 stdin = open("/dev/null"),
312 stdout = subprocess.PIPE,
313 stderr = subprocess.PIPE)
314 out,err = proc.communicate()
315 self.sliceSSHKeyPass = out.strip()
# NOTE(review): what happens when no passphrase was obtained is not visible
# in this listing.
317 if not self.sliceSSHKeyPass:
320 # Create temporary key files
321 prk = tempfile.NamedTemporaryFile(
322 dir = self.root_directory,
323 prefix = "pl_deploy_tmpk_",
326 puk = tempfile.NamedTemporaryFile(
327 dir = self.root_directory,
328 prefix = "pl_deploy_tmpk_",
331 # Create secure 256-bits temporary passphrase
# 32 bytes from random.SystemRandom, hex-encoded (Python 2 "hex" codec)
# into 64 characters.
332 passphrase = ''.join(map(chr,[rng.randint(0,255)
333 for rng in (random.SystemRandom(),)
334 for i in xrange(32)] )).encode("hex")
# Copy the user's key pair into the temporary files; copymode gives the
# copies the original files' permission bits.
# NOTE(review): closing of oprk/opuk is not visible in this listing --
# confirm they are closed.
337 oprk = open(self.sliceSSHKey, "rb")
338 opuk = open(self.sliceSSHKey+".pub", "rb")
339 shutil.copymode(oprk.name, prk.name)
340 shutil.copymode(opuk.name, puk.name)
341 shutil.copyfileobj(oprk, prk)
342 shutil.copyfileobj(opuk, puk)
348 # A descriptive comment
349 comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
# NOTE(review): passing the current passphrase via "-P" on the command line
# can expose it to other local users through process listings.
352 proc = subprocess.Popen(
355 "-P", self.sliceSSHKeyPass,
358 stdout = subprocess.PIPE,
359 stderr = subprocess.PIPE,
360 stdin = subprocess.PIPE
362 out, err = proc.communicate()
365 raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
371 # Change comment on public key
# Replace the last space-separated field of the first line with `comment`.
# Any seek/truncate around this read/write is not visible in this listing.
372 puklines = puk.readlines()
373 puklines[0] = puklines[0].split(' ')
374 puklines[0][-1] = comment+'\n'
375 puklines[0] = ' '.join(puklines[0])
378 puk.writelines(puklines)
382 return prk, puk, passphrase
# Record the attribute through the base controller, then mirror it onto the
# live element object and, if the element has a `refresh` hook, run it.
384 def set(self, guid, name, value, time = TIME_NOW):
385 super(TestbedController, self).set(guid, name, value, time)
386 # TODO: take into account schedule time for the task
387 element = self._elements[guid]
389 setattr(element, name, value)
391 if hasattr(element, 'refresh'):
392 # invoke attribute refresh hook
# NOTE(review): the body of the `if` above (the refresh call) is not visible
# in this listing.
# Return an attribute value. Design-only attributes are answered from the
# base controller. Others are read from the live element; the fallback lines
# of the error handler are not visible in this listing.
395 def get(self, guid, name, time = TIME_NOW):
396 value = super(TestbedController, self).get(guid, name, time)
397 # TODO: take into account schedule time for the task
398 factory_id = self._create[guid]
399 factory = self._factories[factory_id]
400 if factory.box_attributes.is_attribute_design_only(name):
# Missing elements give None here, so getattr below would raise
# AttributeError rather than KeyError.
402 element = self._elements.get(guid)
404 return getattr(element, name)
# NOTE(review): BUG. In Python 2, `except KeyError, AttributeError:` catches
# only KeyError and binds the exception to the name `AttributeError`.
# The AttributeError that getattr raises for a missing attribute (or a None
# element) is NOT caught. Use `except (KeyError, AttributeError):`.
405 except KeyError, AttributeError:
# Answer address attributes for index 0 of an existing element directly from
# the element. Anything else is deferred to the base controller.
408 def get_address(self, guid, index, attribute='Address'):
412 iface = self._elements.get(guid)
413 if iface and index == 0:
# NOTE(review): the body of the 'Address' branch is missing from this
# listing, leaving the `if` below without a statement.
414 if attribute == 'Address':
416 elif attribute == 'NetPrefix':
417 return iface.netprefix
418 elif attribute == 'Broadcast':
419 return iface.broadcast
421 # if all else fails, query box
422 return super(TestbedController, self).get_address(guid, index, attribute)
def action(self, time, guid, action):
    """Reject scheduled actions: this testbed does not implement any."""
    raise NotImplementedError()
# Teardown fragment (the enclosing def line and several body lines are not
# visible here). Steps, in order:
#   1. walk the followed traces;
#   2. check each element for a `cleanup` hook, then for a `destroy` hook
#      (the hook calls themselves are not visible);
#   3. forget all elements.
428 for trace in self._traces.itervalues():
430 for element in self._elements.itervalues():
431 # invoke cleanup hooks
432 if hasattr(element, 'cleanup'):
434 for element in self._elements.itervalues():
435 # invoke destroy hooks
436 if hasattr(element, 'destroy'):
438 self._elements.clear()
# Query a trace of element `guid`:
#   - 'value' syncs the trace into the home directory;
#   - 'path' gives its remote path;
#   - 'size' leads to NotImplementedError.
# Several lines (reading the synced file, the final return) are not visible
# in this listing.
441 def trace(self, guid, trace_id, attribute='value'):
442 app = self._elements[guid]
444 if attribute == 'value':
# NOTE(review): relies on home_directory being a property; its decorator is
# not visible in this listing.
445 path = app.sync_trace(self.home_directory, trace_id)
452 elif attribute == 'path':
453 content = app.remote_trace_path(trace_id)
454 elif attribute == 'size':
456 raise NotImplementedError
def follow_trace(self, trace_id, trace):
    """Remember *trace* under *trace_id* in the controller's trace table.

    A later registration with the same id replaces the earlier one.
    """
    traces = self._traces
    traces[trace_id] = trace
# Instantiate element class `kind` with the PLC API client, then copy every
# entry of `parameters` onto the new object as an attribute.
# NOTE(review): returning the new instance is not visible in this listing;
# _make_node and the other factories use this method's result.
464 def _make_generic(self, parameters, kind):
465 app = kind(self.plapi)
467 # Note: there is 1-to-1 correspondence between parameter names and element attribute names
468 # If that changes, this has to change as well
469 for attr,val in parameters.iteritems():
470 setattr(app, attr, val)
# Build a Node element. When emulation is enabled, it also requires the
# 'ipfw-be' vsys interface and the 'ipfwslice' package.
# NOTE(review): the emulation check guarding the two additions below, and
# the final return, are not visible in this listing.
474 def _make_node(self, parameters):
475 node = self._make_generic(parameters, self._node.Node)
477 # If emulation is enabled, we automatically need
478 # some vsys interfaces and packages
480 node.required_vsys.add('ipfw-be')
481 node.required_packages.add('ipfwslice')
def _make_node_iface(self, parameters):
    """Build a NodeIface element from *parameters* via _make_generic."""
    factory = self._interfaces.NodeIface
    return self._make_generic(parameters, factory)

def _make_tun_iface(self, parameters):
    """Build a TunIface element from *parameters* via _make_generic."""
    factory = self._interfaces.TunIface
    return self._make_generic(parameters, factory)

def _make_tap_iface(self, parameters):
    """Build a TapIface element from *parameters* via _make_generic."""
    factory = self._interfaces.TapIface
    return self._make_generic(parameters, factory)

def _make_netpipe(self, parameters):
    """Build a NetPipe element from *parameters* via _make_generic."""
    factory = self._interfaces.NetPipe
    return self._make_generic(parameters, factory)

def _make_internet(self, parameters):
    """Build an Internet element from *parameters* via _make_generic."""
    factory = self._interfaces.Internet
    return self._make_generic(parameters, factory)
def _make_application(self, parameters):
    """Build an Application element from *parameters* via _make_generic."""
    factory = self._app.Application
    return self._make_generic(parameters, factory)

def _make_dependency(self, parameters):
    """Build a Dependency element from *parameters* via _make_generic."""
    factory = self._app.Dependency
    return self._make_generic(parameters, factory)

def _make_nepi_dependency(self, parameters):
    """Build a NepiDependency element from *parameters* via _make_generic."""
    factory = self._app.NepiDependency
    return self._make_generic(parameters, factory)

def _make_ns3_dependency(self, parameters):
    """Build an NS3Dependency element from *parameters* via _make_generic."""
    factory = self._app.NS3Dependency
    return self._make_generic(parameters, factory)