From 684c8f0f3d2d3e6ff58897a2fca102c963d482c7 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Wed, 8 Jun 2011 13:58:23 +0200 Subject: [PATCH] Ticket #45: spanning tree deployment --- setup.py | 1 + src/nepi/testbeds/planetlab/application.py | 378 +++++++++++++++++--- src/nepi/testbeds/planetlab/execute.py | 178 ++++++++- src/nepi/testbeds/planetlab/metadata_v01.py | 4 +- src/nepi/testbeds/planetlab/tunproto.py | 17 +- src/nepi/util/graphtools/__init__.py | 1 + src/nepi/util/graphtools/mst.py | 148 ++++++++ src/nepi/util/ipaddr2.py | 18 + src/nepi/util/parallel.py | 91 +++++ test/testbeds/planetlab/integration.py | 94 ++++- 10 files changed, 870 insertions(+), 60 deletions(-) create mode 100644 src/nepi/util/graphtools/__init__.py create mode 100644 src/nepi/util/graphtools/mst.py create mode 100644 src/nepi/util/parallel.py diff --git a/setup.py b/setup.py index e6690a09..80b9299b 100755 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ setup( "nepi.core", "nepi.util.parser", "nepi.util.settools", + "nepi.util.graphtools", "nepi.util" ], package_dir = {"": "src"}, package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"], diff --git a/src/nepi/testbeds/planetlab/application.py b/src/nepi/testbeds/planetlab/application.py index 72f83a84..50167aca 100644 --- a/src/nepi/testbeds/planetlab/application.py +++ b/src/nepi/testbeds/planetlab/application.py @@ -6,10 +6,15 @@ import plcapi import operator import os import os.path +import sys import nepi.util.server as server import cStringIO import subprocess import rspawn +import random +import time +import socket +import threading from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \ STATUS_FINISHED @@ -64,12 +69,23 @@ class Dependency(object): self._setuper = None self._pid = None self._ppid = None + + # Spanning tree deployment + self._master = None + self._master_passphrase = None + self._master_prk = None + self._master_puk = None + self._master_token = 
''.join(map(chr,[rng.randint(0,255) + for rng in (random.SystemRandom(),) + for i in xrange(8)] )).encode("hex") + self._build_pid = None + self._build_ppid = None + def __str__(self): return "%s<%s>" % ( self.__class__.__name__, - ' '.join(list(self.depends or []) - + list(self.sources or [])) + ' '.join(filter(bool,(self.depends, self.sources))) ) def validate(self): @@ -127,13 +143,20 @@ class Dependency(object): def setup(self): self._make_home() - self._build() + self._launch_build() + self._finish_build() self._setup = True def async_setup(self): if not self._setuper: + def setuper(): + try: + self.setup() + except: + self._setuper._exc.append(sys.exc_info()) self._setuper = threading.Thread( - target = self.setup) + target = setuper) + self._setuper._exc = [] self._setuper.start() def async_setup_wait(self): @@ -141,7 +164,11 @@ class Dependency(object): if self._setuper: self._setuper.join() if not self._setup: - raise RuntimeError, "Failed to setup application" + if self._setuper._exc: + exctyp,exval,exctrace = self._setuper._exc[0] + raise exctyp,exval,exctrace + else: + raise RuntimeError, "Failed to setup application" else: self.setup() @@ -149,7 +176,7 @@ class Dependency(object): # Make sure all the paths are created where # they have to be created for deployment (out,err),proc = server.popen_ssh_command( - "mkdir -p %s" % (server.shell_escape(self.home_path),), + "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" % { 'home' : server.shell_escape(self.home_path) }, host = self.node.hostname, port = None, user = self.node.slicename, @@ -159,8 +186,7 @@ class Dependency(object): ) if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) - + raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,) if self.stdin: # Write program input @@ -175,7 +201,7 @@ class Dependency(object): ) if proc.wait(): - raise RuntimeError, "Failed to set up 
application: %s %s" % (out,err,) + raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,) def _replace_paths(self, command): """ @@ -187,28 +213,131 @@ class Dependency(object): .replace("${SOURCES}", root+server.shell_escape(self.home_path)) .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) ) - def _build(self): - if self.sources: - sources = self.sources.split(' ') + def _launch_build(self): + if self._master is not None: + self._do_install_keys() + buildscript = self._do_build_slave() + else: + buildscript = self._do_build_master() - # Copy all sources + if buildscript is not None: + # upload build script (out,err),proc = server.popen_scp( - sources, - "%s@%s:%s" % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path,'.'),), + buildscript, + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path, 'nepi-build.sh') ), + port = None, + agent = None, ident_key = self.node.ident_path, server_key = self.node.server_key ) - + if proc.wait(): - raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,) + raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,) - if self.buildDepends: - # Install build dependencies - (out,err),proc = server.popen_ssh_command( - "sudo -S yum -y install %(packages)s" % { - 'packages' : self.buildDepends - }, + # launch build + self._do_launch_build() + + def _finish_build(self): + self._do_wait_build() + self._do_install() + + def _do_build_slave(self): + if not self.sources and not self.build: + return None + + # Create build script + files = set() + + if self.sources: + sources = self.sources.split(' ') + files.update( + "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostname, + os.path.join(self._master.home_path, os.path.basename(source)),) + for source in sources + ) + + if self.build: + files.add( + "%s@%s:%s" % (self._master.node.slicename, 
self._master.node.hostname, + os.path.join(self._master.home_path, 'build.tar.gz'),) + ) + + launch_agent = "{ ( echo -e '#!/bin/sh\\ncat' > .ssh-askpass ) && chmod u+x .ssh-askpass"\ + " && export SSH_ASKPASS=$(pwd)/.ssh-askpass "\ + " && ssh-agent > .ssh-agent.sh ; } && . ./.ssh-agent.sh && ( echo $NEPI_MASTER_PASSPHRASE | ssh-add %(prk)s ) && rm -rf %(prk)s %(puk)s" % \ + { + 'prk' : server.shell_escape(self._master_prk_name), + 'puk' : server.shell_escape(self._master_puk_name), + } + + kill_agent = "kill $SSH_AGENT_PID" + + waitmaster = "{ . ./.ssh-agent.sh ; while [[ $(ssh -q -o UserKnownHostsFile=%(hostkey)s %(master)s cat %(token_path)s) != %(token)s ]] ; do sleep 5 ; done ; }" % { + 'hostkey' : 'master_known_hosts', + 'master' : "%s@%s" % (self._master.node.slicename, self._master.node.hostname), + 'token_path' : os.path.join(self._master.home_path, 'build.token'), + 'token' : server.shell_escape(self._master._master_token), + } + + syncfiles = "scp -p -o UserKnownHostsFile=%(hostkey)s %(files)s ." % { + 'hostkey' : 'master_known_hosts', + 'files' : ' '.join(files), + } + if self.build: + syncfiles += " && tar xzf build.tar.gz" + syncfiles += " && ( echo %s > build.token )" % (server.shell_escape(self._master_token),) + syncfiles = "{ . ./.ssh-agent.sh ; %s ; }" % (syncfiles,) + + cleanup = "{ . 
./.ssh-agent.sh ; kill $SSH_AGENT_PID ; rm -rf %(prk)s %(puk)s master_known_hosts .ssh-askpass ; }" % { + 'prk' : server.shell_escape(self._master_prk_name), + 'puk' : server.shell_escape(self._master_puk_name), + } + + slavescript = "( ( %(launch_agent)s && %(waitmaster)s && %(syncfiles)s && %(kill_agent)s && %(cleanup)s ) || %(cleanup)s )" % { + 'waitmaster' : waitmaster, + 'syncfiles' : syncfiles, + 'cleanup' : cleanup, + 'kill_agent' : kill_agent, + 'launch_agent' : launch_agent, + 'home' : server.shell_escape(self.home_path), + } + + return cStringIO.StringIO(slavescript) + + def _do_launch_build(self): + script = "bash ./nepi-build.sh" + if self._master_passphrase: + script = "NEPI_MASTER_PASSPHRASE=%s %s" % ( + server.shell_escape(self._master_passphrase), + script + ) + (out,err),proc = rspawn.remote_spawn( + script, + + pidfile = 'build-pid', + home = self.home_path, + stdin = '/dev/null', + stdout = 'buildlog', + stderr = rspawn.STDOUT, + + host = self.node.hostname, + port = None, + user = self.node.slicename, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key + ) + + if proc.wait(): + raise RuntimeError, "Failed to set up build slave %s: %s %s" % (self.home_path, out,err,) + + + pid = ppid = None + delay = 1.0 + for i in xrange(5): + pidtuple = rspawn.remote_check_pid( + os.path.join(self.home_path,'build-pid'), host = self.node.hostname, port = None, user = self.node.slicename, @@ -216,17 +345,46 @@ class Dependency(object): ident_key = self.node.ident_path, server_key = self.node.server_key ) - - if proc.wait(): - raise RuntimeError, "Failed instal build dependencies: %s %s" % (out,err,) - - if self.build: - # Build sources + if pidtuple: + pid, ppid = pidtuple + self._build_pid, self._build_ppid = pidtuple + break + else: + time.sleep(delay) + delay = min(30,delay*1.2) + else: + raise RuntimeError, "Failed to set up build slave %s: cannot get pid" % (self.home_path,) + + def _do_wait_build(self): + pid = 
self._build_pid + ppid = self._build_ppid + + if pid and ppid: + delay = 1.0 + while True: + status = rspawn.remote_status( + pid, ppid, + host = self.node.hostname, + port = None, + user = self.node.slicename, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key + ) + + if status is not rspawn.RUNNING: + self._build_pid = self._build_ppid = None + break + else: + time.sleep(delay*(0.5+random.random())) + delay = min(30,delay*1.2) + + # check build token + (out,err),proc = server.popen_ssh_command( - "cd %(home)s && mkdir -p build && cd build && ( %(command)s ) > ${HOME}/%(home)s/buildlog 2>&1 || ( tail ${HOME}/%(home)s/buildlog >&2 && false )" % { - 'command' : self._replace_paths(self.build), - 'home' : server.shell_escape(self.home_path), + "cat %(token_path)s" % { + 'token_path' : os.path.join(self.home_path, 'build.token'), }, host = self.node.hostname, port = None, @@ -235,27 +393,101 @@ class Dependency(object): ident_key = self.node.ident_path, server_key = self.node.server_key ) - - if proc.wait(): - raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,) + + slave_token = "" + if not proc.wait() and out: + slave_token = out.strip() + + if slave_token != self._master_token: + # Get buildlog for the error message - # Make archive - (out,err),proc = server.popen_ssh_command( - "cd %(home)s && tar czf build.tar.gz build" % { - 'command' : self._replace_paths(self.build), - 'home' : server.shell_escape(self.home_path), - }, + (buildlog,err),proc = server.popen_ssh_command( + "cat %(buildlog)s" % { + 'buildlog' : os.path.join(self.home_path, 'buildlog'), + 'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'), + }, + host = self.node.hostname, + port = None, + user = self.node.slicename, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key + ) + + proc.wait() + + raise RuntimeError, "Failed to set up application %s: "\ + "build failed, got wrong token from pid %s/%s "\ + 
"(expected %r, got %r), see buildlog: %s" % ( + self.home_path, pid, ppid, self._master_token, slave_token, buildlog) + + def _do_kill_build(self): + pid = self._build_pid + ppid = self._build_ppid + + if pid and ppid: + rspawn.remote_kill( + pid, ppid, host = self.node.hostname, port = None, user = self.node.slicename, agent = None, + ident_key = self.node.ident_path + ) + + + def _do_build_master(self): + if not self.sources and not self.build and not self.buildDepends: + return None + + if self.sources: + sources = self.sources.split(' ') + + # Copy all sources + (out,err),proc = server.popen_scp( + sources, + "%s@%s:%s" % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path,'.'),), ident_key = self.node.ident_path, server_key = self.node.server_key ) if proc.wait(): - raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,) + raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,) + + buildscript = cStringIO.StringIO() + + if self.buildDepends: + # Install build dependencies + buildscript.write( + "sudo -S yum -y install %(packages)s\n" % { + 'packages' : self.buildDepends + } + ) + + + if self.build: + # Build sources + buildscript.write( + "mkdir -p build && ( cd build && ( %(command)s ) )\n" % { + 'command' : self._replace_paths(self.build), + 'home' : server.shell_escape(self.home_path), + } + ) + + # Make archive + buildscript.write( + "tar czf build.tar.gz build && ( echo %(master_token)s > build.token )\n" % { + 'master_token' : server.shell_escape(self._master_token) + } + ) + + buildscript.seek(0) + return buildscript + + + def _do_install(self): if self.install: # Install application (out,err),proc = server.popen_ssh_command( @@ -274,6 +506,57 @@ class Dependency(object): if proc.wait(): raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,) + def set_master(self, master): + self._master = master + + def install_keys(self, prk, puk, passphrase): + # Install keys + 
self._master_passphrase = passphrase + self._master_prk = prk + self._master_puk = puk + self._master_prk_name = os.path.basename(prk.name) + self._master_puk_name = os.path.basename(puk.name) + + def _do_install_keys(self): + prk = self._master_prk + puk = self._master_puk + + (out,err),proc = server.popen_scp( + [ prk.name, puk.name ], + '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path ), + port = None, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key + ) + + if proc.wait(): + raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,) + + (out,err),proc = server.popen_scp( + cStringIO.StringIO('%s,%s %s\n' % ( + self._master.node.hostname, socket.gethostbyname(self._master.node.hostname), + self._master.node.server_key)), + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path,"master_known_hosts") ), + port = None, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key + ) + + if proc.wait(): + raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,) + + # No longer need'em + self._master_prk = None + self._master_puk = None + + def cleanup(self): + # make sure there's no leftover build processes + self._do_kill_build() + + class Application(Dependency): """ An application also has dependencies, but also a command to be ran and monitored. @@ -397,7 +680,8 @@ class Application(Dependency): port = None, user = self.node.slicename, agent = None, - ident_key = self.node.ident_path + ident_key = self.node.ident_path, + server_key = self.node.server_key ) if status is rspawn.NOT_STARTED: @@ -448,7 +732,7 @@ class NepiDependency(Dependency): tarname = os.path.basename(self.tarball.name) # it's already built - just move the tarball into place - self.build = "mv ${SOURCES}/%s ." % (tarname,) + self.build = "mv -f ${SOURCES}/%s ." 
% (tarname,) # unpack it into sources, and we're done self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,) @@ -492,12 +776,12 @@ class NS3Dependency(Dependency): def __init__(self, api = None): super(NS3Dependency, self).__init__(api) - self.buildDepends = 'build-essential waf gcc gcc-c++ gccxml unzip' + self.buildDepends = 'make waf gcc gcc-c++ gccxml unzip' # We have to download the sources, untar, build... pybindgen_source_url = "http://pybindgen.googlecode.com/files/pybindgen-0.15.0.zip" pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip" - ns3_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/ns-3-dev/archive/tip.tar.gz" + ns3_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/nepi-ns-3.9/archive/tip.tar.gz" passfd_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/tip.tar.gz" self.build =( " ( " diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index f6693bf9..aa17562b 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -4,9 +4,26 @@ from constants import TESTBED_ID from nepi.core import testbed_impl from nepi.util.constants import TIME_NOW +from nepi.util.graphtools import mst +from nepi.util import ipaddr2 import os +import os.path import time import resourcealloc +import collections +import operator +import functools +import socket +import struct +import tempfile +import subprocess +import random +import shutil + +from nepi.util.constants import TESTBED_STATUS_CONFIGURED + +class TempKeyError(Exception): + pass class TestbedController(testbed_impl.TestbedController): def __init__(self, testbed_version): @@ -63,12 +80,24 @@ class TestbedController(testbed_impl.TestbedController): get_attribute_value("authPass") self.sliceSSHKey = self._attributes.\ get_attribute_value("sliceSSHKey") + self.sliceSSHKeyPass = None self.plcHost = 
self._attributes.\ get_attribute_value("plcHost") self.plcUrl = self._attributes.\ get_attribute_value("plcUrl") super(TestbedController, self).do_setup() + def do_post_asynclaunch(self, guid): + # Dependencies were launched asynchronously, + # so wait for them + dep = self._elements[guid] + if isinstance(dep, self._app.Dependency): + dep.async_setup_wait() + + # Two-phase configuration for asynchronous launch + do_poststep_preconfigure = staticmethod(do_post_asynclaunch) + do_poststep_configure = staticmethod(do_post_asynclaunch) + def do_preconfigure(self): # Perform resource discovery if we don't have # specific resources assigned yet @@ -76,6 +105,9 @@ class TestbedController(testbed_impl.TestbedController): # Create PlanetLab slivers self.do_provisioning() + + # Plan application deployment + self.do_spanning_deployment_plan() # Configure elements per XML data super(TestbedController, self).do_preconfigure() @@ -146,7 +178,151 @@ class TestbedController(testbed_impl.TestbedController): # cleanup del self._to_provision - + + def do_spanning_deployment_plan(self): + # Create application groups by collecting all applications + # based on their hash - the hash should contain everything that + # defines them and the platform they're built + + def dephash(app): + return ( + frozenset((app.depends or "").split(' ')), + frozenset((app.sources or "").split(' ')), + app.build, + app.install, + app.node.architecture, + app.node.operatingSystem, + app.node.pl_distro, + ) + + depgroups = collections.defaultdict(list) + + for element in self._elements.itervalues(): + if isinstance(element, self._app.Dependency): + depgroups[dephash(element)].append(element) + + # Set up spanning deployment for those applications that + # have been deployed in several nodes. 
+ for dh, group in depgroups.iteritems(): + if len(group) > 1: + # Pick root (deterministically) + root = min(group, key=lambda app:app.node.hostname) + + # Obtain all IPs in numeric format + # (which means faster distance computations) + for dep in group: + dep._ip = socket.gethostbyname(dep.node.hostname) + dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0] + + # Compute plan + # NOTE: the plan is an iterator + plan = mst.mst( + group, + lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n), + root = root, + maxbranching = 2) + + # Re-sign private key + try: + tempprk, temppuk, tmppass = self._make_temp_private_key() + except TempKeyError: + continue + + # Set up slaves + plan = list(plan) + for slave, master in plan: + slave.set_master(master) + slave.install_keys(tempprk, temppuk, tmppass) + + # We don't need the user's passphrase anymore + self.sliceSSHKeyPass = None + + def _make_temp_private_key(self): + # Get the user's key's passphrase + if not self.sliceSSHKeyPass: + if 'SSH_ASKPASS' in os.environ: + proc = subprocess.Popen( + [ os.environ['SSH_ASKPASS'], + "Please type the passphrase for the %s SSH identity file. 
" + "The passphrase will be used to re-cipher the identity file with " + "a random 256-bit key for automated chain deployment on the " + "%s PlanetLab slice" % ( + os.path.basename(self.sliceSSHKey), + self.slicename + ) ], + stdin = open("/dev/null"), + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + out,err = proc.communicate() + self.sliceSSHKeyPass = out.strip() + + if not self.sliceSSHKeyPass: + raise TempKeyError + + # Create temporary key files + prk = tempfile.NamedTemporaryFile( + dir = self.root_directory, + prefix = "pl_deploy_tmpk_", + suffix = "") + + puk = tempfile.NamedTemporaryFile( + dir = self.root_directory, + prefix = "pl_deploy_tmpk_", + suffix = ".pub") + + # Create secure 256-bits temporary passphrase + passphrase = ''.join(map(chr,[rng.randint(0,255) + for rng in (random.SystemRandom(),) + for i in xrange(32)] )).encode("hex") + + # Copy keys + oprk = open(self.sliceSSHKey, "rb") + opuk = open(self.sliceSSHKey+".pub", "rb") + shutil.copymode(oprk.name, prk.name) + shutil.copymode(opuk.name, puk.name) + shutil.copyfileobj(oprk, prk) + shutil.copyfileobj(opuk, puk) + prk.flush() + puk.flush() + oprk.close() + opuk.close() + + # A descriptive comment + comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename) + + # Recipher keys + proc = subprocess.Popen( + ["ssh-keygen", "-p", + "-f", prk.name, + "-P", self.sliceSSHKeyPass, + "-N", passphrase, + "-C", comment ], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + stdin = subprocess.PIPE + ) + out, err = proc.communicate() + + if err: + raise RuntimeError, "Problem generating keys: \n%s\n%r" % ( + out, err) + + prk.seek(0) + puk.seek(0) + + # Change comment on public key + puklines = puk.readlines() + puklines[0] = puklines[0].split(' ') + puklines[0][-1] = comment+'\n' + puklines[0] = ' '.join(puklines[0]) + puk.seek(0) + puk.truncate() + puk.writelines(puklines) + del puklines + puk.flush() + + return prk, puk, passphrase + def set(self, guid, name, value, time = 
TIME_NOW): super(TestbedController, self).set(guid, name, value, time) # TODO: take on account schedule time for the task diff --git a/src/nepi/testbeds/planetlab/metadata_v01.py b/src/nepi/testbeds/planetlab/metadata_v01.py index 885c181b..85ae0993 100644 --- a/src/nepi/testbeds/planetlab/metadata_v01.py +++ b/src/nepi/testbeds/planetlab/metadata_v01.py @@ -409,7 +409,7 @@ def configure_application(testbed_instance, guid): app.node.wait_dependencies() # Install stuff - app.setup() + app.async_setup() def configure_dependency(testbed_instance, guid): dep = testbed_instance._elements[guid] @@ -421,7 +421,7 @@ def configure_dependency(testbed_instance, guid): dep.node.wait_dependencies() # Install stuff - dep.setup() + dep.async_setup() def configure_netpipe(testbed_instance, guid): netpipe = testbed_instance._elements[guid] diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index a836d44d..6e518f4a 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -222,6 +222,16 @@ class TunProtoBase(object): self._started = True def _launch_and_wait(self, *p, **kw): + try: + self.__launch_and_wait(*p, **kw) + except: + if self._launcher: + import sys + self._launcher._exc.append(sys.exc_info()) + else: + raise + + def __launch_and_wait(self, *p, **kw): local = self.local() self.launch(*p, **kw) @@ -287,13 +297,18 @@ class TunProtoBase(object): self._launcher = threading.Thread( target = self._launch_and_wait, args = (check_proto, listen, extra_args)) + self._launcher._exc = [] self._launcher.start() def async_launch_wait(self): if self._launcher: self._launcher.join() if not self._started: - raise RuntimeError, "Failed to launch TUN forwarder" + if self._launcher._exc: + exctyp,exval,exctrace = self._launcher._exc[0] + raise exctyp,exval,exctrace + else: + raise RuntimeError, "Failed to launch TUN forwarder" elif not self._started: self.launch() diff --git 
a/src/nepi/util/graphtools/__init__.py b/src/nepi/util/graphtools/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/src/nepi/util/graphtools/__init__.py @@ -0,0 +1 @@ + diff --git a/src/nepi/util/graphtools/mst.py b/src/nepi/util/graphtools/mst.py new file mode 100644 index 00000000..1c59b2fc --- /dev/null +++ b/src/nepi/util/graphtools/mst.py @@ -0,0 +1,148 @@ +import random +import bisect + +def mst(nodes, connected, + maxsoftbranching = None, + maxbranching = None, + root = None, + untie = lambda l : iter(l).next()): + """ + Returns an iterator over pairs (Node, Parent) + which form the spanning tree. + + Params: + + nodes: a list of nodes (can be anything) + + connected: a callable that takes two nodes + and returns either an edge weight (one + that can be compared with '<' with other + edge weights) or None if they're not + connected. + + maxbranching: the maximum number of branches + (children) allowed for a node. None for + no limit. + When maxbranching is used, the algorithm + implemented here gives no guarantee + of optimality (the spanning tree may not + be the minimum), as that problem becomes + NP-hard and we want a quick answer. + + maxsoftbranching: soft branching limit. + The algorithm is allowed to break it + if it has no other choice. Trees built with + soft branching limits are usually less + balanced than when using hard limits, + but the computation takes a lot less time. + + root: the desired root of the spanning tree, + or None to pick a random one. + + untie: a callable that, given an iterable + of candidate entries of equal weight for + the selection to be made, picks one to + be added to the spanning tree. The default + picks arbitrarily. + Entries are of the form (<weight>,<from>,<to>) + with <from> and <to> being indices in the + nodes array + """ + + if not nodes: + return + + if root is None: + root = random.sample(nodes, 1)[0] + + # We want the root's index + root = nodes.index(root) + + # Unpicked nodes, nodes we still have to add. 
+ unpicked = set(xrange(len(nodes))) + + # Distance maps + # We need: + # min distance to picked node + # which picked node + # Or None if it was a picked or unconnected node + + N = len(nodes) + distance = [None] * N + which = [None] * N + + # Count branches + branching = [0] * N + + # Initialize with distances to root + def update_distance_map(fornode): + ref = nodes[fornode] + for other, prevdistance in enumerate(distance): + other_node = nodes[other] + d = connected(ref, other_node) + if d is not None: + if prevdistance is None or prevdistance > d: + distance[other] = d + which[other] = fornode + distance[fornode] = None + which[fornode] = None + + update_distance_map(root) + unpicked.remove(root) + + # Add remaining nodes, yield edges + def minrange(dsorted): + return dsorted[:bisect.bisect(dsorted, (dsorted[0][0], N, N))] + + needsrebuild = False + while unpicked: + # Rebuild the distance map if needed + # (ie, when a node in the partial MST is no longer + # a candidate for adjoining because of saturation) + if needsrebuild: + print "Rebuilding distance map..." 
+ distance = [None] * N + which = [None] * N + for n in xrange(N): + if n not in unpicked and branching[n] < maxbranching: + update_distance_map(n) + + # Pick the closest unpicked node + dsorted = [(d,i,w) for i,(d,w) in enumerate(zip(distance, which)) + if d is not None + and i in unpicked + and (maxbranching is None or branching[w] < maxbranching) + and (maxsoftbranching is None or branching[w] < maxsoftbranching)] + if not dsorted and maxsoftbranching is not None: + dsorted = [(d,i,w) for i,(d,w) in enumerate(zip(distance, which)) + if d is not None + and i in unpicked + and (maxbranching is None or branching[w] < maxbranching)] + if not dsorted: + raise AssertionError, "Unconnected graph" + + dsorted.sort() + dsorted = minrange(dsorted) + + if len(dsorted) > 1: + winner = untie(dsorted) + elif dsorted: + winner = dsorted[0] + else: + raise AssertionError, "Unconnected graph" + + weight, edgefrom, edgeto = winner + + branching[edgeto] += 1 + + if maxbranching is not None and branching[edgeto] == maxbranching: + needsrebuild = True + + # Yield edge, update distance map to account + # for the picked node + yield (nodes[edgefrom], nodes[edgeto]) + + update_distance_map(edgefrom) + unpicked.remove(edgefrom) + + diff --git a/src/nepi/util/ipaddr2.py b/src/nepi/util/ipaddr2.py index 88700be1..a01067d0 100644 --- a/src/nepi/util/ipaddr2.py +++ b/src/nepi/util/ipaddr2.py @@ -25,3 +25,21 @@ def ipv4_mask2dot(mask): mask = '.'.join(map(str,map(ord,mask))) return mask +def ipdist(a,b): + a = struct.unpack('!L',socket.inet_aton(a))[0] + b = struct.unpack('!L',socket.inet_aton(b))[0] + d = 32 + while d and (b&0x80000000)==(a&0x80000000): + a <<= 1 + b <<= 1 + d -= 1 + return d + +def ipdistn(a,b): + d = 32 + while d and (b&0x80000000)==(a&0x80000000): + a <<= 1 + b <<= 1 + d -= 1 + return d + diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py new file mode 100644 index 00000000..d1169a55 --- /dev/null +++ b/src/nepi/util/parallel.py @@ -0,0 +1,91 @@ 
+#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import threading +import Queue +import traceback +import sys + +N_PROCS = None + +class ParallelMap(object): + def __init__(self, maxthreads = None, maxqueue = None, results = True): + global N_PROCS + + if maxthreads is None: + if N_PROCS is None: + try: + f = open("/proc/cpuinfo") + try: + N_PROCS = sum("processor" in l for l in f) + finally: + f.close() + except: + pass + maxthreads = N_PROCS + + if maxthreads is None: + maxthreads = 4 + + self.queue = Queue.Queue(maxqueue or 0) + + self.workers = [ threading.Thread(target = self.worker) + for x in xrange(maxthreads) ] + + if results: + self.rvqueue = Queue.Queue() + else: + self.rvqueue = None + + def put(self, callable, *args, **kwargs): + self.queue.put((callable, args, kwargs)) + + def put_nowait(self, callable, *args, **kwargs): + self.queue.put_nowait((callable, args, kwargs)) + + def start(self): + for thread in self.workers: + thread.start() + + def join(self): + for thread in self.workers: + # That's the shutdown signal + self.queue.put(None) + + self.queue.join() + for thread in self.workers: + thread.join() + + def worker(self): + while True: + task = self.queue.get() + if task is None: + self.queue.task_done() + break + + try: + try: + callable, args, kwargs = task + rv = callable(*args, **kwargs) + + if self.rvqueue is not None: + self.rvqueue.put(rv) + finally: + self.queue.task_done() + except: + traceback.print_exc(file = sys.stderr) + + def __iter__(self): + if self.rvqueue is not None: + while True: + try: + yield self.rvqueue.get_nowait() + except Queue.Empty: + self.queue.join() + try: + yield self.rvqueue.get_nowait() + except Queue.Empty: + raise StopIteration + + + diff --git a/test/testbeds/planetlab/integration.py b/test/testbeds/planetlab/integration.py index 2786285d..60eba709 100755 --- a/test/testbeds/planetlab/integration.py +++ b/test/testbeds/planetlab/integration.py @@ -12,6 +12,7 @@ import test_util import time import unittest 
import re +import sys class PlanetLabIntegrationTestCase(unittest.TestCase): testbed_id = "planetlab" @@ -21,11 +22,14 @@ class PlanetLabIntegrationTestCase(unittest.TestCase): host1 = "nepi1.pl.sophia.inria.fr" host2 = "nepi2.pl.sophia.inria.fr" + host3 = "nepi3.pl.sophia.inria.fr" + host4 = "nepi5.pl.sophia.inria.fr" def setUp(self): self.root_dir = tempfile.mkdtemp() def tearDown(self): + return try: shutil.rmtree(self.root_dir) except: @@ -79,19 +83,91 @@ class PlanetLabIntegrationTestCase(unittest.TestCase): xml = exp.to_xml() controller = ExperimentController(xml, self.root_dir) - controller.start() - while not controller.is_finished(app.guid): - time.sleep(0.5) - ping_result = controller.trace(app.guid, "stdout") - comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data. + try: + controller.start() + while not controller.is_finished(app.guid): + time.sleep(0.5) + ping_result = controller.trace(app.guid, "stdout") + comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data. --- .* ping statistics --- 1 packets transmitted, 1 received, 0% packet loss, time \d*ms.* """ - self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE), - "Unexpected trace:\n" + ping_result) - controller.stop() - controller.shutdown() + self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE), + "Unexpected trace:\n" + ping_result) + + finally: + controller.stop() + controller.shutdown() + + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_spanning_deployment(self): + pl, exp = self.make_experiment_desc() + + from nepi.testbeds import planetlab as plpackage + + nodes = [ pl.create("Node") for i in xrange(4) ] + ifaces = [ pl.create("NodeInterface") for node in nodes ] + inet = pl.create("Internet") + for node, iface in zip(nodes,ifaces): + node.connector("devs").connect(iface.connector("node")) + iface.connector("inet").connect(inet.connector("devs")) + + apps 
= [] + for node in nodes: + app = pl.create("Application") + app.set_attribute_value("command", "./consts") + app.set_attribute_value("buildDepends", "gcc") + app.set_attribute_value("build", "gcc ${SOURCES}/consts.c -o consts") + app.set_attribute_value("install", "cp consts ${SOURCES}/consts") + app.set_attribute_value("sources", os.path.join( + os.path.dirname(plpackage.__file__),'scripts','consts.c')) + app.enable_trace("stdout") + app.enable_trace("stderr") + app.enable_trace("buildlog") + node.connector("apps").connect(app.connector("node")) + apps.append(app) + + comp_result = \ +r""".*ETH_P_ALL = 0x[0-9a-fA-F]{8} +ETH_P_IP = 0x[0-9a-fA-F]{8} +TUNGETIFF = 0x[0-9a-fA-F]{8} +TUNSETIFF = 0x[0-9a-fA-F]{8} +IFF_NO_PI = 0x[0-9a-fA-F]{8} +IFF_TAP = 0x[0-9a-fA-F]{8} +IFF_TUN = 0x[0-9a-fA-F]{8} +IFF_VNET_HDR = 0x[0-9a-fA-F]{8} +TUN_PKT_STRIP = 0x[0-9a-fA-F]{8} +IFHWADDRLEN = 0x[0-9a-fA-F]{8} +IFNAMSIZ = 0x[0-9a-fA-F]{8} +IFREQ_SZ = 0x[0-9a-fA-F]{8} +FIONREAD = 0x[0-9a-fA-F]{8}.* +""" + + comp_build = r".*(Identity added|gcc).*" + + xml = exp.to_xml() + + controller = ExperimentController(xml, self.root_dir) + try: + controller.start() + while not all(controller.is_finished(app.guid) for app in apps): + time.sleep(0.5) + + for app in apps: + app_result = controller.trace(app.guid, "stdout") or "" + self.assertTrue(re.match(comp_result, app_result, re.MULTILINE), + "Unexpected trace:\n" + app_result) + + build_result = controller.trace(app.guid, "buildlog") or "" + self.assertTrue(re.match(comp_build, build_result, re.MULTILINE | re.DOTALL), + "Unexpected trace:\n" + build_result) + + finally: + controller.stop() + controller.shutdown() + if __name__ == '__main__': unittest.main() -- 2.43.0