Ticket #45: spanning tree deployment
author Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 8 Jun 2011 11:58:23 +0000 (13:58 +0200)
committer Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 8 Jun 2011 11:58:23 +0000 (13:58 +0200)
setup.py
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/graphtools/__init__.py [new file with mode: 0644]
src/nepi/util/graphtools/mst.py [new file with mode: 0644]
src/nepi/util/ipaddr2.py
src/nepi/util/parallel.py [new file with mode: 0644]
test/testbeds/planetlab/integration.py

index e6690a0..80b9299 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -19,6 +19,7 @@ setup(
             "nepi.core",
             "nepi.util.parser",
             "nepi.util.settools",
+            "nepi.util.graphtools",
             "nepi.util" ],
         package_dir = {"": "src"},
         package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"],
index 72f83a8..50167ac 100644 (file)
--- a/src/nepi/testbeds/planetlab/application.py
+++ b/src/nepi/testbeds/planetlab/application.py
@@ -6,10 +6,15 @@ import plcapi
 import operator
 import os
 import os.path
+import sys
 import nepi.util.server as server
 import cStringIO
 import subprocess
 import rspawn
+import random
+import time
+import socket
+import threading
 
 from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
         STATUS_FINISHED
@@ -64,12 +69,23 @@ class Dependency(object):
         self._setuper = None
         self._pid = None
         self._ppid = None
+
+        # Spanning tree deployment
+        self._master = None
+        self._master_passphrase = None
+        self._master_prk = None
+        self._master_puk = None
+        self._master_token = ''.join(map(chr,[rng.randint(0,255) 
+                                      for rng in (random.SystemRandom(),)
+                                      for i in xrange(8)] )).encode("hex")
+        self._build_pid = None
+        self._build_ppid = None
+        
     
     def __str__(self):
         return "%s<%s>" % (
             self.__class__.__name__,
-            ' '.join(list(self.depends or [])
-                   + list(self.sources or []))
+            ' '.join(filter(bool,(self.depends, self.sources)))
         )
     
     def validate(self):
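
The master token initialized above uses a single-element generator clause so that one SystemRandom instance supplies all eight bytes before the result is hex-encoded. The same idiom, pulled out as a minimal standalone sketch (the helper name is illustrative, not part of the patch):

    import random

    def random_hex_token(nbytes=8):
        # One SystemRandom instance feeds the whole expression, mirroring
        # the "for rng in (random.SystemRandom(),)" clause used in __init__.
        rng = random.SystemRandom()
        return ''.join(chr(rng.randint(0, 255)) for _ in xrange(nbytes)).encode("hex")

    # random_hex_token() returns 16 hex characters encoding 8 random bytes,
    # which is the form self._master_token takes.
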
@@ -127,13 +143,20 @@ class Dependency(object):
 
     def setup(self):
         self._make_home()
-        self._build()
+        self._launch_build()
+        self._finish_build()
         self._setup = True
     
     def async_setup(self):
         if not self._setuper:
+            def setuper():
+                try:
+                    self.setup()
+                except:
+                    self._setuper._exc.append(sys.exc_info())
             self._setuper = threading.Thread(
-                target = self.setup)
+                target = setuper)
+            self._setuper._exc = []
             self._setuper.start()
     
     def async_setup_wait(self):
@@ -141,7 +164,11 @@ class Dependency(object):
             if self._setuper:
                 self._setuper.join()
                 if not self._setup:
-                    raise RuntimeError, "Failed to setup application"
+                    if self._setuper._exc:
+                        exctyp,exval,exctrace = self._setuper._exc[0]
+                        raise exctyp,exval,exctrace
+                    else:
+                        raise RuntimeError, "Failed to setup application"
             else:
                 self.setup()
         
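
async_setup and async_setup_wait propagate failures across the thread boundary by stashing sys.exc_info() on the worker thread and re-raising it in the caller with Python 2's three-argument raise, which preserves the original traceback. A minimal standalone sketch of the same pattern (run_in_thread and wait_for are illustrative helpers, not part of the patch):

    import sys
    import threading

    def run_in_thread(func):
        def wrapper():
            try:
                func()
            except:
                # Capture the full exception info in the worker thread
                thread._exc.append(sys.exc_info())
        thread = threading.Thread(target=wrapper)
        thread._exc = []
        thread.start()
        return thread

    def wait_for(thread):
        thread.join()
        if thread._exc:
            exctyp, exval, exctrace = thread._exc[0]
            # The three-argument raise keeps the worker's traceback intact
            raise exctyp, exval, exctrace
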
@@ -149,7 +176,7 @@ class Dependency(object):
         # Make sure all the paths are created where 
         # they have to be created for deployment
         (out,err),proc = server.popen_ssh_command(
-            "mkdir -p %s" % (server.shell_escape(self.home_path),),
+            "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" % { 'home' : server.shell_escape(self.home_path) },
             host = self.node.hostname,
             port = None,
             user = self.node.slicename,
@@ -159,8 +186,7 @@ class Dependency(object):
             )
         
         if proc.wait():
-            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-        
+            raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
         
         if self.stdin:
             # Write program input
@@ -175,7 +201,7 @@ class Dependency(object):
                 )
             
             if proc.wait():
-                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+                raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
 
     def _replace_paths(self, command):
         """
@@ -187,28 +213,131 @@ class Dependency(object):
             .replace("${SOURCES}", root+server.shell_escape(self.home_path))
             .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) )
 
-    def _build(self):
-        if self.sources:
-            sources = self.sources.split(' ')
+    def _launch_build(self):
+        if self._master is not None:
+            self._do_install_keys()
+            buildscript = self._do_build_slave()
+        else:
+            buildscript = self._do_build_master()
             
-            # Copy all sources
+        if buildscript is not None:
+            # upload build script
             (out,err),proc = server.popen_scp(
-                sources,
-                "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
-                    os.path.join(self.home_path,'.'),),
+                buildscript,
+                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
+                    os.path.join(self.home_path, 'nepi-build.sh') ),
+                port = None,
+                agent = None,
                 ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
-        
+            
             if proc.wait():
-                raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,)
+                raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
             
-        if self.buildDepends:
-            # Install build dependencies
-            (out,err),proc = server.popen_ssh_command(
-                "sudo -S yum -y install %(packages)s" % {
-                    'packages' : self.buildDepends
-                },
+            # launch build
+            self._do_launch_build()
+    
+    def _finish_build(self):
+        self._do_wait_build()
+        self._do_install()
+
+    def _do_build_slave(self):
+        if not self.sources and not self.build:
+            return None
+            
+        # Create build script
+        files = set()
+        
+        if self.sources:
+            sources = self.sources.split(' ')
+            files.update(
+                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostname, 
+                    os.path.join(self._master.home_path, os.path.basename(source)),)
+                for source in sources
+            )
+        
+        if self.build:
+            files.add(
+                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostname, 
+                    os.path.join(self._master.home_path, 'build.tar.gz'),)
+            )
+        
+        launch_agent = "{ ( echo -e '#!/bin/sh\\ncat' > .ssh-askpass ) && chmod u+x .ssh-askpass"\
+                        " && export SSH_ASKPASS=$(pwd)/.ssh-askpass "\
+                        " && ssh-agent > .ssh-agent.sh ; } && . ./.ssh-agent.sh && ( echo $NEPI_MASTER_PASSPHRASE | ssh-add %(prk)s ) && rm -rf %(prk)s %(puk)s" %  \
+        {
+            'prk' : server.shell_escape(self._master_prk_name),
+            'puk' : server.shell_escape(self._master_puk_name),
+        }
+        
+        kill_agent = "kill $SSH_AGENT_PID"
+        
+        waitmaster = "{ . ./.ssh-agent.sh ; while [[ $(ssh -q -o UserKnownHostsFile=%(hostkey)s %(master)s cat %(token_path)s) != %(token)s ]] ; do sleep 5 ; done ; }" % {
+            'hostkey' : 'master_known_hosts',
+            'master' : "%s@%s" % (self._master.node.slicename, self._master.node.hostname),
+            'token_path' : os.path.join(self._master.home_path, 'build.token'),
+            'token' : server.shell_escape(self._master._master_token),
+        }
+        
+        syncfiles = "scp -p -o UserKnownHostsFile=%(hostkey)s %(files)s ." % {
+            'hostkey' : 'master_known_hosts',
+            'files' : ' '.join(files),
+        }
+        if self.build:
+            syncfiles += " && tar xzf build.tar.gz"
+        syncfiles += " && ( echo %s > build.token )" % (server.shell_escape(self._master_token),)
+        syncfiles = "{ . ./.ssh-agent.sh ; %s ; }" % (syncfiles,)
+        
+        cleanup = "{ . ./.ssh-agent.sh ; kill $SSH_AGENT_PID ; rm -rf %(prk)s %(puk)s master_known_hosts .ssh-askpass ; }" % {
+            'prk' : server.shell_escape(self._master_prk_name),
+            'puk' : server.shell_escape(self._master_puk_name),
+        }
+        
+        slavescript = "( ( %(launch_agent)s && %(waitmaster)s && %(syncfiles)s && %(kill_agent)s && %(cleanup)s ) || %(cleanup)s )" % {
+            'waitmaster' : waitmaster,
+            'syncfiles' : syncfiles,
+            'cleanup' : cleanup,
+            'kill_agent' : kill_agent,
+            'launch_agent' : launch_agent,
+            'home' : server.shell_escape(self.home_path),
+        }
+        
+        return cStringIO.StringIO(slavescript)
+         
+    def _do_launch_build(self):
+        script = "bash ./nepi-build.sh"
+        if self._master_passphrase:
+            script = "NEPI_MASTER_PASSPHRASE=%s %s" % (
+                server.shell_escape(self._master_passphrase),
+                script
+            )
+        (out,err),proc = rspawn.remote_spawn(
+            script,
+            
+            pidfile = 'build-pid',
+            home = self.home_path,
+            stdin = '/dev/null',
+            stdout = 'buildlog',
+            stderr = rspawn.STDOUT,
+            
+            host = self.node.hostname,
+            port = None,
+            user = self.node.slicename,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up build slave %s: %s %s" % (self.home_path, out,err,)
+        
+        
+        pid = ppid = None
+        delay = 1.0
+        for i in xrange(5):
+            pidtuple = rspawn.remote_check_pid(
+                os.path.join(self.home_path,'build-pid'),
                 host = self.node.hostname,
                 port = None,
                 user = self.node.slicename,
@@ -216,17 +345,46 @@ class Dependency(object):
                 ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
-        
-            if proc.wait():
-                raise RuntimeError, "Failed instal build dependencies: %s %s" % (out,err,)
-        
             
-        if self.build:
-            # Build sources
+            if pidtuple:
+                pid, ppid = pidtuple
+                self._build_pid, self._build_ppid = pidtuple
+                break
+            else:
+                time.sleep(delay)
+                delay = min(30,delay*1.2)
+        else:
+            raise RuntimeError, "Failed to set up build slave %s: cannot get pid" % (self.home_path,)
+        
+    def _do_wait_build(self):
+        pid = self._build_pid
+        ppid = self._build_ppid
+        
+        if pid and ppid:
+            delay = 1.0
+            while True:
+                status = rspawn.remote_status(
+                    pid, ppid,
+                    host = self.node.hostname,
+                    port = None,
+                    user = self.node.slicename,
+                    agent = None,
+                    ident_key = self.node.ident_path,
+                    server_key = self.node.server_key
+                    )
+                
+                if status is not rspawn.RUNNING:
+                    self._build_pid = self._build_ppid = None
+                    break
+                else:
+                    time.sleep(delay*(0.5+random.random()))
+                    delay = min(30,delay*1.2)
+            
+            # check build token
+
             (out,err),proc = server.popen_ssh_command(
-                "cd %(home)s && mkdir -p build && cd build && ( %(command)s ) > ${HOME}/%(home)s/buildlog 2>&1 || ( tail ${HOME}/%(home)s/buildlog >&2 && false )" % {
-                    'command' : self._replace_paths(self.build),
-                    'home' : server.shell_escape(self.home_path),
+                "cat %(token_path)s" % {
+                    'token_path' : os.path.join(self.home_path, 'build.token'),
                 },
                 host = self.node.hostname,
                 port = None,
@@ -235,27 +393,101 @@ class Dependency(object):
                 ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
-        
-            if proc.wait():
-                raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+            
+            slave_token = ""
+            if not proc.wait() and out:
+                slave_token = out.strip()
+            
+            if slave_token != self._master_token:
+                # Get buildlog for the error message
 
-            # Make archive
-            (out,err),proc = server.popen_ssh_command(
-                "cd %(home)s && tar czf build.tar.gz build" % {
-                    'command' : self._replace_paths(self.build),
-                    'home' : server.shell_escape(self.home_path),
-                },
+                (buildlog,err),proc = server.popen_ssh_command(
+                    "cat %(buildlog)s" % {
+                        'buildlog' : os.path.join(self.home_path, 'buildlog'),
+                        'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
+                    },
+                    host = self.node.hostname,
+                    port = None,
+                    user = self.node.slicename,
+                    agent = None,
+                    ident_key = self.node.ident_path,
+                    server_key = self.node.server_key
+                    )
+                
+                proc.wait()
+                
+                raise RuntimeError, "Failed to set up application %s: "\
+                        "build failed, got wrong token from pid %s/%s "\
+                        "(expected %r, got %r), see buildlog: %s" % (
+                    self.home_path, pid, ppid, self._master_token, slave_token, buildlog)
+
+    def _do_kill_build(self):
+        pid = self._build_pid
+        ppid = self._build_ppid
+        
+        if pid and ppid:
+            rspawn.remote_kill(
+                pid, ppid,
                 host = self.node.hostname,
                 port = None,
                 user = self.node.slicename,
                 agent = None,
+                ident_key = self.node.ident_path
+                )
+        
+        
+    def _do_build_master(self):
+        if not self.sources and not self.build and not self.buildDepends:
+            return None
+            
+        if self.sources:
+            sources = self.sources.split(' ')
+            
+            # Copy all sources
+            (out,err),proc = server.popen_scp(
+                sources,
+                "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
+                    os.path.join(self.home_path,'.'),),
                 ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
         
             if proc.wait():
-                raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+                raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,)
+            
+        buildscript = cStringIO.StringIO()
+        
+        if self.buildDepends:
+            # Install build dependencies
+            buildscript.write(
+                "sudo -S yum -y install %(packages)s\n" % {
+                    'packages' : self.buildDepends
+                }
+            )
+        
+            
+        if self.build:
+            # Build sources
+            buildscript.write(
+                "mkdir -p build && ( cd build && ( %(command)s ) )\n" % {
+                    'command' : self._replace_paths(self.build),
+                    'home' : server.shell_escape(self.home_path),
+                }
+            )
+        
+            # Make archive
+            buildscript.write(
+                "tar czf build.tar.gz build && ( echo %(master_token)s > build.token )\n" % {
+                    'master_token' : server.shell_escape(self._master_token)
+                }
+            )
+        
+        buildscript.seek(0)
 
+        return buildscript
+        
+
+    def _do_install(self):
         if self.install:
             # Install application
             (out,err),proc = server.popen_ssh_command(
@@ -274,6 +506,57 @@ class Dependency(object):
             if proc.wait():
                 raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
 
+    def set_master(self, master):
+        self._master = master
+        
+    def install_keys(self, prk, puk, passphrase):
+        # Install keys
+        self._master_passphrase = passphrase
+        self._master_prk = prk
+        self._master_puk = puk
+        self._master_prk_name = os.path.basename(prk.name)
+        self._master_puk_name = os.path.basename(puk.name)
+        
+    def _do_install_keys(self):
+        prk = self._master_prk
+        puk = self._master_puk
+        
+        (out,err),proc = server.popen_scp(
+            [ prk.name, puk.name ],
+            '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path ),
+            port = None,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,)
+
+        (out,err),proc = server.popen_scp(
+            cStringIO.StringIO('%s,%s %s\n' % (
+                self._master.node.hostname, socket.gethostbyname(self._master.node.hostname), 
+                self._master.node.server_key)),
+            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
+                os.path.join(self.home_path,"master_known_hosts") ),
+            port = None,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,)
+        
+        # No longer need'em
+        self._master_prk = None
+        self._master_puk = None
+    
+    def cleanup(self):
+        # make sure there's no leftover build processes
+        self._do_kill_build()
+
+
 class Application(Dependency):
     """
     An application also has dependencies, but also a command to be run and monitored.
@@ -397,7 +680,8 @@ class Application(Dependency):
                 port = None,
                 user = self.node.slicename,
                 agent = None,
-                ident_key = self.node.ident_path
+                ident_key = self.node.ident_path,
+                server_key = self.node.server_key
                 )
             
             if status is rspawn.NOT_STARTED:
@@ -448,7 +732,7 @@ class NepiDependency(Dependency):
         tarname = os.path.basename(self.tarball.name)
         
         # it's already built - just move the tarball into place
-        self.build = "mv ${SOURCES}/%s ." % (tarname,)
+        self.build = "mv -f ${SOURCES}/%s ." % (tarname,)
         
         # unpack it into sources, and we're done
         self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,)
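
The build is launched detached with rspawn and then polled: _do_launch_build retries remote_check_pid a few times and _do_wait_build keeps calling remote_status until the process exits, in both cases growing the delay by a factor of 1.2 up to a 30-second cap (the wait loop also adds jitter so that many slaves do not poll the master in lockstep). A distilled sketch of that polling pattern (the helper and its parameters are illustrative, not part of the patch):

    import random
    import time

    def poll_with_backoff(check, initial_delay=1.0, max_delay=30.0):
        # Call check() until it returns something non-None, sleeping a
        # jittered, exponentially growing delay between attempts.
        delay = initial_delay
        while True:
            result = check()
            if result is not None:
                return result
            time.sleep(delay * (0.5 + random.random()))
            delay = min(max_delay, delay * 1.2)
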
@@ -492,12 +776,12 @@ class NS3Dependency(Dependency):
     def __init__(self, api = None):
         super(NS3Dependency, self).__init__(api)
         
-        self.buildDepends = 'build-essential waf gcc gcc-c++ gccxml unzip'
+        self.buildDepends = 'make waf gcc gcc-c++ gccxml unzip'
         
         # We have to download the sources, untar, build...
         pybindgen_source_url = "http://pybindgen.googlecode.com/files/pybindgen-0.15.0.zip"
         pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip"
-        ns3_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/ns-3-dev/archive/tip.tar.gz"
+        ns3_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/nepi-ns-3.9/archive/tip.tar.gz"
         passfd_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/tip.tar.gz"
         self.build =(
             " ( "
index f6693bf..aa17562 100644 (file)
--- a/src/nepi/testbeds/planetlab/execute.py
+++ b/src/nepi/testbeds/planetlab/execute.py
@@ -4,9 +4,26 @@
 from constants import TESTBED_ID
 from nepi.core import testbed_impl
 from nepi.util.constants import TIME_NOW
+from nepi.util.graphtools import mst
+from nepi.util import ipaddr2
 import os
+import os.path
 import time
 import resourcealloc
+import collections
+import operator
+import functools
+import socket
+import struct
+import tempfile
+import subprocess
+import random
+import shutil
+
+from nepi.util.constants import TESTBED_STATUS_CONFIGURED
+
+class TempKeyError(Exception):
+    pass
 
 class TestbedController(testbed_impl.TestbedController):
     def __init__(self, testbed_version):
@@ -63,12 +80,24 @@ class TestbedController(testbed_impl.TestbedController):
             get_attribute_value("authPass")
         self.sliceSSHKey = self._attributes.\
             get_attribute_value("sliceSSHKey")
+        self.sliceSSHKeyPass = None
         self.plcHost = self._attributes.\
             get_attribute_value("plcHost")
         self.plcUrl = self._attributes.\
             get_attribute_value("plcUrl")
         super(TestbedController, self).do_setup()
 
+    def do_post_asynclaunch(self, guid):
+        # Dependencies were launched asynchronously,
+        # so wait for them
+        dep = self._elements[guid]
+        if isinstance(dep, self._app.Dependency):
+            dep.async_setup_wait()
+    
+    # Two-phase configuration for asynchronous launch
+    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
+    do_poststep_configure = staticmethod(do_post_asynclaunch)
+
     def do_preconfigure(self):
         # Perform resource discovery if we don't have
         # specific resources assigned yet
@@ -76,6 +105,9 @@ class TestbedController(testbed_impl.TestbedController):
 
         # Create PlanetLab slivers
         self.do_provisioning()
+        
+        # Plan application deployment
+        self.do_spanning_deployment_plan()
 
         # Configure elements per XML data
         super(TestbedController, self).do_preconfigure()
@@ -146,7 +178,151 @@ class TestbedController(testbed_impl.TestbedController):
 
         # cleanup
         del self._to_provision
-
+    
+    def do_spanning_deployment_plan(self):
+        # Create application groups by collecting all applications
+        # based on their hash - the hash should contain everything that
+        # defines them and the platform they're built
+        
+        def dephash(app):
+            return (
+                frozenset((app.depends or "").split(' ')),
+                frozenset((app.sources or "").split(' ')),
+                app.build,
+                app.install,
+                app.node.architecture,
+                app.node.operatingSystem,
+                app.node.pl_distro,
+            )
+        
+        depgroups = collections.defaultdict(list)
+        
+        for element in self._elements.itervalues():
+            if isinstance(element, self._app.Dependency):
+                depgroups[dephash(element)].append(element)
+        
+        # Set up spanning deployment for those applications that
+        # have been deployed in several nodes.
+        for dh, group in depgroups.iteritems():
+            if len(group) > 1:
+                # Pick root (deterministically)
+                root = min(group, key=lambda app:app.node.hostname)
+                
+                # Obtain all IPs in numeric format
+                # (which means faster distance computations)
+                for dep in group:
+                    dep._ip = socket.gethostbyname(dep.node.hostname)
+                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
+                
+                # Compute plan
+                # NOTE: the plan is an iterator
+                plan = mst.mst(
+                    group,
+                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
+                    root = root,
+                    maxbranching = 2)
+                
+                # Re-sign private key
+                try:
+                    tempprk, temppuk, tmppass = self._make_temp_private_key()
+                except TempKeyError:
+                    continue
+                
+                # Set up slaves
+                plan = list(plan)
+                for slave, master in plan:
+                    slave.set_master(master)
+                    slave.install_keys(tempprk, temppuk, tmppass)
+                    
+        # We don't need the user's passphrase anymore
+        self.sliceSSHKeyPass = None
+    
+    def _make_temp_private_key(self):
+        # Get the user's key's passphrase
+        if not self.sliceSSHKeyPass:
+            if 'SSH_ASKPASS' in os.environ:
+                proc = subprocess.Popen(
+                    [ os.environ['SSH_ASKPASS'],
+                      "Please type the passphrase for the %s SSH identity file. "
+                      "The passphrase will be used to re-cipher the identity file with "
+                      "a random 256-bit key for automated chain deployment on the "
+                      "%s PlanetLab slice" % ( 
+                        os.path.basename(self.sliceSSHKey), 
+                        self.slicename
+                    ) ],
+                    stdin = open("/dev/null"),
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.PIPE)
+                out,err = proc.communicate()
+                self.sliceSSHKeyPass = out.strip()
+        
+        if not self.sliceSSHKeyPass:
+            raise TempKeyError
+        
+        # Create temporary key files
+        prk = tempfile.NamedTemporaryFile(
+            dir = self.root_directory,
+            prefix = "pl_deploy_tmpk_",
+            suffix = "")
+
+        puk = tempfile.NamedTemporaryFile(
+            dir = self.root_directory,
+            prefix = "pl_deploy_tmpk_",
+            suffix = ".pub")
+            
+        # Create secure 256-bits temporary passphrase
+        passphrase = ''.join(map(chr,[rng.randint(0,255) 
+                                      for rng in (random.SystemRandom(),)
+                                      for i in xrange(32)] )).encode("hex")
+                
+        # Copy keys
+        oprk = open(self.sliceSSHKey, "rb")
+        opuk = open(self.sliceSSHKey+".pub", "rb")
+        shutil.copymode(oprk.name, prk.name)
+        shutil.copymode(opuk.name, puk.name)
+        shutil.copyfileobj(oprk, prk)
+        shutil.copyfileobj(opuk, puk)
+        prk.flush()
+        puk.flush()
+        oprk.close()
+        opuk.close()
+        
+        # A descriptive comment
+        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
+        
+        # Recipher keys
+        proc = subprocess.Popen(
+            ["ssh-keygen", "-p",
+             "-f", prk.name,
+             "-P", self.sliceSSHKeyPass,
+             "-N", passphrase,
+             "-C", comment ],
+            stdout = subprocess.PIPE,
+            stderr = subprocess.PIPE,
+            stdin = subprocess.PIPE
+        )
+        out, err = proc.communicate()
+        
+        if err:
+            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
+                out, err)
+        
+        prk.seek(0)
+        puk.seek(0)
+        
+        # Change comment on public key
+        puklines = puk.readlines()
+        puklines[0] = puklines[0].split(' ')
+        puklines[0][-1] = comment+'\n'
+        puklines[0] = ' '.join(puklines[0])
+        puk.seek(0)
+        puk.truncate()
+        puk.writelines(puklines)
+        del puklines
+        puk.flush()
+        
+        return prk, puk, passphrase
+    
     def set(self, guid, name, value, time = TIME_NOW):
         super(TestbedController, self).set(guid, name, value, time)
         # TODO: take on account schedule time for the task
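
do_spanning_deployment_plan collapses identical Dependency elements into groups keyed by dephash, so that one master per group builds once and every other node only syncs the result. A toy illustration of that grouping, using plain dictionaries instead of Dependency objects (the field names here are illustrative):

    import collections

    def dephash(app):
        # Stripped-down version of the hash used above: everything that
        # defines the build and the platform it is built for.
        return (
            frozenset((app.get('depends') or "").split(' ')),
            frozenset((app.get('sources') or "").split(' ')),
            app.get('build'),
            app.get('install'),
            app.get('arch'),
            app.get('os'),
        )

    apps = [
        {'depends': 'gcc', 'build': 'gcc consts.c -o consts', 'arch': 'x86_64', 'os': 'f8'},
        {'depends': 'gcc', 'build': 'gcc consts.c -o consts', 'arch': 'x86_64', 'os': 'f8'},
        {'depends': 'gcc', 'build': 'gcc consts.c -o consts', 'arch': 'i386',   'os': 'f8'},
    ]

    depgroups = collections.defaultdict(list)
    for app in apps:
        depgroups[dephash(app)].append(app)

    # Two groups: the x86_64 applications share a single build and master,
    # while the i386 one forms a group of its own (and, being alone, gets no master).
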
index 885c181..85ae099 100644 (file)
--- a/src/nepi/testbeds/planetlab/metadata_v01.py
+++ b/src/nepi/testbeds/planetlab/metadata_v01.py
@@ -409,7 +409,7 @@ def configure_application(testbed_instance, guid):
     app.node.wait_dependencies()
     
     # Install stuff
-    app.setup()
+    app.async_setup()
 
 def configure_dependency(testbed_instance, guid):
     dep = testbed_instance._elements[guid]
@@ -421,7 +421,7 @@ def configure_dependency(testbed_instance, guid):
     dep.node.wait_dependencies()
     
     # Install stuff
-    dep.setup()
+    dep.async_setup()
 
 def configure_netpipe(testbed_instance, guid):
     netpipe = testbed_instance._elements[guid]
index a836d44..6e518f4 100644 (file)
--- a/src/nepi/testbeds/planetlab/tunproto.py
+++ b/src/nepi/testbeds/planetlab/tunproto.py
@@ -222,6 +222,16 @@ class TunProtoBase(object):
         self._started = True
     
     def _launch_and_wait(self, *p, **kw):
+        try:
+            self.__launch_and_wait(*p, **kw)
+        except:
+            if self._launcher:
+                import sys
+                self._launcher._exc.append(sys.exc_info())
+            else:
+                raise
+            
+    def __launch_and_wait(self, *p, **kw):
         local = self.local()
         
         self.launch(*p, **kw)
@@ -287,13 +297,18 @@ class TunProtoBase(object):
             self._launcher = threading.Thread(
                 target = self._launch_and_wait,
                 args = (check_proto, listen, extra_args))
+            self._launcher._exc = []
             self._launcher.start()
     
     def async_launch_wait(self):
         if self._launcher:
             self._launcher.join()
             if not self._started:
-                raise RuntimeError, "Failed to launch TUN forwarder"
+                if self._launcher._exc:
+                    exctyp,exval,exctrace = self._launcher._exc[0]
+                    raise exctyp,exval,exctrace
+                else:
+                    raise RuntimeError, "Failed to launch TUN forwarder"
         elif not self._started:
             self.launch()
 
diff --git a/src/nepi/util/graphtools/__init__.py b/src/nepi/util/graphtools/__init__.py
new file mode 100644 (file)
index 0000000..8b13789
--- /dev/null
@@ -0,0 +1 @@
+
diff --git a/src/nepi/util/graphtools/mst.py b/src/nepi/util/graphtools/mst.py
new file mode 100644 (file)
index 0000000..1c59b2f
--- /dev/null
@@ -0,0 +1,148 @@
+import random
+import bisect
+
+def mst(nodes, connected, 
+        maxsoftbranching = None,
+        maxbranching = None, 
+        root = None,
+        untie = lambda l : iter(l).next()):
+    """
+    Returns an iterator over pairs (Node, Parent)
+    which form the spanning tree.
+    
+    Params:
+    
+        nodes: a list of nodes (can be anything)
+        
+        connected: a callable that takes two nodes
+            and returns either an edge weight (one
+            that can be compared with '<' with other
+            edge weights) or None if they're not
+            connected.
+        
+        maxbranching: the maximum number of branches
+            (children) allowed for a node. None for
+            no limit.
+            When maxbranching is used, the algorithm
+            implemented here gives no guarantee
+            of optimality (the spanning tree may not
+            be the minimum), as that problem becomes
+            NP-hard and we want a quick answer.
+        
+        maxsoftbranching: soft branching limit.
+            The algorithm is allowed to break it
+            if it has no other choice. Trees build with
+            soft branching limits are usually less
+            balanced than when using hard limits,
+            but the computation takes a lot less time.
+        
+        root: the desired root of the spanning tree,
+            or None to pick a random one.
+        
+        untie: a callable that, given an iterable
+            of candidate entries of equal weight for
+            the selection to be made, picks one to
+            be added to the spanning tree. The default
+            picks arbitrarily.
+            Entries are of the form (<weight>,<from>,<to>)
+            with <from> and <to> being indices in the
+            nodes array
+    """
+    
+    if not nodes:
+        return
+        
+    if root is None:
+        root = random.sample(nodes, 1)[0]
+    
+    # We want the root's index
+    root = nodes.index(root)
+    
+    # Unpicked nodes, nodes we still have to add.
+    unpicked = set(xrange(len(nodes)))
+    
+    # Distance maps
+    #   We need:
+    #       min distance to picked node
+    #       which picked node
+    #   Or None if it was a picked or unconnected node
+    
+    N = len(nodes)
+    distance = [None] * N
+    which    = [None] * N
+    
+    # Count branches
+    branching = [0] * N
+    
+    # Initialize with distances to root
+    def update_distance_map(fornode):
+        ref = nodes[fornode]
+        for other, prevdistance in enumerate(distance):
+            other_node = nodes[other]
+            d = connected(ref, other_node)
+            if d is not None:
+                if prevdistance is None or prevdistance > d:
+                    distance[other] = d
+                    which[other] = fornode
+        distance[fornode] = None
+        which[fornode] = None
+    
+    update_distance_map(root)
+    unpicked.remove(root)
+    
+    # Add remaining nodes, yield edges
+    def minrange(dsorted):
+        return dsorted[:bisect.bisect(dsorted, (dsorted[0][0], N, N))]
+        
+    needsrebuild = False
+    while unpicked:
+        # Rebuild the distance map if needed
+        # (ie, when a node in the partial MST is no longer
+        # a candidate for adjoining because of saturation)
+        if needsrebuild:
+            print "Rebuilding distance map..."
+            distance = [None] * N
+            which    = [None] * N
+            for n in xrange(N):
+                if n not in unpicked and branching[n] < maxbranching:
+                    update_distance_map(n)
+        
+        # Pick the closest unpicked node
+        dsorted = [(d,i,w) for i,(d,w) in enumerate(zip(distance, which)) 
+                   if d is not None 
+                      and i in unpicked
+                      and (maxbranching is None or branching[w] < maxbranching)
+                      and (maxsoftbranching is None or branching[w] < maxsoftbranching)]
+        if not dsorted and maxsoftbranching is not None:
+            dsorted = [(d,i,w) for i,(d,w) in enumerate(zip(distance, which)) 
+                       if d is not None 
+                          and i in unpicked
+                          and (maxbranching is None or branching[w] < maxbranching)]
+        if not dsorted:
+            raise AssertionError, "Unconnected graph"
+        
+        dsorted.sort()
+        dsorted = minrange(dsorted)
+        
+        if len(dsorted) > 1:
+            winner = untie(dsorted)
+        elif dsorted:
+            winner = dsorted[0]
+        else:
+            raise AssertionError, "Unconnected graph"
+        
+        weight, edgefrom, edgeto = winner
+        
+        branching[edgeto] += 1
+        
+        if maxbranching is not None and branching[edgeto] == maxbranching:
+            needsrebuild = True
+        
+        # Yield edge, update distance map to account
+        # for the picked node
+        yield (nodes[edgefrom], nodes[edgeto])
+        
+        update_distance_map(edgefrom)
+        unpicked.remove(edgefrom)
+
+
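
A minimal usage sketch for mst.mst(): the nodes are plain integers and the weight is just the absolute difference, so the optimal tree is a chain (the data is illustrative; execute.py uses ipaddr2.ipdistn over resolved node addresses instead):

    from nepi.util.graphtools import mst

    nodes = [10, 20, 25, 40, 41]
    plan = mst.mst(
        nodes,
        lambda a, b: abs(a - b),  # any '<'-comparable weight, or None if unconnected
        root = 10,
        maxbranching = 2)

    # The iterator yields (child, parent) pairs; this is what execute.py
    # unpacks as (slave, master) when wiring the deployment chain.
    for child, parent in plan:
        print child, "attaches to", parent
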
index 88700be..a01067d 100644 (file)
--- a/src/nepi/util/ipaddr2.py
+++ b/src/nepi/util/ipaddr2.py
@@ -25,3 +25,21 @@ def ipv4_mask2dot(mask):
     mask = '.'.join(map(str,map(ord,mask)))
     return mask
 
+def ipdist(a,b):
+    a = struct.unpack('!L',socket.inet_aton(a))[0]
+    b = struct.unpack('!L',socket.inet_aton(b))[0]
+    d = 32
+    while d and (b&0x80000000)==(a&0x80000000):
+        a <<= 1
+        b <<= 1
+        d -= 1
+    return d
+
+def ipdistn(a,b):
+    d = 32
+    while d and (b&0x80000000)==(a&0x80000000):
+        a <<= 1
+        b <<= 1
+        d -= 1
+    return d
+
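
ipdist and ipdistn measure closeness as shared prefix length: the return value is 32 minus the number of leading bits the two addresses have in common, so 0 means identical and 32 means they differ at the very first bit. A small worked example using the integer form, which is what execute.py feeds to the MST metric (the as_int helper below just repeats the inet_aton/unpack conversion and is not part of the patch):

    import socket
    import struct

    from nepi.util import ipaddr2

    def as_int(ip):
        return struct.unpack('!L', socket.inet_aton(ip))[0]

    print ipaddr2.ipdistn(as_int('10.0.0.1'), as_int('10.0.0.2'))     # 2  (30 leading bits shared)
    print ipaddr2.ipdistn(as_int('10.0.0.1'), as_int('10.0.1.1'))     # 9  (23 leading bits shared)
    print ipaddr2.ipdistn(as_int('10.0.0.1'), as_int('192.168.0.1'))  # 32 (first bit already differs)
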
diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py
new file mode 100644 (file)
index 0000000..d1169a5
--- /dev/null
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import threading
+import Queue
+import traceback
+import sys
+
+N_PROCS = None
+
+class ParallelMap(object):
+    def __init__(self, maxthreads = None, maxqueue = None, results = True):
+        global N_PROCS
+        
+        if maxthreads is None:
+            if N_PROCS is None:
+                try:
+                    f = open("/proc/cpuinfo")
+                    try:
+                        N_PROCS = sum("processor" in l for l in f)
+                    finally:
+                        f.close()
+                except:
+                    pass
+            maxthreads = N_PROCS
+        
+        if maxthreads is None:
+            maxthreads = 4
+
+        self.queue = Queue.Queue(maxqueue or 0)
+    
+        self.workers = [ threading.Thread(target = self.worker) 
+                         for x in xrange(maxthreads) ]
+        
+        if results:
+            self.rvqueue = Queue.Queue()
+        else:
+            self.rvqueue = None
+        
+    def put(self, callable, *args, **kwargs):
+        self.queue.put((callable, args, kwargs))
+    
+    def put_nowait(self, callable, *args, **kwargs):
+        self.queue.put_nowait((callable, args, kwargs))
+
+    def start(self):
+        for thread in self.workers:
+            thread.start()
+    
+    def join(self):
+        for thread in self.workers:
+            # That's the shutdown signal
+            self.queue.put(None)
+            
+        self.queue.join()
+        for thread in self.workers:
+            thread.join()
+        
+    def worker(self):
+        while True:
+            task = self.queue.get()
+            if task is None:
+                self.queue.task_done()
+                break
+            
+            try:
+                try:
+                    callable, args, kwargs = task
+                    rv = callable(*args, **kwargs)
+                    
+                    if self.rvqueue is not None:
+                        self.rvqueue.put(rv)
+                finally:
+                    self.queue.task_done()
+            except:
+                traceback.print_exc(file = sys.stderr)
+
+    def __iter__(self):
+        if self.rvqueue is not None:
+            while True:
+                try:
+                    yield self.rvqueue.get_nowait()
+                except Queue.Empty:
+                    self.queue.join()
+                    try:
+                        yield self.rvqueue.get_nowait()
+                    except Queue.Empty:
+                        raise StopIteration
+            
+    
+    
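
ParallelMap is a small thread pool: put() queues (callable, args, kwargs) tuples, start() spins up the workers, join() sends one None sentinel per worker and waits for the queue to drain, and iterating the map yields the collected results (order is not guaranteed). A minimal usage sketch, assuming the module is importable as nepi.util.parallel:

    from nepi.util.parallel import ParallelMap

    def square(x):
        return x * x

    pmap = ParallelMap(maxthreads = 4)
    pmap.start()
    for i in xrange(10):
        pmap.put(square, i)
    pmap.join()

    # Results come back in completion order, so sort them for display.
    print sorted(pmap)   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
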
index 2786285..60eba70 100755 (executable)
--- a/test/testbeds/planetlab/integration.py
+++ b/test/testbeds/planetlab/integration.py
@@ -12,6 +12,7 @@ import test_util
 import time
 import unittest
 import re
+import sys
 
 class PlanetLabIntegrationTestCase(unittest.TestCase):
     testbed_id = "planetlab"
@@ -21,11 +22,14 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
     
     host1 = "nepi1.pl.sophia.inria.fr"
     host2 = "nepi2.pl.sophia.inria.fr"
+    host3 = "nepi3.pl.sophia.inria.fr"
+    host4 = "nepi5.pl.sophia.inria.fr"
 
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
 
     def tearDown(self):
+        return
         try:
             shutil.rmtree(self.root_dir)
         except:
@@ -79,19 +83,91 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
         xml = exp.to_xml()
 
         controller = ExperimentController(xml, self.root_dir)
-        controller.start()
-        while not controller.is_finished(app.guid):
-            time.sleep(0.5)
-        ping_result = controller.trace(app.guid, "stdout")
-        comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+        try:
+            controller.start()
+            while not controller.is_finished(app.guid):
+                time.sleep(0.5)
+            ping_result = controller.trace(app.guid, "stdout")
+            comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
 
 --- .* ping statistics ---
 1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
 """
-        self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
-            "Unexpected trace:\n" + ping_result)
-        controller.stop()
-        controller.shutdown()
+            self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+                "Unexpected trace:\n" + ping_result)
+        
+        finally:
+            controller.stop()
+            controller.shutdown()
+
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_spanning_deployment(self):
+        pl, exp = self.make_experiment_desc()
+        
+        from nepi.testbeds import planetlab as plpackage
+        
+        nodes = [ pl.create("Node") for i in xrange(4) ]
+        ifaces = [ pl.create("NodeInterface") for node in nodes ]
+        inet = pl.create("Internet")
+        for node, iface in zip(nodes,ifaces):
+            node.connector("devs").connect(iface.connector("node"))
+            iface.connector("inet").connect(inet.connector("devs"))
+        
+        apps = []
+        for node in nodes:
+            app = pl.create("Application")
+            app.set_attribute_value("command", "./consts")
+            app.set_attribute_value("buildDepends", "gcc")
+            app.set_attribute_value("build", "gcc ${SOURCES}/consts.c -o consts")
+            app.set_attribute_value("install", "cp consts ${SOURCES}/consts")
+            app.set_attribute_value("sources", os.path.join(
+                os.path.dirname(plpackage.__file__),'scripts','consts.c'))
+            app.enable_trace("stdout")
+            app.enable_trace("stderr")
+            app.enable_trace("buildlog")
+            node.connector("apps").connect(app.connector("node"))
+            apps.append(app)
+
+        comp_result = \
+r""".*ETH_P_ALL = 0x[0-9a-fA-F]{8}
+ETH_P_IP = 0x[0-9a-fA-F]{8}
+TUNGETIFF = 0x[0-9a-fA-F]{8}
+TUNSETIFF = 0x[0-9a-fA-F]{8}
+IFF_NO_PI = 0x[0-9a-fA-F]{8}
+IFF_TAP = 0x[0-9a-fA-F]{8}
+IFF_TUN = 0x[0-9a-fA-F]{8}
+IFF_VNET_HDR = 0x[0-9a-fA-F]{8}
+TUN_PKT_STRIP = 0x[0-9a-fA-F]{8}
+IFHWADDRLEN = 0x[0-9a-fA-F]{8}
+IFNAMSIZ = 0x[0-9a-fA-F]{8}
+IFREQ_SZ = 0x[0-9a-fA-F]{8}
+FIONREAD = 0x[0-9a-fA-F]{8}.*
+"""
+
+        comp_build = r".*(Identity added|gcc).*"
+
+        xml = exp.to_xml()
+
+        controller = ExperimentController(xml, self.root_dir)
+        try:
+            controller.start()
+            while not all(controller.is_finished(app.guid) for app in apps):
+                time.sleep(0.5)
+            
+            for app in apps:
+                app_result = controller.trace(app.guid, "stdout") or ""
+                self.assertTrue(re.match(comp_result, app_result, re.MULTILINE),
+                    "Unexpected trace:\n" + app_result)
+
+                build_result = controller.trace(app.guid, "buildlog") or ""
+                self.assertTrue(re.match(comp_build, build_result, re.MULTILINE | re.DOTALL),
+                    "Unexpected trace:\n" + build_result)
+        
+        finally:
+            controller.stop()
+            controller.shutdown()
+        
 
 if __name__ == '__main__':
     unittest.main()