Initially working version of PlanetLab testbed implementation.
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 22 Apr 2011 11:42:29 +0000 (13:42 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 22 Apr 2011 11:42:29 +0000 (13:42 +0200)
Wiiii

Note: PlanetLab execution tests require a PLC account (password), so they're interactive.

15 files changed:
setup.py
src/nepi/core/execute.py
src/nepi/core/metadata.py
src/nepi/core/testbed_impl.py
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/scripts/consts.c [moved from scripts/consts.c with 100% similarity]
src/nepi/testbeds/planetlab/scripts/tun_connect.py [moved from scripts/tun_connect.py with 100% similarity]
src/nepi/util/proxy.py
src/nepi/util/server.py
test/testbeds/netns/execute.py
test/testbeds/ns3/execute.py
test/testbeds/ns3/execute2.py
test/testbeds/planetlab/execute.py

index 65b2919..8464859 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -19,5 +19,6 @@ setup(
             "nepi.core",
             "nepi.util.parser",
             "nepi.util" ],
-        package_dir = {"": "src"}
+        package_dir = {"": "src"},
+        package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py"] },
     )
index 8aaf1e0..add0909 100644 (file)
@@ -97,7 +97,8 @@ class ConnectorType(object):
 # need a definition!
 class Factory(AttributesMap):
     def __init__(self, factory_id, create_function, start_function, 
-            stop_function, status_function, configure_function,
+            stop_function, status_function, 
+            configure_function, preconfigure_function,
             allow_addresses = False, allow_routes = False):
         super(Factory, self).__init__()
         self._factory_id = factory_id
@@ -108,6 +109,7 @@ class Factory(AttributesMap):
         self._stop_function = stop_function
         self._status_function = status_function
         self._configure_function = configure_function
+        self._preconfigure_function = preconfigure_function
         self._connector_types = dict()
         self._traces = list()
         self._box_attributes = AttributesMap()
@@ -148,6 +150,10 @@ class Factory(AttributesMap):
     def configure_function(self):
         return self._configure_function
 
+    @property
+    def preconfigure_function(self):
+        return self._preconfigure_function
+
     @property
     def traces(self):
         return self._traces
@@ -337,7 +343,8 @@ class ExperimentController(object):
         # perform create-connect in parallel, wait
         # (internal connections only)
         self._parallel([lambda : (testbed.do_create(), 
-                                  testbed.do_connect())
+                                  testbed.do_connect(),
+                                  testbed.do_preconfigure())
                         for testbed in self._testbeds.itervalues()])
         
         # resolve netrefs
index cbe8a8c..2645b8d 100644 (file)
@@ -71,6 +71,15 @@ class VersionedMetadataInfo(object):
         """
         raise NotImplementedError
 
+    @property
+    def preconfigure_order(self):
+        """ list of factory ids that indicates the order in which the elements
+        should be preconfigured.
+        
+        Default: same as configure_order
+        """
+        return self.configure_order
+
     @property
     def factories_info(self):
         """ dictionary of dictionaries of factory specific information
@@ -83,6 +92,9 @@ class VersionedMetadataInfo(object):
                 "start_function": function for element starting,
                 "stop_function": function for element stoping,
                 "status_function": function for retrieving element status,
+                "preconfigure_function": function for element preconfiguration,
+                    (just after connections are made, 
+                    just before netrefs are resolved)
                 "configure_function": function for element configuration,
                 "factory_attributes": list of references to attribute_ids,
                 "box_attributes": list of regerences to attribute_ids,
@@ -145,6 +157,10 @@ class Metadata(object):
     def configure_order(self):
         return self._metadata.configure_order
 
+    @property
+    def preconfigure_order(self):
+        return self._metadata.preconfigure_order
+
     def testbed_attributes(self):
         attributes = AttributesMap()
 
@@ -198,22 +214,17 @@ class Metadata(object):
         from nepi.core.execute import Factory
         factories = list()
         for factory_id, info in self._metadata.factories_info.iteritems():
-            create_function = info["create_function"] \
-                    if "create_function" in info else None
-            start_function = info["start_function"] \
-                    if "start_function" in info else None
-            stop_function = info["stop_function"] \
-                    if "stop_function" in info else None
-            status_function = info["status_function"] \
-                    if "status_function" in info else None
-            configure_function = info["configure_function"] \
-                    if "configure_function" in info else None
-            allow_addresses = info["allow_addresses"] \
-                    if "allow_addresses" in info else False
-            allow_routes = info["allow_routes"] \
-                    if "allow_routes" in info else False
+            create_function = info.get("create_function")
+            start_function = info.get("start_function")
+            stop_function = info.get("stop_function")
+            status_function = info.get("status_function")
+            configure_function = info.get("configure_function")
+            preconfigure_function = info.get("preconfigure_function")
+            allow_addresses = info.get("allow_addresses", False)
+            allow_routes = info.get("allow_routes", False)
             factory = Factory(factory_id, create_function, start_function,
-                    stop_function, status_function, configure_function,
+                    stop_function, status_function, 
+                    configure_function, preconfigure_function,
                     allow_addresses, allow_routes)
                     
             # standard attributes
index 9018257..819fc14 100644 (file)
@@ -211,6 +211,24 @@ class TestbedController(execute.TestbedController):
                     if code_to_connect:
                         code_to_connect(self, element1, element2)
 
+    def do_preconfigure(self):
+        guids = dict()
+        # order guids (elements) according to factory_id
+        for guid, factory_id in self._create.iteritems():
+            if not factory_id in guids:
+               guids[factory_id] = list()
+            guids[factory_id].append(guid)
+        # configure elements following the factory_id order
+        for factory_id in self._metadata.preconfigure_order:
+            # omit the factories that have no element to create
+            if factory_id not in guids:
+                continue
+            factory = self._factories[factory_id]
+            if not factory.preconfigure_function:
+                continue
+            for guid in guids[factory_id]:
+                factory.preconfigure_function(self, guid)
+
     def do_configure(self):
         guids = dict()
         # order guids (elements) according to factory_id
index 38c40a7..ff022c9 100644 (file)
@@ -5,6 +5,10 @@ from constants import TESTBED_ID
 import plcapi
 import operator
 import os
+import os.path
+import nepi.util.server as server
+import cStringIO
+import subprocess
 
 from nepi.util.constants import STATUS_NOT_STARTED, STATUS_RUNNING, \
         STATUS_FINISHED
@@ -19,19 +23,24 @@ class Application(object):
         self.command = None
         self.sudo = False
         
+        self.stdin = None
         self.stdout = None
         self.stderr = None
         
+        # Those are filled when the app is configured
+        self.home_path = None
+        self.ident_path = None
+        self.slicename = None
+        
         # Those are filled when an actual node is connected
         self.node = None
         
         # Those are filled when the app is started
         #   Having both pid and ppid makes it harder
         #   for pid rollover to induce tracking mistakes
+        self._started = False
         self._pid = None
         self._ppid = None
-        self._stdout_path = None
-        self._stderr_path = None
     
     def __str__(self):
         return "%s<command:%s%s>" % (
@@ -41,25 +50,130 @@ class Application(object):
         )
     
     def validate(self):
-        pass
+        if self.home_path is None:
+            raise AssertionError, "Misconfigured application: missing home path"
+        if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
+            raise AssertionError, "Misconfigured application: missing slice SSH key"
+        if self.node is None:
+            raise AssertionError, "Misconfigured application: unconnected node"
+        if self.node.hostname is None:
+            raise AssertionError, "Misconfigured application: misconfigured node"
+        if self.slicename is None:
+            raise AssertionError, "Misconfigured application: unspecified slice"
 
     def start(self):
-        pass
+        # Start process in a "daemonized" way, using nohup and heavy
+        # stdin/out redirection to avoid connection issues
+        (out,err),proc = server.popen_ssh_command(
+            "cd %(home)s ; rm -f ./pid ; ( echo $$ $PPID > ./pid ; %(sudo)s nohup %(command)s > %(stdout)s 2> %(stderr)s < %(stdin)s ) &" % {
+                'home' : server.shell_escape(self.home_path),
+                'command' : self.command,
+                'stdout' : 'stdout' if self.stdout else '/dev/null' ,
+                'stderr' : 'stderr' if self.stderr else '/dev/null' ,
+                'stdin' : 'stdin' if self.stdin is not None else '/dev/null' ,
+                'sudo' : 'sudo' if self.sudo else '',
+            },
+            host = self.node.hostname,
+            port = None,
+            user = self.slicename,
+            agent = None,
+            ident_key = self.ident_path
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+        self._started = True
+
+    def checkpid(self):            
+        # Get PID/PPID
+        # NOTE: wait a bit for the pidfile to be created
+        if self._started and not self._pid or not self._ppid:
+            (out,err),proc = server.popen_ssh_command(
+                "cat %(pidfile)s" % {
+                    'pidfile' : server.shell_escape(os.path.join(self.home_path,'pid')),
+                },
+                host = self.node.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path
+                )
+            if out:
+                try:
+                    self._pid, self._ppid = map(int,out.strip().split(' ',1))
+                except:
+                    # Ignore, many ways to fail that don't matter that much
+                    pass
     
     def status(self):
-        return STATUS_FINISHED
+        self.checkpid()
+        if not self._started:
+            return STATUS_NOT_STARTED
+        elif not self._pid or not self._ppid:
+            return STATUS_NOT_STARTED
+        else:
+            (out,err),proc = server.popen_ssh_command(
+                "ps --ppid $(ppid)d -o pid | grep -c $(pid)d" % {
+                    'ppid' : self._ppid,
+                    'pid' : self._pid,
+                },
+                host = self.node.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path
+                )
+            
+            status = False
+            if out:
+                try:
+                    status = bool(int(out.strip()))
+                except:
+                    # Ignore, many ways to fail that don't matter that much
+                    pass
+            return STATUS_RUNNING if status else STATUS_FINISHED
     
     def kill(self):
         status = self.status()
         if status == STATUS_RUNNING:
-            # TODO: kill by pid & ppid
-            pass
+            # kill by ppid+pid - SIGTERM first, then try SIGKILL
+            (out,err),proc = server.popen_ssh_command(
+                """
+kill $(pid)d $(ppid)d 
+for x in 1 2 3 4 5 6 7 8 9 0 ; do 
+    sleep 0.1 
+    if [ `ps --pid $(ppid)d -o pid | grep -c $(pid)d` == `0` ]; then
+        break
+    fi
+    sleep 0.9
+done
+if [ `ps --pid $(ppid)d -o pid | grep -c $(pid)d` != `0` ]; then
+    kill -9 $(pid)d $(ppid)d
+fi
+""" % {
+                    'ppid' : self._ppid,
+                    'pid' : self._pid,
+                },
+                host = self.node.hostname,
+                port = None,
+                user = self.slicename,
+                agent = None,
+                ident_key = self.ident_path
+                )
+            
+            status = False
+            if out:
+                try:
+                    status = bool(int(out.strip()))
+                except:
+                    # Ignore, many ways to fail that don't matter that much
+                    pass
+            return STATUS_RUNNING if status else STATUS_FINISHED
     
     def remote_trace_path(self, whichtrace):
-        if whichtrace == 'stdout':
-            tracefile = self._stdout_path
-        elif whichtrace == 'stderr':
-            tracefile = self._stderr_path
+        if whichtrace in ('stdout','stderr'):
+            tracefile = os.path.join(self.home_path, whichtrace)
         else:
             tracefile = None
         
@@ -70,13 +184,61 @@ class Application(object):
         if not tracefile:
             return None
         
-        local_path = os.join(local_dir, tracefile)
+        local_path = os.path.join(local_dir, tracefile)
+        
+        # create parent local folders
+        proc = subprocess.Popen(
+            ["mkdir", "-p", os.path.dirname(local_path)],
+            stdout = open("/dev/null","w"),
+            stdin = open("/dev/null","r"))
+
+        if proc.wait():
+            raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
         
-        # TODO: sync files
-        f = open(local_path, "w")
-        f.write("BLURP!")
-        f.close()
+        # sync files
+        (out,err),proc = server.popen_scp(
+            '%s@%s:%s' % (self.slicename, self.node.hostname, 
+                tracefile),
+            local_path,
+            port = None,
+            agent = None,
+            ident_key = self.ident_path
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
         
         return local_path
     
 
+    def setup(self):
+        # Make sure all the paths are created where 
+        # they have to be created for deployment
+        (out,err),proc = server.popen_ssh_command(
+            "mkdir -p %s" % (server.shell_escape(self.home_path),),
+            host = self.node.hostname,
+            port = None,
+            user = self.slicename,
+            agent = None,
+            ident_key = self.ident_path
+            )
+        
+        if proc.wait():
+            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+        
+        
+        if self.stdin:
+            # Write program input
+            (out,err),proc = server.popen_scp(
+                cStringIO.StringIO(self.stdin),
+                '%s@%s:%s' % (self.slicename, self.node.hostname, 
+                    os.path.join(self.home_path, 'stdin') ),
+                port = None,
+                agent = None,
+                ident_key = self.ident_path
+                )
+            
+            if proc.wait():
+                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+        
index a358aaf..857b2a4 100644 (file)
@@ -55,6 +55,8 @@ class TestbedController(testbed_impl.TestbedController):
             get_attribute_value("authUser")
         self.authString = self._attributes.\
             get_attribute_value("authPass")
+        self.sliceSSHKey = self._attributes.\
+            get_attribute_value("sliceSSHKey")
 
     def do_create(self):
         # Create node elements per XML data
@@ -135,7 +137,8 @@ class TestbedController(testbed_impl.TestbedController):
         for trace in self._traces.values():
             trace.close()
         for element in self._elements.values():
-            element.destroy()
+            pass
+            #element.destroy()
 
     def trace(self, guid, trace_id, attribute='value'):
         app = self._elements[guid]
index 9110029..3aacdbf 100644 (file)
@@ -82,7 +82,7 @@ def start_application(testbed_instance, guid):
 
 def stop_application(testbed_instance, guid):
     app = testbed_instance.elements[guid]
-    app.stop()
+    app.kill()
 
 ### Status functions ###
 
@@ -129,16 +129,30 @@ def configure_tuniface(testbed_instance, guid):
     element.validate()
 
 def configure_node(testbed_instance, guid):
-    element = testbed_instance._elements[guid]
+    node = testbed_instance._elements[guid]
     
     # If we have only one candidate, simply use it
-    candidates = element.find_candidates(
+    candidates = node.find_candidates(
         filter_slice_id = testbed_instance.slice_id)
     if len(candidates) == 1:
-        element.assign_node_id(iter(candidates).next())
+        node.assign_node_id(iter(candidates).next())
     
     # Do some validations
-    element.validate()
+    node.validate()
+
+def configure_application(testbed_instance, guid):
+    app = testbed_instance._elements[guid]
+    
+    # Just inject configuration stuff
+    app.home_path = "nepi-app-%s" % (guid,)
+    app.ident_path = testbed_instance.sliceSSHKey
+    app.slicename = testbed_instance.slicename
+    
+    # Do some validations
+    app.validate()
+    
+    # Install stuff
+    app.setup()
 
 ### Factory information ###
 
@@ -358,7 +372,7 @@ traces = dict({
     "stderr": dict({
                 "name": "stderr",
                 "help": "Application standard error",
-        }) 
+              }) 
     })
 
 create_order = [ INTERNET, NODE, NODEIFACE, TUNIFACE, APPLICATION ]
@@ -371,7 +385,7 @@ factories_info = dict({
             "help": "Virtualized Node (V-Server style)",
             "category": "topology",
             "create_function": create_node,
-            "configure_function": configure_node,
+            "preconfigure_function": configure_node,
             "box_attributes": [
                 "forward_X11",
                 "hostname",
@@ -391,7 +405,7 @@ factories_info = dict({
             "help": "External network interface - they cannot be brought up or down, and they MUST be connected to the internet.",
             "category": "devices",
             "create_function": create_nodeiface,
-            "configure_function": configure_nodeiface,
+            "preconfigure_function": configure_nodeiface,
             "box_attributes": [ ],
             "connector_types": ["node", "inet"]
         }),
@@ -400,7 +414,7 @@ factories_info = dict({
             "help": "Virtual TUN network interface",
             "category": "devices",
             "create_function": create_tuniface,
-            "configure_function": configure_tuniface,
+            "preconfigure_function": configure_tuniface,
             "box_attributes": [
                 "up", "device_name", "mtu", "snat",
             ],
@@ -413,7 +427,8 @@ factories_info = dict({
             "start_function": start_application,
             "status_function": status_application,
             "stop_function": stop_application,
-            "box_attributes": ["command", "sudo"],
+            "configure_function": configure_application,
+            "box_attributes": ["command", "sudo", "stdin"],
             "connector_types": ["node"],
             "traces": ["stdout", "stderr"]
         }),
@@ -447,6 +462,17 @@ testbed_attributes = dict({
             "flags": Attribute.DesignOnly | Attribute.HasNoDefaultValue,
             "validation_function": validation.is_string
         }),
+        "slice_ssh_key": dict({
+            "name": "sliceSSHKey",
+            "help": "The controller-local path to the slice user's ssh private key. "
+                    "It is the user's responsability to deploy this file where the controller "
+                    "will run, it won't be done automatically because it's sensitive information. "
+                    "It is recommended that a NEPI-specific user be created for this purpose and "
+                    "this purpose alone.",
+            "type": Attribute.STRING,
+            "flags": Attribute.DesignOnly | Attribute.HasNoDefaultValue,
+            "validation_function": validation.is_string
+        }),
     })
 
 class VersionedMetadataInfo(metadata.VersionedMetadataInfo):
index 574ef6b..b075efa 100644 (file)
@@ -45,6 +45,7 @@ GUIDS  = 27
 GET_ROUTE = 28
 GET_ADDRESS = 29
 RECOVER = 30
+DO_PRECONFIGURE    = 31
 
 # PARAMETER TYPE
 STRING  =  100
@@ -83,6 +84,7 @@ testbed_messages = dict({
     DO_CREATE:  "%d" % DO_CREATE,
     DO_CONNECT: "%d" % DO_CONNECT,
     DO_CONFIGURE:   "%d" % DO_CONFIGURE,
+    DO_PRECONFIGURE:   "%d" % DO_PRECONFIGURE,
     DO_CROSS_CONNECT:   "%d" % DO_CROSS_CONNECT,
     GET:    "%d|%s" % (GET, "%s|%d|%s"),
     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
@@ -117,6 +119,7 @@ instruction_text = dict({
     DO_CREATE:  "DO_CREATE",
     DO_CONNECT: "DO_CONNECT",
     DO_CONFIGURE:   "DO_CONFIGURE",
+    DO_PRECONFIGURE:   "DO_PRECONFIGURE",
     DO_CROSS_CONNECT:   "DO_CROSS_CONNECT",
     GET:    "GET",
     SET:    "SET",
@@ -365,6 +368,8 @@ class TestbedControllerServer(server.Server):
                     reply = self.do_connect(params)
                 elif instruction == DO_CONFIGURE:
                     reply = self.do_configure(params)
+                elif instruction == DO_PRECONFIGURE:
+                    reply = self.do_preconfigure(params)
                 elif instruction == DO_CROSS_CONNECT:
                     reply = self.do_cross_connect(params)
                 elif instruction == GET:
@@ -511,6 +516,10 @@ class TestbedControllerServer(server.Server):
         self._testbed.do_configure()
         return "%d|%s" % (OK, "")
 
+    def do_preconfigure(self, params):
+        self._testbed.do_preconfigure()
+        return "%d|%s" % (OK, "")
+
     def do_cross_connect(self, params):
         self._testbed.do_cross_connect()
         return "%d|%s" % (OK, "")
@@ -867,6 +876,16 @@ class TestbedControllerProxy(object):
         if code == ERROR:
             raise RuntimeError(text)
 
+    def do_preconfigure(self):
+        msg = testbed_messages[DO_PRECONFIGURE]
+        self._client.send_msg(msg)
+        reply = self._client.read_reply()
+        result = reply.split("|")
+        code = int(result[0])
+        text = base64.b64decode(result[1])
+        if code == ERROR:
+            raise RuntimeError(text)
+
     def do_cross_connect(self):
         msg = testbed_messages[DO_CROSS_CONNECT]
         self._client.send_msg(msg)
index 963ac14..c1558d1 100644 (file)
@@ -13,6 +13,7 @@ import threading
 import time
 import traceback
 import signal
+import re
 
 CTRL_SOCK = "ctrl.sock"
 STD_ERR = "stderr.log"
@@ -28,6 +29,20 @@ if hasattr(os, "devnull"):
 else:
     DEV_NULL = "/dev/null"
 
+
+
+SHELL_SAFE = re.compile('[-a-zA-Z0-9_=+:.,/]*')
+
+def shell_escape(s):
+    """ Escapes strings so that they are safe to use as command-line arguments """
+    if SHELL_SAFE.match(s):
+        # safe string - no escaping needed
+        return s
+    else:
+        # unsafe string - escape
+        s = s.replace("'","\\'")
+        return "'%s'" % (s,)
+
 class Server(object):
     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
         self._root_dir = root_dir
@@ -325,8 +340,178 @@ class Client(object):
         encoded = data.rstrip() 
         return base64.b64decode(encoded)
 
+def popen_ssh_command(command, host, port, user, agent, 
+            stdin="", 
+            ident_key = None):
+        """
+        Executes a remote commands, returns ((stdout,stderr),process)
+        """
+        args = ['ssh',
+                # Don't bother with localhost. Makes test easier
+                '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-l', user, host]
+        if agent:
+            args.append('-A')
+        if port:
+            args.append('-p%d' % port)
+        if ident_key:
+            args.extend(('-i', ident_key))
+        args.append(command)
+
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args, 
+                stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE, 
+                stderr = subprocess.PIPE)
+        return (proc.communicate(stdin), proc)
+def popen_scp(source, dest, port, agent, 
+            recursive = False,
+            ident_key = None):
+        """
+        Copies from/to remote sites.
+        
+        Source and destination should have the user and host encoded
+        as per scp specs.
+        
+        If source is a file object, a special mode will be used to
+        create the remote file with the same contents.
+        
+        If dest is a file object, the remote file (source) will be
+        read and written into dest.
+        
+        In these modes, recursive cannot be True.
+        """
+        
+        if isinstance(source, file) or isinstance(dest, file) \
+                or hasattr(source, 'read')  or hasattr(dest, 'write'):
+            assert not resursive
+            
+            # Parse destination as <user>@<server>:<path>
+            tgtspec, path = dest.split(':',1)
+            user,host = tgtspec.rsplit('@',1)
+            
+            args = ['ssh', '-l', user, '-C',
+                    # Don't bother with localhost. Makes test easier
+                    '-o', 'NoHostAuthenticationForLocalhost=yes' ]
+            if port:
+                args.append('-P%d' % port)
+            if ident_key:
+                args.extend(('-i', ident_key))
+            
+            if isinstance(source, file) or hasattr(source, 'read'):
+                args.append('cat > %s' % (shell_escape(path),))
+            elif isinstance(dest, file) or hasattr(dest, 'write'):
+                args.append('cat %s' % (shell_escape(path),))
+            else:
+                raise AssertionError, "Unreachable code reached! :-Q"
+            
+            # connects to the remote host and starts a remote connection
+            if isinstance(source, file):
+                proc = subprocess.Popen(args, 
+                        stdout = open('/dev/null','w'),
+                        stderr = subprocess.PIPE,
+                        stdin = source)
+                err = proc.stderr.read()
+                proc.wait()
+                return ((None,err), proc)
+            elif isinstance(dest, file):
+                proc = subprocess.Popen(args, 
+                        stdout = open('/dev/null','w'),
+                        stderr = subprocess.PIPE,
+                        stdin = source)
+                err = proc.stderr.read()
+                proc.wait()
+                return ((None,err), proc)
+            elif hasattr(source, 'read'):
+                # file-like (but not file) source
+                proc = subprocess.Popen(args, 
+                        stdout = open('/dev/null','w'),
+                        stderr = subprocess.PIPE,
+                        stdin = source)
+                
+                buf = None
+                err = []
+                while True:
+                    if not buf:
+                        buf = source.read(4096)
+                    
+                    rdrdy, wrdy, broken = os.select(
+                        [proc.stderr],
+                        [proc.stdin],
+                        [proc.stderr,proc.stdin])
+                    
+                    if proc.stderr in rdrdy:
+                        # use os.read for fully unbuffered behavior
+                        err.append(os.read(proc.stderr.fileno(), 4096))
+                    
+                    if proc.stdin in wrdy:
+                        proc.stdin.write(buf)
+                        buf = None
+                    
+                    if broken:
+                        break
+                err.append(proc.stderr.read())
+                    
+                proc.wait()
+                return ((None,''.join(err)), proc)
+            elif hasattr(dest, 'write'):
+                # file-like (but not file) dest
+                proc = subprocess.Popen(args, 
+                        stdout = open('/dev/null','w'),
+                        stderr = subprocess.PIPE,
+                        stdin = source)
+                
+                buf = None
+                err = []
+                while True:
+                    rdrdy, wrdy, broken = os.select(
+                        [proc.stderr, proc.stdout],
+                        [],
+                        [proc.stderr, proc.stdout])
+                    
+                    if proc.stderr in rdrdy:
+                        # use os.read for fully unbuffered behavior
+                        err.append(os.read(proc.stderr.fileno(), 4096))
+                    
+                    if proc.stdout in rdrdy:
+                        # use os.read for fully unbuffered behavior
+                        dest.write(os.read(proc.stdout.fileno(), 4096))
+                    
+                    if broken:
+                        break
+                err.append(proc.stderr.read())
+                    
+                proc.wait()
+                return ((None,''.join(err)), proc)
+            else:
+                raise AssertionError, "Unreachable code reached! :-Q"
+        else:
+            # plain scp
+            args = ['scp', '-q', '-p', '-C',
+                    # Don't bother with localhost. Makes test easier
+                    '-o', 'NoHostAuthenticationForLocalhost=yes' ]
+            if port:
+                args.append('-P%d' % port)
+            if recursive:
+                args.append('-r')
+            if ident_key:
+                args.extend(('-i', ident_key))
+            args.append(source)
+            args.append(dest)
+
+            # connects to the remote host and starts a remote connection
+            proc = subprocess.Popen(args, 
+                    stdout = subprocess.PIPE,
+                    stdin = subprocess.PIPE, 
+                    stderr = subprocess.PIPE)
+            comm = proc.communicate()
+            proc.wait()
+            return (comm, proc)
 def popen_ssh_subprocess(python_code, host, port, user, agent, 
-        python_path = None):
+        python_path = None,
+        ident_key = None):
         if python_path:
             python_path.replace("'", r"'\''")
             cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
@@ -358,6 +543,8 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
             args.append('-A')
         if port:
             args.append('-p%d' % port)
+        if ident_key:
+            args.extend(('-i', ident_key))
         args.append(cmd)
 
         # connects to the remote host and starts a remote rpyc connection
index f1d51b4..f8bce35 100755 (executable)
@@ -44,6 +44,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.do_setup()
         instance.do_create()
         instance.do_connect()
+        instance.do_preconfigure()
         instance.do_configure()
         instance.start()
         while instance.status(7) != STATUS_FINISHED:
@@ -84,6 +85,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.do_setup()
         instance.do_create()
         instance.do_connect()
+        instance.do_preconfigure()
         instance.do_configure()
         instance.start()
         while instance.status(6) != STATUS_FINISHED:
@@ -143,6 +145,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.do_setup()
         instance.do_create()
         instance.do_connect()
+        instance.do_preconfigure()
         instance.do_configure()
         instance.start()
         while instance.status(11) != STATUS_FINISHED:
index fdf315c..3722bc7 100755 (executable)
@@ -65,6 +65,7 @@ class Ns3ExecuteTestCase(unittest.TestCase):
         instance.do_setup()
         instance.do_create()
         instance.do_connect()
+        instance.do_preconfigure()
         instance.do_configure()
         instance.start()
         while instance.status(17) != STATUS_FINISHED:
index 0677348..31d8481 100755 (executable)
@@ -101,6 +101,7 @@ class Ns3ExecuteTestCase(unittest.TestCase):
         instance.do_setup()
         instance.do_create()
         instance.do_connect()
+        instance.do_preconfigure()
         instance.do_configure()
         instance.start()
         while instance.status(27) != STATUS_FINISHED:
index 3830f09..407ff4b 100755 (executable)
@@ -21,9 +21,11 @@ class NetnsExecuteTestCase(unittest.TestCase):
     def test_simple(self):
         testbed_version = "01"
         instance = planetlab.TestbedController(testbed_version)
+        slicename = "inria_nepi12"
         
         instance.defer_configure("homeDirectory", self.root_dir)
-        instance.defer_configure("slice", "inria_nepi12")
+        instance.defer_configure("slice", slicename)
+        instance.defer_configure("sliceSSHKey", "/user/%s/home/.ssh/id_rsa_planetlab" % (getpass.getuser(),))
         instance.defer_configure("authUser", "claudio-daniel.freire@inria.fr")
         instance.defer_configure("authPass", getpass.getpass())
         
@@ -46,6 +48,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.do_setup()
         instance.do_create()
         instance.do_connect()
+        instance.do_preconfigure()
         instance.do_configure()
         
         # Manually replace netref