"nepi.core",
"nepi.util.parser",
"nepi.util.settools",
+ "nepi.util.graphtools",
"nepi.util" ],
package_dir = {"": "src"},
package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"],
import operator
import os
import os.path
+import sys
import nepi.util.server as server
import cStringIO
import subprocess
import rspawn
+import random
+import time
+import socket
+import threading
from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
STATUS_FINISHED
self._setuper = None
self._pid = None
self._ppid = None
+
+ # Spanning tree deployment
+ self._master = None
+ self._master_passphrase = None
+ self._master_prk = None
+ self._master_puk = None
+ self._master_token = ''.join(map(chr,[rng.randint(0,255)
+ for rng in (random.SystemRandom(),)
+ for i in xrange(8)] )).encode("hex")
+ self._build_pid = None
+ self._build_ppid = None
+
    def __str__(self):
+        # Short repr; joins the raw (string) depends/sources attributes,
+        # skipping empty/None values.
        return "%s<%s>" % (
            self.__class__.__name__,
-            ' '.join(list(self.depends or [])
-                + list(self.sources or []))
+            ' '.join(filter(bool,(self.depends, self.sources)))
        )
def validate(self):
    def setup(self):
+        # Build is split in two phases (launch, then wait+install) so
+        # several dependencies can build concurrently via async_setup.
        self._make_home()
-        self._build()
+        self._launch_build()
+        self._finish_build()
        self._setup = True
    def async_setup(self):
+        # Run setup() in a background thread; exceptions are captured
+        # (the bare except is deliberate) into the thread's _exc list and
+        # re-raised later by async_setup_wait.
        if not self._setuper:
+            def setuper():
+                try:
+                    self.setup()
+                except:
+                    self._setuper._exc.append(sys.exc_info())
            self._setuper = threading.Thread(
-                target = self.setup)
+                target = setuper)
+            self._setuper._exc = []
            self._setuper.start()
    def async_setup_wait(self):
+        # Join a pending async_setup and re-raise any captured exception
+        # with its original traceback (Python 2 three-argument raise).
        if self._setuper:
            self._setuper.join()
            if not self._setup:
-                raise RuntimeError, "Failed to setup application"
+                if self._setuper._exc:
+                    exctyp,exval,exctrace = self._setuper._exc[0]
+                    raise exctyp,exval,exctrace
+                else:
+                    raise RuntimeError, "Failed to setup application"
        else:
            self.setup()
# Make sure all the paths are created where
# they have to be created for deployment
(out,err),proc = server.popen_ssh_command(
- "mkdir -p %s" % (server.shell_escape(self.home_path),),
+ "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" % { 'home' : server.shell_escape(self.home_path) },
host = self.node.hostname,
port = None,
user = self.node.slicename,
)
if proc.wait():
- raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-
+ raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
if self.stdin:
# Write program input
)
if proc.wait():
- raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+ raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
def _replace_paths(self, command):
"""
.replace("${SOURCES}", root+server.shell_escape(self.home_path))
.replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) )
-    def _build(self):
-        if self.sources:
-            sources = self.sources.split(' ')
+    def _launch_build(self):
+        # Phase 1: generate the appropriate build script (slaves pull from
+        # their master, masters build locally), upload it as nepi-build.sh,
+        # and spawn it detached on the node.
+        if self._master is not None:
+            self._do_install_keys()
+            buildscript = self._do_build_slave()
+        else:
+            buildscript = self._do_build_master()
-            # Copy all sources
+        if buildscript is not None:
+            # upload build script
            (out,err),proc = server.popen_scp(
-                sources,
-                "%s@%s:%s" % (self.node.slicename, self.node.hostname,
-                    os.path.join(self.home_path,'.'),),
+                buildscript,
+                '%s@%s:%s' % (self.node.slicename, self.node.hostname,
+                    os.path.join(self.home_path, 'nepi-build.sh') ),
+                port = None,
+                agent = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
-
+
            if proc.wait():
-                raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,)
+                raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
-        if self.buildDepends:
-            # Install build dependencies
-            (out,err),proc = server.popen_ssh_command(
-                "sudo -S yum -y install %(packages)s" % {
-                    'packages' : self.buildDepends
-                },
+        # launch build
+        self._do_launch_build()
+
+    def _finish_build(self):
+        # Phase 2: wait for the spawned build to finish, then install.
+        self._do_wait_build()
+        self._do_install()
+
+    def _do_build_slave(self):
+        """
+        Generate the shell script a build *slave* runs: start a throwaway
+        ssh-agent loaded with the temporary deployment key, wait until the
+        master publishes its build token, scp the sources (and the build
+        tarball, if any) from the master, then publish this node's token.
+        Returns a file-like object with the script, or None when there is
+        nothing to fetch.
+        """
+        if not self.sources and not self.build:
+            return None
+
+        # Create build script
+        files = set()
+
+        if self.sources:
+            sources = self.sources.split(' ')
+            files.update(
+                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostname,
+                    os.path.join(self._master.home_path, os.path.basename(source)),)
+                for source in sources
+            )
+
+        if self.build:
+            files.add(
+                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostname,
+                    os.path.join(self._master.home_path, 'build.tar.gz'),)
+            )
+
+        # .ssh-askpass just echoes stdin, so the agent gets the passphrase
+        # from the NEPI_MASTER_PASSPHRASE environment variable; key files
+        # are removed right after being loaded into the agent.
+        launch_agent = "{ ( echo -e '#!/bin/sh\\ncat' > .ssh-askpass ) && chmod u+x .ssh-askpass"\
+            " && export SSH_ASKPASS=$(pwd)/.ssh-askpass "\
+            " && ssh-agent > .ssh-agent.sh ; } && . ./.ssh-agent.sh && ( echo $NEPI_MASTER_PASSPHRASE | ssh-add %(prk)s ) && rm -rf %(prk)s %(puk)s" % \
+        {
+            'prk' : server.shell_escape(self._master_prk_name),
+            'puk' : server.shell_escape(self._master_puk_name),
+        }
+
+        kill_agent = "kill $SSH_AGENT_PID"
+
+        # Poll the master's build.token every 5s until it matches the
+        # master's expected token (ie, the master build succeeded).
+        waitmaster = "{ . ./.ssh-agent.sh ; while [[ $(ssh -q -o UserKnownHostsFile=%(hostkey)s %(master)s cat %(token_path)s) != %(token)s ]] ; do sleep 5 ; done ; }" % {
+            'hostkey' : 'master_known_hosts',
+            'master' : "%s@%s" % (self._master.node.slicename, self._master.node.hostname),
+            'token_path' : os.path.join(self._master.home_path, 'build.token'),
+            'token' : server.shell_escape(self._master._master_token),
+        }
+
+        syncfiles = "scp -p -o UserKnownHostsFile=%(hostkey)s %(files)s ." % {
+            'hostkey' : 'master_known_hosts',
+            'files' : ' '.join(files),
+        }
+        if self.build:
+            syncfiles += " && tar xzf build.tar.gz"
+        syncfiles += " && ( echo %s > build.token )" % (server.shell_escape(self._master_token),)
+        syncfiles = "{ . ./.ssh-agent.sh ; %s ; }" % (syncfiles,)
+
+        cleanup = "{ . ./.ssh-agent.sh ; kill $SSH_AGENT_PID ; rm -rf %(prk)s %(puk)s master_known_hosts .ssh-askpass ; }" % {
+            'prk' : server.shell_escape(self._master_prk_name),
+            'puk' : server.shell_escape(self._master_puk_name),
+        }
+
+        # NOTE(review): the 'home' key below is not referenced by the
+        # template string; harmless, but confirm it isn't a leftover.
+        slavescript = "( ( %(launch_agent)s && %(waitmaster)s && %(syncfiles)s && %(kill_agent)s && %(cleanup)s ) || %(cleanup)s )" % {
+            'waitmaster' : waitmaster,
+            'syncfiles' : syncfiles,
+            'cleanup' : cleanup,
+            'kill_agent' : kill_agent,
+            'launch_agent' : launch_agent,
+            'home' : server.shell_escape(self.home_path),
+        }
+
+        return cStringIO.StringIO(slavescript)
+
+    def _do_launch_build(self):
+        # Launch nepi-build.sh detached on the node (output to 'buildlog'),
+        # passing the master passphrase via the environment for ssh-add,
+        # then poll the pidfile with backoff to record the build's pid/ppid.
+        script = "bash ./nepi-build.sh"
+        if self._master_passphrase:
+            script = "NEPI_MASTER_PASSPHRASE=%s %s" % (
+                server.shell_escape(self._master_passphrase),
+                script
+            )
+        (out,err),proc = rspawn.remote_spawn(
+            script,
+
+            pidfile = 'build-pid',
+            home = self.home_path,
+            stdin = '/dev/null',
+            stdout = 'buildlog',
+            stderr = rspawn.STDOUT,
+
+            host = self.node.hostname,
+            port = None,
+            user = self.node.slicename,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+
+        if proc.wait():
+            raise RuntimeError, "Failed to set up build slave %s: %s %s" % (self.home_path, out,err,)
+
+
+        # Up to 5 attempts with growing delay; the for/else raises when
+        # the pidfile never shows up.
+        pid = ppid = None
+        delay = 1.0
+        for i in xrange(5):
+            pidtuple = rspawn.remote_check_pid(
+                os.path.join(self.home_path,'build-pid'),
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed instal build dependencies: %s %s" % (out,err,)
-
-        if self.build:
-            # Build sources
+            if pidtuple:
+                pid, ppid = pidtuple
+                self._build_pid, self._build_ppid = pidtuple
+                break
+            else:
+                time.sleep(delay)
+                delay = min(30,delay*1.2)
+        else:
+            raise RuntimeError, "Failed to set up build slave %s: cannot get pid" % (self.home_path,)
+
+    def _do_wait_build(self):
+        # Poll the remote build process until it exits (jittered backoff),
+        # then verify it published the expected build token; on mismatch,
+        # fetch the buildlog and raise with the details.
+        pid = self._build_pid
+        ppid = self._build_ppid
+
+        if pid and ppid:
+            delay = 1.0
+            while True:
+                status = rspawn.remote_status(
+                    pid, ppid,
+                    host = self.node.hostname,
+                    port = None,
+                    user = self.node.slicename,
+                    agent = None,
+                    ident_key = self.node.ident_path,
+                    server_key = self.node.server_key
+                    )
+
+                if status is not rspawn.RUNNING:
+                    self._build_pid = self._build_ppid = None
+                    break
+                else:
+                    time.sleep(delay*(0.5+random.random()))
+                    delay = min(30,delay*1.2)
+
+            # check build token
+
+            # NOTE(review): unlike other popen_ssh_command calls in this
+            # class, no user= is passed here -- confirm the default applies.
            (out,err),proc = server.popen_ssh_command(
-                "cd %(home)s && mkdir -p build && cd build && ( %(command)s ) > ${HOME}/%(home)s/buildlog 2>&1 || ( tail ${HOME}/%(home)s/buildlog >&2 && false )" % {
-                    'command' : self._replace_paths(self.build),
-                    'home' : server.shell_escape(self.home_path),
+                "cat %(token_path)s" % {
+                    'token_path' : os.path.join(self.home_path, 'build.token'),
                },
                host = self.node.hostname,
                port = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
-
-            if proc.wait():
-                raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+
+            slave_token = ""
+            if not proc.wait() and out:
+                slave_token = out.strip()
+
+            if slave_token != self._master_token:
+                # Get buildlog for the error message
+                # NOTE(review): the 'buildscript' key below is unused by
+                # the template; harmless, but confirm it isn't a leftover.
-            # Make archive
-            (out,err),proc = server.popen_ssh_command(
-                "cd %(home)s && tar czf build.tar.gz build" % {
-                    'command' : self._replace_paths(self.build),
-                    'home' : server.shell_escape(self.home_path),
-                },
+                (buildlog,err),proc = server.popen_ssh_command(
+                    "cat %(buildlog)s" % {
+                        'buildlog' : os.path.join(self.home_path, 'buildlog'),
+                        'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
+                    },
+                    host = self.node.hostname,
+                    port = None,
+                    user = self.node.slicename,
+                    agent = None,
+                    ident_key = self.node.ident_path,
+                    server_key = self.node.server_key
+                    )
+
+                proc.wait()
+
+                raise RuntimeError, "Failed to set up application %s: "\
+                    "build failed, got wrong token from pid %s/%s "\
+                    "(expected %r, got %r), see buildlog: %s" % (
+                    self.home_path, pid, ppid, self._master_token, slave_token, buildlog)
+
+    def _do_kill_build(self):
+        # Best-effort termination of a still-running remote build
+        # (called from cleanup()); _build_pid/_build_ppid are left as-is.
+        # NOTE(review): unlike the remote_status/remote_spawn calls above,
+        # no server_key is passed here -- confirm remote_kill's signature.
+        pid = self._build_pid
+        ppid = self._build_ppid
+
+        if pid and ppid:
+            rspawn.remote_kill(
+                pid, ppid,
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                agent = None,
+                ident_key = self.node.ident_path
+                )
+
+
+    def _do_build_master(self):
+        """
+        Upload this dependency's sources and generate the build script a
+        *master* runs: install build dependencies, build, archive the
+        build tree and publish the build token slaves wait for.
+        Returns a file-like object with the script, or None when there is
+        nothing at all to do.
+        """
+        if not self.sources and not self.build and not self.buildDepends:
+            return None
+
+        if self.sources:
+            sources = self.sources.split(' ')
+
+            # Copy all sources
+            (out,err),proc = server.popen_scp(
+                sources,
+                "%s@%s:%s" % (self.node.slicename, self.node.hostname,
+                    os.path.join(self.home_path,'.'),),
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
            if proc.wait():
-                raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+                # report 'sources' (the old code referenced an undefined
+                # 'source', raising a NameError instead of this error)
+                raise RuntimeError, "Failed upload source file %r: %s %s" % (sources, out,err,)
+
+        buildscript = cStringIO.StringIO()
+
+        if self.buildDepends:
+            # Install build dependencies
+            buildscript.write(
+                "sudo -S yum -y install %(packages)s\n" % {
+                    'packages' : self.buildDepends
+                }
+            )
+
+
+        if self.build:
+            # Build sources
+            buildscript.write(
+                "mkdir -p build && ( cd build && ( %(command)s ) )\n" % {
+                    'command' : self._replace_paths(self.build),
+                    'home' : server.shell_escape(self.home_path),
+                }
+            )
+
+            # Make archive; the token is only published when tar succeeds,
+            # so a token always signals a successful build.
+            # NOTE(review): when only buildDepends is set (no build), no
+            # token is ever written and _do_wait_build will fail -- confirm
+            # whether that combination is supported.
+            buildscript.write(
+                "tar czf build.tar.gz build && ( echo %(master_token)s > build.token )\n" % {
+                    'master_token' : server.shell_escape(self._master_token)
+                }
+            )
+
+        buildscript.seek(0)
+        return buildscript
+
+
+ def _do_install(self):
if self.install:
# Install application
(out,err),proc = server.popen_ssh_command(
if proc.wait():
raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+    def set_master(self, master):
+        # Mark this dependency as a build slave that will fetch its
+        # sources/build results from *master* (a peer Dependency).
+        self._master = master
+
+    def install_keys(self, prk, puk, passphrase):
+        # Install keys
+        # Keep references to the temporary key files and their passphrase
+        # until _do_install_keys pushes them to the node.
+        self._master_passphrase = passphrase
+        self._master_prk = prk
+        self._master_puk = puk
+        self._master_prk_name = os.path.basename(prk.name)
+        self._master_puk_name = os.path.basename(puk.name)
+
+    def _do_install_keys(self):
+        # Push the re-ciphered deployment keypair and the master's host key
+        # (as master_known_hosts, for strict host checking in the build
+        # script) to this slave node, then drop the local key references.
+        prk = self._master_prk
+        puk = self._master_puk
+
+        (out,err),proc = server.popen_scp(
+            [ prk.name, puk.name ],
+            '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path ),
+            port = None,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,)
+
+        # known_hosts entry covers both the hostname and its resolved IP
+        (out,err),proc = server.popen_scp(
+            cStringIO.StringIO('%s,%s %s\n' % (
+                self._master.node.hostname, socket.gethostbyname(self._master.node.hostname),
+                self._master.node.server_key)),
+            '%s@%s:%s' % (self.node.slicename, self.node.hostname,
+                os.path.join(self.home_path,"master_known_hosts") ),
+            port = None,
+            agent = None,
+            ident_key = self.node.ident_path,
+            server_key = self.node.server_key
+            )
+
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,)
+
+        # No longer need'em
+        self._master_prk = None
+        self._master_puk = None
+
+    def cleanup(self):
+        # Teardown hook:
+        # make sure there's no leftover build processes
+        self._do_kill_build()
+
+
class Application(Dependency):
"""
An application also has dependencies, but also a command to be ran and monitored.
port = None,
user = self.node.slicename,
agent = None,
- ident_key = self.node.ident_path
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
)
if status is rspawn.NOT_STARTED:
tarname = os.path.basename(self.tarball.name)
# it's already built - just move the tarball into place
- self.build = "mv ${SOURCES}/%s ." % (tarname,)
+ self.build = "mv -f ${SOURCES}/%s ." % (tarname,)
# unpack it into sources, and we're done
self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,)
def __init__(self, api = None):
super(NS3Dependency, self).__init__(api)
- self.buildDepends = 'build-essential waf gcc gcc-c++ gccxml unzip'
+ self.buildDepends = 'make waf gcc gcc-c++ gccxml unzip'
# We have to download the sources, untar, build...
pybindgen_source_url = "http://pybindgen.googlecode.com/files/pybindgen-0.15.0.zip"
pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip"
- ns3_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/ns-3-dev/archive/tip.tar.gz"
+ ns3_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/nepi-ns-3.9/archive/tip.tar.gz"
passfd_source_url = "http://yans.pl.sophia.inria.fr/code/hgwebdir.cgi/python-passfd/archive/tip.tar.gz"
self.build =(
" ( "
from constants import TESTBED_ID
from nepi.core import testbed_impl
from nepi.util.constants import TIME_NOW
+from nepi.util.graphtools import mst
+from nepi.util import ipaddr2
import os
+import os.path
import time
import resourcealloc
+import collections
+import operator
+import functools
+import socket
+import struct
+import tempfile
+import subprocess
+import random
+import shutil
+
+from nepi.util.constants import TESTBED_STATUS_CONFIGURED
+
+class TempKeyError(Exception):
+    """Raised when a temporary deployment key cannot be generated
+    (e.g. the user's key passphrase could not be obtained)."""
+    pass
class TestbedController(testbed_impl.TestbedController):
def __init__(self, testbed_version):
get_attribute_value("authPass")
self.sliceSSHKey = self._attributes.\
get_attribute_value("sliceSSHKey")
+ self.sliceSSHKeyPass = None
self.plcHost = self._attributes.\
get_attribute_value("plcHost")
self.plcUrl = self._attributes.\
get_attribute_value("plcUrl")
super(TestbedController, self).do_setup()
+    def do_post_asynclaunch(self, guid):
+        # Dependencies were launched asynchronously,
+        # so wait for them
+        dep = self._elements[guid]
+        if isinstance(dep, self._app.Dependency):
+            dep.async_setup_wait()
+
+    # Two-phase configuration for asynchronous launch
+    # NOTE(review): wrapped as staticmethod although it takes self --
+    # presumably the poststep machinery passes the controller explicitly;
+    # confirm against testbed_impl.
+    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
+    do_poststep_configure = staticmethod(do_post_asynclaunch)
+
def do_preconfigure(self):
# Perform resource discovery if we don't have
# specific resources assigned yet
# Create PlanetLab slivers
self.do_provisioning()
+
+ # Plan application deployment
+ self.do_spanning_deployment_plan()
# Configure elements per XML data
super(TestbedController, self).do_preconfigure()
# cleanup
del self._to_provision
-
+
+    def do_spanning_deployment_plan(self):
+        """
+        Group equivalent dependencies (same sources/build recipe/platform)
+        and, for groups spanning several nodes, build once on a
+        deterministic root and fan results out along a low-branching
+        spanning tree of slaves.
+        """
+        # Create application groups by collecting all applications
+        # based on their hash - the hash should contain everything that
+        # defines them and the platform they're built
+
+        def dephash(app):
+            return (
+                frozenset((app.depends or "").split(' ')),
+                frozenset((app.sources or "").split(' ')),
+                app.build,
+                app.install,
+                app.node.architecture,
+                app.node.operatingSystem,
+                app.node.pl_distro,
+            )
+
+        depgroups = collections.defaultdict(list)
+
+        for element in self._elements.itervalues():
+            if isinstance(element, self._app.Dependency):
+                depgroups[dephash(element)].append(element)
+
+        # Set up spanning deployment for those applications that
+        # have been deployed in several nodes.
+        for dh, group in depgroups.iteritems():
+            if len(group) > 1:
+                # Pick root (deterministically)
+                root = min(group, key=lambda app:app.node.hostname)
+
+                # Obtain all IPs in numeric format
+                # (which means faster distance computations)
+                for dep in group:
+                    dep._ip = socket.gethostbyname(dep.node.hostname)
+                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
+
+                # Compute plan
+                # NOTE: the plan is an iterator
+                plan = mst.mst(
+                    group,
+                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
+                    root = root,
+                    maxbranching = 2)
+
+                # Re-sign private key
+                try:
+                    tempprk, temppuk, tmppass = self._make_temp_private_key()
+                except TempKeyError:
+                    continue
+
+                # Set up slaves
+                # mst yields (node, parent) pairs, hence (slave, master)
+                plan = list(plan)
+                for slave, master in plan:
+                    slave.set_master(master)
+                    slave.install_keys(tempprk, temppuk, tmppass)
+
+        # We don't need the user's passphrase anymore
+        self.sliceSSHKeyPass = None
+
+    def _make_temp_private_key(self):
+        """
+        Copy the user's slice SSH keypair into temp files re-ciphered
+        (ssh-keygen -p) with a random 256-bit passphrase, so that
+        passphrase can be shipped to slave nodes without exposing the
+        user's own. Returns (private_keyfile, public_keyfile, passphrase);
+        raises TempKeyError when no passphrase can be obtained.
+        """
+        # Get the user's key's passphrase
+        if not self.sliceSSHKeyPass:
+            if 'SSH_ASKPASS' in os.environ:
+                proc = subprocess.Popen(
+                    [ os.environ['SSH_ASKPASS'],
+                      "Please type the passphrase for the %s SSH identity file. "
+                      "The passphrase will be used to re-cipher the identity file with "
+                      "a random 256-bit key for automated chain deployment on the "
+                      "%s PlanetLab slice" % (
+                        os.path.basename(self.sliceSSHKey),
+                        self.slicename
+                    ) ],
+                    stdin = open("/dev/null"),
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.PIPE)
+                out,err = proc.communicate()
+                self.sliceSSHKeyPass = out.strip()
+
+        if not self.sliceSSHKeyPass:
+            raise TempKeyError
+
+        # Create temporary key files
+        prk = tempfile.NamedTemporaryFile(
+            dir = self.root_directory,
+            prefix = "pl_deploy_tmpk_",
+            suffix = "")
+
+        puk = tempfile.NamedTemporaryFile(
+            dir = self.root_directory,
+            prefix = "pl_deploy_tmpk_",
+            suffix = ".pub")
+
+        # Create secure 256-bits temporary passphrase
+        passphrase = ''.join(map(chr,[rng.randint(0,255)
+                                      for rng in (random.SystemRandom(),)
+                                      for i in xrange(32)] )).encode("hex")
+
+        # Copy keys
+        oprk = open(self.sliceSSHKey, "rb")
+        opuk = open(self.sliceSSHKey+".pub", "rb")
+        shutil.copymode(oprk.name, prk.name)
+        shutil.copymode(opuk.name, puk.name)
+        shutil.copyfileobj(oprk, prk)
+        shutil.copyfileobj(opuk, puk)
+        prk.flush()
+        puk.flush()
+        oprk.close()
+        opuk.close()
+
+        # A descriptive comment
+        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
+
+        # Recipher keys
+        proc = subprocess.Popen(
+            ["ssh-keygen", "-p",
+             "-f", prk.name,
+             "-P", self.sliceSSHKeyPass,
+             "-N", passphrase,
+             "-C", comment ],
+            stdout = subprocess.PIPE,
+            stderr = subprocess.PIPE,
+            stdin = subprocess.PIPE
+        )
+        out, err = proc.communicate()
+
+        # NOTE(review): any stderr output is treated as fatal; ssh-keygen
+        # may emit warnings on success -- confirm this cannot false-positive.
+        if err:
+            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
+                out, err)
+
+        prk.seek(0)
+        puk.seek(0)
+
+        # Change comment on public key
+        puklines = puk.readlines()
+        puklines[0] = puklines[0].split(' ')
+        puklines[0][-1] = comment+'\n'
+        puklines[0] = ' '.join(puklines[0])
+        puk.seek(0)
+        puk.truncate()
+        puk.writelines(puklines)
+        del puklines
+        puk.flush()
+
+        return prk, puk, passphrase
+
def set(self, guid, name, value, time = TIME_NOW):
super(TestbedController, self).set(guid, name, value, time)
# TODO: take on account schedule time for the task
app.node.wait_dependencies()
# Install stuff
- app.setup()
+ app.async_setup()
    def configure_dependency(testbed_instance, guid):
        dep = testbed_instance._elements[guid]
        dep.node.wait_dependencies()
        # Install stuff
+        # setup now runs in a background thread; completion is awaited
+        # in the post-step (do_post_asynclaunch).
-        dep.setup()
+        dep.async_setup()
def configure_netpipe(testbed_instance, guid):
netpipe = testbed_instance._elements[guid]
self._started = True
    def _launch_and_wait(self, *p, **kw):
+        # Runs inside the launcher thread: forward exceptions to the
+        # joining thread via the thread's _exc list (bare except is
+        # deliberate; the exc_info triple is re-raised in
+        # async_launch_wait).
+        try:
+            self.__launch_and_wait(*p, **kw)
+        except:
+            if self._launcher:
+                import sys
+                self._launcher._exc.append(sys.exc_info())
+            else:
+                raise
+
+ def __launch_and_wait(self, *p, **kw):
local = self.local()
self.launch(*p, **kw)
self._launcher = threading.Thread(
target = self._launch_and_wait,
args = (check_proto, listen, extra_args))
+ self._launcher._exc = []
self._launcher.start()
    def async_launch_wait(self):
+        # Join the launcher thread and re-raise any captured exception
+        # with its original traceback (Python 2 three-argument raise).
        if self._launcher:
            self._launcher.join()
            if not self._started:
-                raise RuntimeError, "Failed to launch TUN forwarder"
+                if self._launcher._exc:
+                    exctyp,exval,exctrace = self._launcher._exc[0]
+                    raise exctyp,exval,exctrace
+                else:
+                    raise RuntimeError, "Failed to launch TUN forwarder"
        elif not self._started:
            self.launch()
--- /dev/null
+import random
+import bisect
+
+def mst(nodes, connected,
+        maxsoftbranching = None,
+        maxbranching = None,
+        root = None,
+        untie = lambda l : iter(l).next()):
+    """
+    Returns an iterator over pairs (Node, Parent)
+    which form the spanning tree.
+    
+    Params:
+    
+    nodes: a list of nodes (can be anything)
+    
+    connected: a callable that takes two nodes
+        and returns either an edge weight (one
+        that can be compared with '<' with other
+        edge weights) or None if they're not
+        connected.
+    
+    maxbranching: the maximum number of branches
+        (children) allowed for a node. None for
+        no limit.
+        When maxbranching is used, the algorithm
+        implemented here gives no guarantee
+        of optimality (the spanning tree may not
+        be the minimum), as that problem becomes
+        NP-hard and we want a quick answer.
+    
+    maxsoftbranching: soft branching limit.
+        The algorithm is allowed to break it
+        if it has no other choice. Trees build with
+        soft branching limits are usually less
+        balanced than when using hard limits,
+        but the computation takes a lot less time.
+    
+    root: the desired root of the spanning tree,
+        or None to pick a random one.
+    
+    untie: a callable that, given an iterable
+        of candidate entries of equal weight for
+        the selection to be made, picks one to
+        be added to the spanning tree. The default
+        picks arbitrarily.
+        Entries are of the form (<weight>,<from>,<to>)
+        with <from> and <to> being indices in the
+        nodes array
+    """
+    
+    if not nodes:
+        return
+    
+    if root is None:
+        root = random.sample(nodes, 1)[0]
+    
+    # We want the root's index
+    root = nodes.index(root)
+    
+    # Unpicked nodes, nodes we still have to add.
+    unpicked = set(xrange(len(nodes)))
+    
+    # Distance maps
+    # We need:
+    #   min distance to picked node
+    #   which picked node
+    # Or None if it was a picked or unconnected node
+    
+    N = len(nodes)
+    distance = [None] * N
+    which = [None] * N
+    
+    # Count branches
+    branching = [0] * N
+    
+    # Initialize with distances to root
+    def update_distance_map(fornode):
+        ref = nodes[fornode]
+        for other, prevdistance in enumerate(distance):
+            other_node = nodes[other]
+            d = connected(ref, other_node)
+            if d is not None:
+                if prevdistance is None or prevdistance > d:
+                    distance[other] = d
+                    which[other] = fornode
+        distance[fornode] = None
+        which[fornode] = None
+    
+    update_distance_map(root)
+    unpicked.remove(root)
+    
+    # Add remaining nodes, yield edges
+    def minrange(dsorted):
+        return dsorted[:bisect.bisect(dsorted, (dsorted[0][0], N, N))]
+    
+    needsrebuild = False
+    while unpicked:
+        # Rebuild the distance map if needed
+        # (ie, when a node in the partial MST is no longer
+        # a candidate for adjoining because of saturation)
+        if needsrebuild:
+            distance = [None] * N
+            which = [None] * N
+            for n in xrange(N):
+                if n not in unpicked and branching[n] < maxbranching:
+                    update_distance_map(n)
+            # reset the flag: rebuild once per saturation event, not on
+            # every remaining iteration (otherwise this O(N^2) scan would
+            # run for the rest of the loop)
+            needsrebuild = False
+        
+        # Pick the closest unpicked node
+        dsorted = [(d,i,w) for i,(d,w) in enumerate(zip(distance, which))
+                   if d is not None
+                   and i in unpicked
+                   and (maxbranching is None or branching[w] < maxbranching)
+                   and (maxsoftbranching is None or branching[w] < maxsoftbranching)]
+        if not dsorted and maxsoftbranching is not None:
+            dsorted = [(d,i,w) for i,(d,w) in enumerate(zip(distance, which))
+                       if d is not None
+                       and i in unpicked
+                       and (maxbranching is None or branching[w] < maxbranching)]
+        if not dsorted:
+            raise AssertionError, "Unconnected graph"
+        
+        dsorted.sort()
+        dsorted = minrange(dsorted)
+        
+        if len(dsorted) > 1:
+            winner = untie(dsorted)
+        elif dsorted:
+            winner = dsorted[0]
+        else:
+            raise AssertionError, "Unconnected graph"
+        
+        weight, edgefrom, edgeto = winner
+        
+        branching[edgeto] += 1
+        
+        if maxbranching is not None and branching[edgeto] == maxbranching:
+            needsrebuild = True
+        
+        # Yield edge, update distance map to account
+        # for the picked node
+        yield (nodes[edgefrom], nodes[edgeto])
+        
+        update_distance_map(edgefrom)
+        unpicked.remove(edgefrom)
+
+
mask = '.'.join(map(str,map(ord,mask)))
return mask
+def ipdist(a,b):
+    # Distance between two dotted-quad IP strings: 32 minus the number of
+    # common leading bits (0 = identical, 32 = differ at the first bit).
+    # (relies on module-level struct/socket imports -- not in this hunk)
+    a = struct.unpack('!L',socket.inet_aton(a))[0]
+    b = struct.unpack('!L',socket.inet_aton(b))[0]
+    d = 32
+    while d and (b&0x80000000)==(a&0x80000000):
+        a <<= 1
+        b <<= 1
+        d -= 1
+    return d
+
+def ipdistn(a,b):
+    # Same distance as ipdist, but for addresses already given as
+    # 32-bit integers (avoids the inet_aton conversion per call).
+    d = 32
+    while d and (b&0x80000000)==(a&0x80000000):
+        a <<= 1
+        b <<= 1
+        d -= 1
+    return d
+
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import threading
+import Queue
+import traceback
+import sys
+
+N_PROCS = None
+
+class ParallelMap(object):
+    """
+    Simple thread pool that runs queued callables; when *results* is
+    True, return values are collected and yielded by __iter__.
+    Worker exceptions are printed to stderr and otherwise swallowed.
+    """
+    def __init__(self, maxthreads = None, maxqueue = None, results = True):
+        global N_PROCS
+
+        # Default thread count: number of processors (parsed once from
+        # /proc/cpuinfo and cached in N_PROCS), falling back to 4.
+        if maxthreads is None:
+            if N_PROCS is None:
+                try:
+                    f = open("/proc/cpuinfo")
+                    try:
+                        N_PROCS = sum("processor" in l for l in f)
+                    finally:
+                        f.close()
+                except:
+                    pass
+            maxthreads = N_PROCS
+        
+        if maxthreads is None:
+            maxthreads = 4
+        
+        self.queue = Queue.Queue(maxqueue or 0)
+
+        self.workers = [ threading.Thread(target = self.worker) 
+                         for x in xrange(maxthreads) ]
+        
+        if results:
+            self.rvqueue = Queue.Queue()
+        else:
+            self.rvqueue = None
+    
+    def put(self, callable, *args, **kwargs):
+        self.queue.put((callable, args, kwargs))
+    
+    def put_nowait(self, callable, *args, **kwargs):
+        self.queue.put_nowait((callable, args, kwargs))
+
+    def start(self):
+        for thread in self.workers:
+            thread.start()
+    
+    def join(self):
+        # One None sentinel per worker; each worker exits after
+        # consuming exactly one.
+        for thread in self.workers:
+            # That's the shutdown signal
+            self.queue.put(None)
+        
+        self.queue.join()
+        for thread in self.workers:
+            thread.join()
+        
+    def worker(self):
+        while True:
+            task = self.queue.get()
+            if task is None:
+                self.queue.task_done()
+                break
+            
+            try:
+                try:
+                    callable, args, kwargs = task
+                    rv = callable(*args, **kwargs)
+                    
+                    if self.rvqueue is not None:
+                        self.rvqueue.put(rv)
+                finally:
+                    self.queue.task_done()
+            except:
+                traceback.print_exc(file = sys.stderr)
+
+    def __iter__(self):
+        # Drain collected results; when the result queue runs dry, wait
+        # for in-flight tasks before concluding there is nothing left.
+        if self.rvqueue is not None:
+            while True:
+                try:
+                    yield self.rvqueue.get_nowait()
+                except Queue.Empty:
+                    self.queue.join()
+                    try:
+                        yield self.rvqueue.get_nowait()
+                    except Queue.Empty:
+                        raise StopIteration
+            
+
+
+
import time
import unittest
import re
+import sys
class PlanetLabIntegrationTestCase(unittest.TestCase):
testbed_id = "planetlab"
host1 = "nepi1.pl.sophia.inria.fr"
host2 = "nepi2.pl.sophia.inria.fr"
+ host3 = "nepi3.pl.sophia.inria.fr"
+ host4 = "nepi5.pl.sophia.inria.fr"
def setUp(self):
self.root_dir = tempfile.mkdtemp()
    def tearDown(self):
+        # NOTE(review): this early return disables cleanup, so temp
+        # root_dir directories accumulate -- looks like a debugging
+        # leftover; remove before merging.
+        return
        try:
            shutil.rmtree(self.root_dir)
        except:
xml = exp.to_xml()
controller = ExperimentController(xml, self.root_dir)
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
- ping_result = controller.trace(app.guid, "stdout")
- comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+ try:
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ ping_result = controller.trace(app.guid, "stdout")
+ comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
--- .* ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
"""
- self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
- "Unexpected trace:\n" + ping_result)
- controller.stop()
- controller.shutdown()
+ self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+ "Unexpected trace:\n" + ping_result)
+
+ finally:
+ controller.stop()
+ controller.shutdown()
+
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_spanning_deployment(self):
+        # End-to-end check of spanning-tree build deployment: four nodes
+        # build the same 'consts' app; the buildlog regex accepts either a
+        # master build (gcc) or a slave sync (Identity added).
+        pl, exp = self.make_experiment_desc()
+
+        from nepi.testbeds import planetlab as plpackage
+
+        nodes = [ pl.create("Node") for i in xrange(4) ]
+        ifaces = [ pl.create("NodeInterface") for node in nodes ]
+        inet = pl.create("Internet")
+        for node, iface in zip(nodes,ifaces):
+            node.connector("devs").connect(iface.connector("node"))
+            iface.connector("inet").connect(inet.connector("devs"))
+
+        apps = []
+        for node in nodes:
+            app = pl.create("Application")
+            app.set_attribute_value("command", "./consts")
+            app.set_attribute_value("buildDepends", "gcc")
+            app.set_attribute_value("build", "gcc ${SOURCES}/consts.c -o consts")
+            app.set_attribute_value("install", "cp consts ${SOURCES}/consts")
+            app.set_attribute_value("sources", os.path.join(
+                os.path.dirname(plpackage.__file__),'scripts','consts.c'))
+            app.enable_trace("stdout")
+            app.enable_trace("stderr")
+            app.enable_trace("buildlog")
+            node.connector("apps").connect(app.connector("node"))
+            apps.append(app)
+
+        comp_result = \
+r""".*ETH_P_ALL = 0x[0-9a-fA-F]{8}
+ETH_P_IP = 0x[0-9a-fA-F]{8}
+TUNGETIFF = 0x[0-9a-fA-F]{8}
+TUNSETIFF = 0x[0-9a-fA-F]{8}
+IFF_NO_PI = 0x[0-9a-fA-F]{8}
+IFF_TAP = 0x[0-9a-fA-F]{8}
+IFF_TUN = 0x[0-9a-fA-F]{8}
+IFF_VNET_HDR = 0x[0-9a-fA-F]{8}
+TUN_PKT_STRIP = 0x[0-9a-fA-F]{8}
+IFHWADDRLEN = 0x[0-9a-fA-F]{8}
+IFNAMSIZ = 0x[0-9a-fA-F]{8}
+IFREQ_SZ = 0x[0-9a-fA-F]{8}
+FIONREAD = 0x[0-9a-fA-F]{8}.*
+"""
+
+        comp_build = r".*(Identity added|gcc).*"
+
+        xml = exp.to_xml()
+
+        controller = ExperimentController(xml, self.root_dir)
+        try:
+            controller.start()
+            while not all(controller.is_finished(app.guid) for app in apps):
+                time.sleep(0.5)
+
+            for app in apps:
+                app_result = controller.trace(app.guid, "stdout") or ""
+                self.assertTrue(re.match(comp_result, app_result, re.MULTILINE),
+                    "Unexpected trace:\n" + app_result)
+
+                build_result = controller.trace(app.guid, "buildlog") or ""
+                self.assertTrue(re.match(comp_build, build_result, re.MULTILINE | re.DOTALL),
+                    "Unexpected trace:\n" + build_result)
+
+        finally:
+            controller.stop()
+            controller.shutdown()
+
if __name__ == '__main__':
unittest.main()