2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.core.metadata import Parallel
7 from nepi.util.constants import TIME_NOW
8 from nepi.util.graphtools import mst
9 from nepi.util import ipaddr2
10 from nepi.util import environ
11 from nepi.util.parallel import ParallelRun
class TempKeyError(Exception):
    # Marker exception for temporary-key handling (presumably raised by
    # the _make_temp_private_key machinery — confirm against full file).
class TestbedController(testbed_impl.TestbedController):
    # PlanetLab testbed controller: discovers and provisions slice nodes,
    # deploys applications, and recovers experiment state.
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        # Deferred imports of the sibling testbed modules (node, interfaces,
        # application); kept as attributes for later isinstance checks.
        import node, interfaces, application
        self._interfaces = interfaces
        self._app = application
        self._blacklist = set()           # node_ids of known-bad hosts
        self._just_provisioned = set()    # node_ids added in this session
        self._load_blacklist()
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
    def home_directory(self):
        # Read-only accessor for the configured home directory
        # (populated from the "homeDirectory" attribute in do_setup).
        return self._home_directory
        # Lazily construct and cache the PLCAPI client on first access.
        if not hasattr(self, '_plapi'):
            # Authenticated access when credentials were configured.
            self._plapi = plcapi.PLCAPI(
                username = self.authUser,
                password = self.authString,
                hostname = self.plcHost,
                urlpattern = self.plcUrl
            # anonymous access - may not be enough for much
            self._plapi = plcapi.PLCAPI()
        # Resolve and cache the numeric slice_id for self.slicename
        # via a PLCAPI lookup.
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            self._slice_id = slices[0]['slice_id']
        # If it wasn't found, don't remember this failure, keep trying
    def _load_blacklist(self):
        # Load the persisted node blacklist from the path given by
        # environ.homepath('plblacklist'), one entry per line.
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "r")
        # Default to an empty blacklist when the file is unusable.
        self._blacklist = set()
        # Strip whitespace/newlines from each stored entry.
        self._blacklist = set(
            map(str.strip, bl.readlines())
    def _save_blacklist(self):
        # Persist the in-memory blacklist to environ.homepath('plblacklist'),
        # one entry per line.
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
            map('%s\n'.__mod__, self._blacklist))
        # Pull testbed-level configuration out of the attribute store
        # into plain instance attributes for cheap later access.
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        # Key passphrase is requested lazily (see _make_temp_private_key).
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        # Apply the configured verbosity to this testbed's logger.
        self._logger.setLevel(getattr(logging,self.logLevel))

        super(TestbedController, self).do_setup()
    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so block here until this element's setup has completed.
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)
    def do_preconfigure(self):
        # Discovery -> provisioning -> wait -> (optional) p2p deployment
        # plan, then the generic per-XML configuration.

        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()

        # Create PlanetLab slivers
        self.do_provisioning()

        # Wait for provisioning
        except self._node.UnresponsiveNodeError:
        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
    def do_resource_discovery(self, recover = False):
        # Assign a concrete PlanetLab host to every Node element that
        # lacks one, honouring the blacklist and already-pinned nodes;
        # hosts not yet in the slice are queued in self._to_provision.
        to_provision = self._to_provision = set()

        # Node ids we must not hand out: blacklisted hosts plus nodes
        # already pinned by some element.
        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # look for perfectly defined nodes
        # (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                # Try again including unassigned nodes
                candidates = node.find_candidates()
                candidates -= reserved
                if len(candidates) > 1:
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    # Not in the slice yet -> must be provisioned.
                    to_provision.add(node_id)
                    reserved.add(node_id)
                # NOTE(review): "sith" in the message below is a typo for
                # "with" (runtime string, left untouched here).
                raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                    node.make_filter_description())

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)

        # In recovery mode every node must already be resolvable.
        raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

        solution = resourcealloc.alloc(reqs)
        except resourcealloc.ResourceAllocationError:
            # Failed, try again with all nodes
            candidates = node.find_candidates()
            candidates -= reserved
            reqs.append(candidates)

            solution = resourcealloc.alloc(reqs)
        to_provision.update(solution)

        # Commit the allocator's choice to each still-unassigned node.
        for node, node_id in zip(nodes, solution):
            node.assign_node_id(node_id)
    def do_provisioning(self):
        # Add any newly discovered hosts to the PlanetLab slice via
        # PLCAPI, then remember them so do_wait_nodes can grant them a
        # longer startup timeout.
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # Discovery is done; hand the set over to the wait phase.
        self._just_provisioned = self._to_provision
        del self._to_provision
    def do_wait_nodes(self):
        # Inject per-node configuration, then wait for every Node to come
        # up; unresponsive nodes are blacklisted and the blacklist saved.
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                # Freshly provisioned nodes get 20 minutes to come up;
                # nodes already in the slice only get one.
                node.wait_provisioning(
                    (20*60 if node._node_id in self._just_provisioned else 60)

                self._logger.info("READY Node %s at %s", guid, node.hostname)

                # Prepare dependency installer now
                node.prepare_dependencies()
            except self._node.UnresponsiveNodeError:
                self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

                # Mark all dead nodes (which are unresponsive) on the blacklist
                for guid, node in self._elements.iteritems():
                    if isinstance(node, self._node.Node):
                        if not node.is_alive():
                            self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                            self._blacklist.add(node._node_id)

                self._save_blacklist()
                traceback.print_exc()
    def do_spanning_deployment_plan(self):
        # Group identical dependencies across nodes and deploy each group
        # along a spanning tree so nodes fetch from nearby peers instead
        # of all hitting the controller.

        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
            frozenset((app.depends or "").split(' ')),
            frozenset((app.sources or "").split(' ')),
            app.node.architecture,
            app.node.operatingSystem,

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                # Nodes contribute their yum dependency bundle as well.
                deps = element._yum_dependencies
                depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            # Pick root (deterministically)
            root = min(group, key=lambda app:app.node.hostname)

            # Obtain all IPs in numeric format
            # (which means faster distance computations)
            dep._ip = socket.gethostbyname(dep.node.hostname)
            dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

            # Distance metric for the tree: numeric IP distance
            # (presumably fed to graphtools.mst — confirm in full file).
            # NOTE: the plan is an iterator
            lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),

            # Re-sign private key
            tempprk, temppuk, tmppass = self._make_temp_private_key()

            # Walk the plan: each slave deploys from its assigned master.
            for slave, master in plan:
                slave.set_master(master)
                slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    def _make_temp_private_key(self):
        # Create temporary copies of the user's SSH identity re-ciphered
        # with a fresh random passphrase, for automated chain deployment.
        # Returns (private_key_file, public_key_file, passphrase).

        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                # Ask interactively through the user's askpass helper.
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",

        # Create secure 256-bits temporary passphrase
        # (32 random bytes from SystemRandom, hex-encoded).
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")

        # Copy the user's key pair (permissions included) into the
        # temporary files.
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the copy with the new passphrase ("-P" passes the
        # old one); full command line not visible here — confirm.
        proc = subprocess.Popen(
            "-P", self.sliceSSHKeyPass,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        out, err = proc.communicate()
        raise RuntimeError, "Problem generating keys: \n%s\n%r" % (

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.writelines(puklines)

        return prk, puk, passphrase
    def set(self, guid, name, value, time = TIME_NOW):
        # Record the attribute in the base class, then mirror it onto
        # the live element object.
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take on account schedule time for the task
        element = self._elements[guid]
        setattr(element, name, value)

        if hasattr(element, 'refresh'):
            # invoke attribute refresh hook
    def get(self, guid, name, time = TIME_NOW):
        # Prefer the live element's current attribute value; fall back
        # to the stored value when the element cannot provide it.
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take on account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        return getattr(element, name)
        except (KeyError, AttributeError):
    def get_address(self, guid, index, attribute='Address'):
        # Answer address queries locally for interface elements
        # (index 0 only); anything else defers to the base class.
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)
491 def action(self, time, guid, action):
492 raise NotImplementedError
        # Stop every followed trace, then run optional cleanup/destroy
        # hooks over all elements in factory shutdown order.
        for trace in self._traces.itervalues():

        def invokeif(action, testbed, guid):
            # Call element.<action>() only when the element defines it.
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        # Drop all element references once hooks have run.
        self._elements.clear()
    def trace(self, guid, trace_id, attribute='value'):
        # Fetch trace data for element guid: synced content for 'value',
        # the remote location for 'path'.
        app = self._elements[guid]

        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
    def follow_trace(self, trace_id, trace):
        # Register the trace object so shutdown can iterate and stop it.
        self._traces[trace_id] = trace
        # Rebuild controller state from a previous run: re-create the
        # object hierarchy, re-resolve nodes, and re-run only the
        # configuration steps that have no remote side effects.

        # Create and connect do not perform any real tasks against
        # the nodes, it only sets up the object hierarchy,
        # so we can run them normally
        self.do_connect_init()
        self.do_connect_compl()

        # Manually recover nodes, to mark dependencies installed
        # and clean up mutable attributes
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),

        # Assign nodes - since we're working off execute XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveliness
        self.do_resource_discovery(recover = True)

        # Pre/post configure, however, tends to set up tunnels
        # Execute configuration steps only for those object
        # kinds that do not have side effects

        # Do the ones without side effects,
        # including nodes that need to set up home
        # folders and all that
        self._do_in_factory_order(
            "preconfigure_function",
                Parallel(metadata.NODE),

        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this homepath ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)

        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
                Parallel(metadata.NEPIDEPENDENCY),
                Parallel(metadata.NS3DEPENDENCY),
                Parallel(metadata.DEPENDENCY),
                Parallel(metadata.APPLICATION),

        # Tunnels are not harmed by configuration after
        # recovery, and some attributes get set this way
        # like external_iface
        self._do_in_factory_order(
            "preconfigure_function",
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),

        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function",
                Parallel(metadata.NODE),
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),

        # There are no required prestart steps
        # to call upon recovery, so we're done
    def _make_generic(self, parameters, kind):
        # Instantiate *kind* with the PLC API handle and copy every
        # entry of *parameters* onto it as a same-named attribute.
        app = kind(self.plapi)

        # Note: there is 1-to-1 correspondence between attribute names
        # If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            setattr(app, attr, val)
    def _make_node(self, parameters):
        # Build a Node element; remote cleanup is only enabled when the
        # slice is dedicated to this experiment.
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
632 def _make_node_iface(self, parameters):
633 return self._make_generic(parameters, self._interfaces.NodeIface)
635 def _make_tun_iface(self, parameters):
636 return self._make_generic(parameters, self._interfaces.TunIface)
638 def _make_tap_iface(self, parameters):
639 return self._make_generic(parameters, self._interfaces.TapIface)
641 def _make_netpipe(self, parameters):
642 return self._make_generic(parameters, self._interfaces.NetPipe)
644 def _make_internet(self, parameters):
645 return self._make_generic(parameters, self._interfaces.Internet)
647 def _make_application(self, parameters):
648 return self._make_generic(parameters, self._app.Application)
650 def _make_dependency(self, parameters):
651 return self._make_generic(parameters, self._app.Dependency)
653 def _make_nepi_dependency(self, parameters):
654 return self._make_generic(parameters, self._app.NepiDependency)
656 def _make_ns3_dependency(self, parameters):
657 return self._make_generic(parameters, self._app.NS3Dependency)