server daemon launched over ssh connection.
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 16 Mar 2011 22:15:20 +0000 (23:15 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 16 Mar 2011 22:15:20 +0000 (23:15 +0100)
src/nepi/util/proxy.py
src/nepi/util/server.py
test/core/integration.py
test/lib/test_util.py
test/testbeds/netns/execute.py
test/testbeds/netns/integration.py
test/util/server.py

index e3d6edb..c3b98b4 100644 (file)
@@ -5,7 +5,9 @@ import base64
 from nepi.core.attributes import AttributesMap, Attribute
 from nepi.util import server, validation
 from nepi.util.constants import TIME_NOW
+import getpass
 import sys
+import time
 
 # PROTOCOL REPLIES
 OK = 0
@@ -48,7 +50,7 @@ FLOAT   = 103
 # EXPERIMENT CONTROLER PROTOCOL MESSAGES
 controller_messages = dict({
     XML:    "%d" % XML,
-    ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"),
+    ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
     TRACE:  "%d|%s" % (TRACE, "%d|%d|%s"),
     FINISHED:   "%d|%s" % (FINISHED, "%d"),
     START:  "%d" % START,
@@ -153,6 +155,38 @@ def log_reply(server, reply):
     server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
             code_txt, txt))
 
+def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
+    # launch daemon
+    proc = server.popen_ssh_subprocess(python_code, host = host,
+        port = port, user = user, agent = agent)
+    #while not proc.poll():
+    #    time.sleep(0.5)
+    if proc.poll():
+        err = proc.stderr.read()
+        raise RuntimeError("Client could not be executed: %s" % \
+                err)
+    # create client
+    return server.Client(root_dir, host = host, port = port, user = user, 
+            agent = agent)
+
+def to_server_log_level(log_level):
+    return server.DEBUG_LEVEL \
+            if log_level == AccessConfiguration.DEBUG_LEVEL \
+                else server.ERROR_LEVEL
+
+def get_access_config_params(access_config):
+    root_dir = access_config.get_attribute_value("rootDirectory")
+    log_level = access_config.get_attribute_value("logLevel")
+    log_level = to_server_log_level(log_level)
+    user = host = port = agent = None
+    communication = access_config.get_attribute_value("communication")
+    if communication == AccessConfiguration.ACCESS_SSH:
+        user = access_config.get_attribute_value("user")
+        host = access_config.get_attribute_value("host")
+        port = access_config.get_attribute_value("port")
+        agent = access_config.get_attribute_value("useAgent")
+    return (root_dir, log_level, user, host, port, agent)
+
 class AccessConfiguration(AttributesMap):
     MODE_SINGLE_PROCESS = "SINGLE"
     MODE_DAEMON = "DAEMON"
@@ -185,6 +219,7 @@ class AccessConfiguration(AttributesMap):
         self.add_attribute(name = "user",
                 help = "User on the Host to execute the testbed",
                 type = Attribute.STRING,
+                value = getpass.getuser(),
                 validation_function = validation.is_string)
         self.add_attribute(name = "port",
                 help = "Port on the Host",
@@ -210,28 +245,32 @@ class AccessConfiguration(AttributesMap):
                 validation_function = validation.is_enum)
 
 def create_controller(xml, access_config = None):
-    mode = None if not access_config else access_config.get_attribute_value("mode")
+    mode = None if not access_config else \
+            access_config.get_attribute_value("mode")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
         from nepi.core.execute import ExperimentController
         return ExperimentController(xml)
     elif mode == AccessConfiguration.MODE_DAEMON:
-        root_dir = access_config.get_attribute_value("rootDirectory")
-        log_level = access_config.get_attribute_value("logLevel")
-        return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml)
+        (root_dir, log_level, user, host, port, agent) = \
+                get_access_config_params(access_config)
+        return ExperimentControllerProxy(root_dir, log_level,
+                experiment_xml = xml, host = host, port = port, user = user, 
+                agent = agent)
     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
 
 def create_testbed_instance(testbed_id, testbed_version, access_config):
     mode = None if not access_config else access_config.get_attribute_value("mode")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
-        return  _build_testbed_testbed(testbed_id, testbed_version)
+        return  _build_testbed_instance(testbed_id, testbed_version)
     elif mode == AccessConfiguration.MODE_DAEMON:
-        root_dir = access_config.get_attribute_value("rootDirectory")
-        log_level = access_config.get_attribute_value("logLevel")
+        (root_dir, log_level, user, host, port, agent) = \
+                get_access_config_params(access_config)
         return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id, 
-                testbed_version = testbed_version)
+                testbed_version = testbed_version, host = host, port = port,
+                user = user, agent = agent)
     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
 
-def _build_testbed_testbed(testbed_id, testbed_version):
+def _build_testbed_instance(testbed_id, testbed_version):
     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
     if not mod_name in sys.modules:
         __import__(mod_name)
@@ -239,14 +278,14 @@ def _build_testbed_testbed(testbed_id, testbed_version):
     return module.TestbedInstance(testbed_version)
 
 class TestbedInstanceServer(server.Server):
-    def __init__(self, root_dir, testbed_id, testbed_version):
-        super(TestbedInstanceServer, self).__init__(root_dir)
+    def __init__(self, root_dir, log_level, testbed_id, testbed_version):
+        super(TestbedInstanceServer, self).__init__(root_dir, log_level)
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
         self._testbed = None
 
     def post_daemonize(self):
-        self._testbed = _build_testbed_testbed(self._testbed_id, 
+        self._testbed = _build_testbed_instance(self._testbed_id, 
                 self._testbed_version)
 
     def reply_action(self, msg):
@@ -468,8 +507,8 @@ class TestbedInstanceServer(server.Server):
         return "%d|%s" % (OK, result)
  
 class ExperimentControllerServer(server.Server):
-    def __init__(self, root_dir, experiment_xml):
-        super(ExperimentControllerServer, self).__init__(root_dir)
+    def __init__(self, root_dir, log_level, experiment_xml):
+        super(ExperimentControllerServer, self).__init__(root_dir, log_level)
         self._experiment_xml = experiment_xml
         self._controller = None
 
@@ -522,6 +561,7 @@ class ExperimentControllerServer(server.Server):
         port = int(params[6])
         root_dir = params[7]
         use_agent = params[8] == "True"
+        log_level = params[9]
         access_config = AccessConfiguration()
         access_config.set_attribute_value("mode", mode)
         access_config.set_attribute_value("communication", communication)
@@ -530,6 +570,7 @@ class ExperimentControllerServer(server.Server):
         access_config.set_attribute_value("port", port)
         access_config.set_attribute_value("rootDirectory", root_dir)
         access_config.set_attribute_value("useAgent", use_agent)
+        access_config.set_attribute_value("logLevel", log_level)
         self._controller.set_access_configuration(testbed_guid, 
                 access_config)
         return "%d|%s" % (OK, "")
@@ -562,18 +603,28 @@ class ExperimentControllerServer(server.Server):
 
 class TestbedIntanceProxy(object):
     def __init__(self, root_dir, log_level, testbed_id = None, 
-            testbed_version = None, launch = True):
+            testbed_version = None, launch = True, host = None, 
+            port = None, user = None, agent = None):
         if launch:
             if testbed_id == None or testbed_version == None:
                 raise RuntimeError("To launch a TesbedInstance server a \
                         testbed_id and testbed_version are required")
-            # launch daemon
-            s = TestbedInstanceServer(root_dir, testbed_id, testbed_version)
-            if log_level == AccessConfiguration.DEBUG_LEVEL:
-                s.set_debug_log_level()
-            s.run()
-        # create_client
-        self._client = server.Client(root_dir)
+            # ssh
+            if host != None:
+                python_code = "from nepi.util.proxy import \
+                        TesbedInstanceServer;\
+                        s = TestbedInstanceServer('%s', %d, '%s', '%s');\
+                        s.run()" % (root_dir, log_level, testbed_id, 
+                                testbed_version)
+                self._client = launch_ssh_daemon_client(root_dir, python_code,
+                        host, port, user, agent)
+            else:
+                # launch daemon
+                s = TestbedInstanceServer(root_dir, log_level, testbed_id, 
+                    testbed_version)
+                s.run()
+                # create client
+                self._client = server.Client(root_dir)
 
     @property
     def guids(self):
@@ -850,18 +901,31 @@ class TestbedIntanceProxy(object):
         self._client.send_stop()
 
 class ExperimentControllerProxy(object):
-    def __init__(self, root_dir, log_level, experiment_xml = None, launch = True):
+    def __init__(self, root_dir, log_level, experiment_xml = None, 
+            launch = True, host = None, port = None, user = None, 
+            agent = None):
         if launch:
+            # launch server
             if experiment_xml == None:
                 raise RuntimeError("To launch a ExperimentControllerServer a \
                         xml description of the experiment is required")
-            # launch daemon
-            s = ExperimentControllerServer(root_dir, experiment_xml)
-            if log_level == AccessConfiguration.DEBUG_LEVEL:
-                s.set_debug_log_level()
-            s.run()
-        # create_client
-        self._client = server.Client(root_dir)
+            # ssh
+            if host != None:
+                xml = experiment_xml
+                xml = xml.replace("'", r"\'")
+                xml = xml.replace("\"", r"\'")
+                xml = xml.replace("\n", r"")
+                python_code = "from nepi.util.proxy import ExperimentControllerServer;\
+                        s = ExperimentControllerServer('%s', %d, '%s');\
+                        s.run()" % (root_dir, log_level, xml)
+                self._client = launch_ssh_daemon_client(root_dir, python_code,
+                        host, port, user, agent)
+            else:
+                # launch daemon
+                s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
+                s.run()
+                # create client
+                self._client = server.Client(root_dir)
 
     @property
     def experiment_xml(self):
@@ -883,9 +947,10 @@ class ExperimentControllerProxy(object):
         port = access_config.get_attribute_value("port")
         root_dir = access_config.get_attribute_value("rootDirectory")
         use_agent = access_config.get_attribute_value("useAgent")
+        log_level = access_config.get_attribute_value("logLevel")
         msg = controller_messages[ACCESS]
         msg = msg % (testbed_guid, mode, communication, host, user, port, 
-                root_dir, use_agent)
+                root_dir, use_agent, log_level)
         self._client.send_msg(msg)
         reply = self._client.read_reply()
         result = reply.split("|")
index 8a3dbed..2e02c74 100644 (file)
@@ -4,12 +4,13 @@
 import base64
 import errno
 import os
+import resource
 import select
 import socket
 import sys
 import subprocess
 import threading
-from time import strftime
+import time
 import traceback
 
 CTRL_SOCK = "ctrl.sock"
@@ -21,13 +22,17 @@ STOP_MSG = "STOP"
 ERROR_LEVEL = 0
 DEBUG_LEVEL = 1
 
+if hasattr(os, "devnull"):
+    DEV_NULL = os.devnull
+else:
+    DEV_NULL = "/dev/null"
+
 class Server(object):
-    def __init__(self, root_dir = "."):
+    def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
         self._root_dir = root_dir
         self._stop = False
         self._ctrl_sock = None
-        self._stderr = None
-        self._log_level = ERROR_LEVEL
+        self._log_level = log_level
 
     def run(self):
         try:
@@ -77,7 +82,10 @@ class Server(object):
             os._exit(0)
 
         # close all open file descriptors.
-        for fd in range(3, MAX_FD):
+        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+        if (max_fd == resource.RLIM_INFINITY):
+            max_fd = MAX_FD
+        for fd in range(3, max_fd):
             if fd != w:
                 try:
                     os.close(fd)
@@ -85,11 +93,14 @@ class Server(object):
                     pass
 
         # Redirect standard file descriptors.
-        self._stderr = stdout = file(STD_ERR, "a", 0)
-        stdin = open('/dev/null', 'r')
+        stdin = open(DEV_NULL, "r")
+        stderr = stdout = open(STD_ERR, "a", 0)
         os.dup2(stdin.fileno(), sys.stdin.fileno())
+        # NOTE: sys.stdout.write will still be buffered, even if the file
+        # was opened with 0 buffer
         os.dup2(stdout.fileno(), sys.stdout.fileno())
-        os.dup2(self._stderr.fileno(), sys.stderr.fileno())
+        os.dup2(stderr.fileno(), sys.stderr.fileno())
+
         # let the parent process know that the daemonization is finished
         os.write(w, "\n")
         os.close(w)
@@ -152,22 +163,16 @@ class Server(object):
     def reply_action(self, msg):
         return "Reply to: %s" % msg
 
-    def set_error_log_level(self):
-        self._log_level = ERROR_LEVEL
-
-    def set_debug_log_level(self):
-        self._log_level = DEBUG_LEVEL
-
     def log_error(self, text = None):
         if text == None:
             text = traceback.format_exc()
-        date = strftime("%Y-%m-%d %H:%M:%S")
+        date = time.strftime("%Y-%m-%d %H:%M:%S")
         sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
         return text
 
     def log_debug(self, text):
         if self._log_level == DEBUG_LEVEL:
-            date = strftime("%Y-%m-%d %H:%M:%S")
+            date = time.strftime("%Y-%m-%d %H:%M:%S")
             sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
 
 class Forwarder(object):
@@ -190,6 +195,7 @@ class Forwarder(object):
 
     def write_data(self, data):
         sys.stdout.write(data)
+        # sys.stdout.write is buffered, for this we need to do a flush()
         sys.stdout.flush()
 
     def send_to_server(self, data):
@@ -234,14 +240,20 @@ class Forwarder(object):
             pass
 
 class Client(object):
-    def __init__(self, root_dir = "."):
-        self._process = subprocess.Popen(
-                ["python", "-c", 
-                "from nepi.util import server;c=server.Forwarder('%s');\
-                        c.forward()" % root_dir
-                ],
-                stdin = subprocess.PIPE, 
-                stdout = subprocess.PIPE)
+    def __init__(self, root_dir = ".", host = None, port = None, user = None, 
+            agent = None):
+        python_code = "from nepi.util import server;c=server.Forwarder('%s');\
+                c.forward()" % root_dir
+        if host != None:
+            self._process = popen_ssh_subprocess(python_code, host, port, 
+                    user, agent)
+        else:
+            self._process = subprocess.Popen(
+                    ["python", "-c", python_code],
+                    stdin = subprocess.PIPE, 
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.PIPE
+                )
 
     def send_msg(self, msg):
         encoded = base64.b64encode(msg)
@@ -256,3 +268,51 @@ class Client(object):
         encoded = data.rstrip() 
         return base64.b64decode(encoded)
 
+def popen_ssh_subprocess(python_code, host, port, user, agent, 
+        python_path = None):
+        if python_path:
+            python_path.replace("'", r"'\''")
+            cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
+        else:
+            cmd = ""
+        # Uncomment for debug (to run everything under strace)
+        # We had to verify if strace works (cannot nest them)
+        #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
+        #cmd += "$CMD "
+        #if self.mode == MODE_SSH:
+        #    cmd += "strace -f -tt -s 200 -o strace$$.out "
+        cmd += "python -c '"
+        cmd += "import base64, os\n"
+        cmd += "cmd = \"\"\n"
+        cmd += "while True:\n"
+        cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
+        cmd += " if cmd[-1] == \"\\n\": break\n"
+        cmd += "cmd = base64.b64decode(cmd)\n"
+        # Uncomment for debug
+        #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
+        cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
+        cmd += "exec(cmd)\n'"
+
+        args = ['ssh',
+                # Don't bother with localhost. Makes test easier
+                '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-l', user, host]
+        if agent:
+            args.append('-A')
+        if port:
+            args.append('-p%d' % port)
+        args.append(cmd)
+
+        # connects to the remote host and starts a remote rpyc connection
+        proc = subprocess.Popen(args, 
+                stdout = subprocess.PIPE,
+                stdin = subprocess.PIPE, 
+                stderr = subprocess.PIPE)
+        # send the command to execute
+        os.write(proc.stdin.fileno(),
+                base64.b64encode(python_code) + "\n")
+        msg = os.read(proc.stdout.fileno(), 3)
+        if msg != "OK\n":
+            raise RuntimeError("Failed to start remote python interpreter")
+        return proc
index 069d5c5..3830f03 100755 (executable)
@@ -9,17 +9,16 @@ import mock.metadata_v01
 import os
 import shutil
 import sys
+import tempfile
+import test_util
 import time
 import unittest
-import uuid
 
 class ExecuteTestCase(unittest.TestCase):
     def setUp(self):
         sys.modules["nepi.testbeds.mock.metadata_v01"] = mock.metadata_v01
         sys.modules["nepi.testbeds.mock"] = mock
-        self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", 
-                str(uuid.uuid1()))
-        os.makedirs(self._root_dir)
+        self.root_dir = tempfile.mkdtemp()
 
     def test_single_process_integration(self):
         exp_desc = ExperimentDescription()
@@ -81,7 +80,7 @@ class ExecuteTestCase(unittest.TestCase):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
-        access_config.set_attribute_value("rootDirectory", self._root_dir)
+        access_config.set_attribute_value("rootDirectory", self.root_dir)
         controller = proxy.create_controller(xml, access_config)
 
         controller.start()
@@ -122,7 +121,7 @@ class ExecuteTestCase(unittest.TestCase):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
-        access_config.set_attribute_value("rootDirectory", self._root_dir)
+        access_config.set_attribute_value("rootDirectory", self.root_dir)
         controller.set_access_configuration(desc.guid, access_config)
 
         controller.start()
@@ -162,13 +161,13 @@ class ExecuteTestCase(unittest.TestCase):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
-        access_config.set_attribute_value("rootDirectory", self._root_dir)
+        access_config.set_attribute_value("rootDirectory", self.root_dir)
         controller = proxy.create_controller(xml, access_config)
 
         access_config2 = proxy.AccessConfiguration()
         access_config2.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
-        inst_root_dir = os.path.join(self._root_dir, "instance")
+        inst_root_dir = os.path.join(self.root_dir, "instance")
         os.mkdir(inst_root_dir)
         access_config2.set_attribute_value("rootDirectory", inst_root_dir)
         controller.set_access_configuration(desc.guid, access_config2)
@@ -186,8 +185,68 @@ class ExecuteTestCase(unittest.TestCase):
         controller.stop()
         controller.shutdown()
 
+    def TODO_test_ssh_daemonized_all_integration(self):
+        # This test doesn't run because
+        # sys.modules["nepi.testbeds.mock"] = mock
+        # is not set in the ssh process
+        exp_desc = ExperimentDescription()
+        testbed_version = "01"
+        testbed_id = "mock"
+        env = test_util.test_environment()
+        provider = FactoriesProvider(testbed_id, testbed_version)
+        desc = exp_desc.add_testbed_description(provider)
+        desc.set_attribute_value("fake", True)
+        node1 = desc.create("Node")
+        node2 = desc.create("Node")
+        iface1 = desc.create("Interface")
+        iface1.set_attribute_value("fake", True)
+        node1.connector("devs").connect(iface1.connector("node"))
+        iface2 = desc.create("Interface")
+        iface2.set_attribute_value("fake", True)
+        node2.connector("devs").connect(iface2.connector("node"))
+        iface1.connector("iface").connect(iface2.connector("iface"))
+        app = desc.create("Application")
+        app.connector("node").connect(node1.connector("apps"))
+        app.enable_trace("fake")
+
+        xml = exp_desc.to_xml()
+        access_config = proxy.AccessConfiguration()
+        access_config.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        access_config.set_attribute_value("rootDirectory", self.root_dir)
+        access_config.set_attribute_value("communication", 
+                proxy.AccessConfiguration.ACCESS_SSH)
+        access_config.set_attribute_value("port", env.port)
+        access_config.set_attribute_value("useAgent", True)
+        controller = proxy.create_controller(xml, access_config)
+
+        access_config2 = proxy.AccessConfiguration()
+        access_config2.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+        access_config2.set_attribute_value("communication", 
+                proxy.AccessConfiguration.ACCESS_SSH)
+        access_config2.set_attribute_value("port", env.port)
+        access_config2.set_attribute_value("useAgent", True)
+        controller.set_access_configuration(desc.guid, access_config2)
+
+        controller.start()
+        while not controller.is_finished(app.guid):
+            time.sleep(0.5)
+        fake_result = controller.trace(desc.guid, app.guid, "fake")
+        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+        self.assertTrue(fake_result.startswith(comp_result))
+        controller.stop()
+        controller.shutdown()
     def tearDown(self):
-        shutil.rmtree(self._root_dir)
+        shutil.rmtree(self.root_dir)
 
 if __name__ == '__main__':
     unittest.main()
index 0181c49..bdedb9b 100644 (file)
@@ -19,6 +19,32 @@ def skipUnless(cond, text):
 def skipIf(cond, text):
     return (lambda f: _bannerwrap(f, text)) if cond else lambda f: f
 
+def find_bin(name, extra_path = None):
+    search = []
+    if "PATH" in os.environ:
+        search += os.environ["PATH"].split(":")
+    for pref in ("/", "/usr/", "/usr/local/"):
+        for d in ("bin", "sbin"):
+            search.append(pref + d)
+    if extra_path:
+        search += extra_path
+
+    for d in search:
+            try:
+                os.stat(d + "/" + name)
+                return d + "/" + name
+            except OSError, e:
+                if e.errno != os.errno.ENOENT:
+                    raise
+    return None
+
+def find_bin_or_die(name, extra_path = None):
+    r = find_bin(name)
+    if not r:
+        raise RuntimeError(("Cannot find `%s' command, impossible to " +
+                "continue.") % name)
+    return r
+
 # SSH stuff
 
 import os, os.path, re, signal, shutil, socket, subprocess, tempfile
@@ -101,4 +127,35 @@ def stop_ssh_agent(data):
     for k in data:
         del os.environ[k]
 
+class test_environment(object):
+    def __init__(self):
+        sshd = find_bin_or_die("sshd")
+        environ = {}
+        if 'PYTHONPATH' in os.environ:
+            environ['PYTHONPATH'] = ":".join(map(os.path.realpath, 
+                os.environ['PYTHONPATH'].split(":")))
+
+        self.dir = tempfile.mkdtemp()
+        self.server_keypair = gen_ssh_keypair(
+                os.path.join(self.dir, "server_key"))
+        self.client_keypair = gen_ssh_keypair(
+                os.path.join(self.dir, "client_key"))
+        self.authorized_keys = gen_auth_keys(self.client_keypair[1],
+                os.path.join(self.dir, "authorized_keys"), environ)
+        self.port = get_free_port()
+        self.sshd_conf = gen_sshd_config(
+                os.path.join(self.dir, "sshd_config"),
+                self.port, self.server_keypair[0], self.authorized_keys)
+
+        self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf])
+        self.ssh_agent_vars = start_ssh_agent()
+        add_key_to_agent(self.client_keypair[0])
+
+    def __del__(self):
+        if self.sshd:
+            os.kill(self.sshd.pid, signal.SIGTERM)
+            self.sshd.wait()
+        if self.ssh_agent_vars:
+            stop_ssh_agent(self.ssh_agent_vars)
+        shutil.rmtree(self.dir)
 
index 1e4d4c1..cf3579e 100755 (executable)
@@ -6,23 +6,21 @@ from nepi.util.constants import AF_INET, STATUS_FINISHED
 from nepi.testbeds import netns
 import os
 import shutil
+import tempfile
 import test_util
 import time
 import unittest
-import uuid
 
 class NetnsExecuteTestCase(unittest.TestCase):
     def setUp(self):
-        self._home_dir = os.path.join(os.getenv("HOME"), ".nepi", 
-                str(uuid.uuid1()))
-        os.makedirs(self._home_dir)
+        self.root_dir = tempfile.mkdtemp()
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
     def test_run_ping_if(self):
         user = getpass.getuser()
         testbed_version = "01"
         instance = netns.TestbedInstance(testbed_version)
-        instance.configure("homeDirectory", self._home_dir)
+        instance.configure("homeDirectory", self.root_dir)
         instance.create(2, "Node")
         instance.create(3, "Node")
         instance.create(4, "NodeInterface")
@@ -65,7 +63,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         user = getpass.getuser()
         testbed_version = "01"
         instance = netns.TestbedInstance(testbed_version)
-        instance.configure("homeDirectory", self._home_dir)
+        instance.configure("homeDirectory", self.root_dir)
         instance.create(2, "Node")
         instance.create(3, "Node")
         instance.create(4, "P2PNodeInterface")
@@ -105,7 +103,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         user = getpass.getuser()
         testbed_version = "01"
         instance = netns.TestbedInstance(testbed_version)
-        instance.configure("homeDirectory", self._home_dir)
+        instance.configure("homeDirectory", self.root_dir)
         instance.create(2, "Node")
         instance.create(3, "Node")
         instance.create(4, "Node")
@@ -160,7 +158,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.shutdown()
         
     def tearDown(self):
-        shutil.rmtree(self._home_dir)
+        shutil.rmtree(self.root_dir)
 
 if __name__ == '__main__':
     unittest.main()
index b7a0822..543a635 100755 (executable)
@@ -7,16 +7,14 @@ from nepi.core.execute import ExperimentController
 from nepi.util import proxy
 import os
 import shutil
+import tempfile
 import test_util
 import time
 import unittest
-import uuid
 
 class NetnsIntegrationTestCase(unittest.TestCase):
     def setUp(self):
-        self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", 
-                str(uuid.uuid1()))
-        os.makedirs(self._root_dir)
+        self.root_dir = tempfile.mkdtemp()
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
     def test_local_if(self):
@@ -26,7 +24,7 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         user = getpass.getuser()
         netns_provider = FactoriesProvider(testbed_id, testbed_version)
         netns_desc = exp_desc.add_testbed_description(netns_provider)
-        netns_desc.set_attribute_value("homeDirectory", self._root_dir)
+        netns_desc.set_attribute_value("homeDirectory", self.root_dir)
         #netns_desc.set_attribute_value("enableDebug", True)
         node1 = netns_desc.create("Node")
         node2 = netns_desc.create("Node")
@@ -73,7 +71,7 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         user = getpass.getuser()
         netns_provider = FactoriesProvider(testbed_id, testbed_version)
         netns_desc = exp_desc.add_testbed_description(netns_provider)
-        netns_desc.set_attribute_value("homeDirectory", self._root_dir)
+        netns_desc.set_attribute_value("homeDirectory", self.root_dir)
         #netns_desc.set_attribute_value("enableDebug", True)
         node1 = netns_desc.create("Node")
         node2 = netns_desc.create("Node")
@@ -101,7 +99,7 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
-        access_config.set_attribute_value("rootDirectory", self._root_dir)
+        access_config.set_attribute_value("rootDirectory", self.root_dir)
         access_config.set_attribute_value("logLevel", 
                 proxy.AccessConfiguration.DEBUG_LEVEL)
         controller = proxy.create_controller(xml, access_config)
@@ -109,7 +107,76 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         access_config2 = proxy.AccessConfiguration()
         access_config2.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
-        inst_root_dir = os.path.join(self._root_dir, "instance")
+        inst_root_dir = os.path.join(self.root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+        access_config2.set_attribute_value("logLevel", 
+                proxy.AccessConfiguration.DEBUG_LEVEL)
+        controller.set_access_configuration(netns_desc.guid, access_config2)
+
+        controller.start()
+        while not controller.is_finished(app.guid):
+            time.sleep(0.5)
+        ping_result = controller.trace(netns_desc.guid, app.guid, "stdout")
+        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+        self.assertTrue(ping_result.startswith(comp_result))
+        controller.stop()
+        controller.shutdown()
+
+    @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+    def test_all_ssh_daemonized_if(self):
+        exp_desc = ExperimentDescription()
+        testbed_version = "01"
+        testbed_id = "netns"
+        env = test_util.test_environment()
+        user = getpass.getuser()
+        netns_provider = FactoriesProvider(testbed_id, testbed_version)
+        netns_desc = exp_desc.add_testbed_description(netns_provider)
+        netns_desc.set_attribute_value("homeDirectory", self.root_dir)
+        #netns_desc.set_attribute_value("enableDebug", True)
+        node1 = netns_desc.create("Node")
+        node2 = netns_desc.create("Node")
+        iface1 = netns_desc.create("NodeInterface")
+        iface1.set_attribute_value("up", True)
+        node1.connector("devs").connect(iface1.connector("node"))
+        ip1 = iface1.add_address()
+        ip1.set_attribute_value("Address", "10.0.0.1")
+        iface2 = netns_desc.create("NodeInterface")
+        iface2.set_attribute_value("up", True)
+        node2.connector("devs").connect(iface2.connector("node"))
+        ip2 = iface2.add_address()
+        ip2.set_attribute_value("Address", "10.0.0.2")
+        switch = netns_desc.create("Switch")
+        switch.set_attribute_value("up", True)
+        iface1.connector("switch").connect(switch.connector("devs"))
+        iface2.connector("switch").connect(switch.connector("devs"))
+        app = netns_desc.create("Application")
+        app.set_attribute_value("command", "ping -qc1 10.0.0.2")
+        app.set_attribute_value("user", user)
+        app.connector("node").connect(node1.connector("apps"))
+        app.enable_trace("stdout")
+        xml = exp_desc.to_xml()
+
+        access_config = proxy.AccessConfiguration()
+        access_config.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        access_config.set_attribute_value("rootDirectory", self.root_dir)
+        access_config.set_attribute_value("logLevel", 
+                proxy.AccessConfiguration.DEBUG_LEVEL)
+        access_config.set_attribute_value("communication", 
+                proxy.AccessConfiguration.ACCESS_SSH)
+        access_config.set_attribute_value("port", env.port)
+        access_config.set_attribute_value("useAgent", True)
+        controller = proxy.create_controller(xml, access_config)
+
+        access_config2 = proxy.AccessConfiguration()
+        access_config2.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        inst_root_dir = os.path.join(self.root_dir, "instance")
         os.mkdir(inst_root_dir)
         access_config2.set_attribute_value("rootDirectory", inst_root_dir)
         access_config2.set_attribute_value("logLevel", 
@@ -130,7 +197,7 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         controller.shutdown()
 
     def tearDown(self):
-        shutil.rmtree(self._root_dir)
+        shutil.rmtree(self.root_dir)
 
 if __name__ == '__main__':
     unittest.main()
index 3766520..615b1ef 100755 (executable)
@@ -1,23 +1,23 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+import getpass
 from nepi.util import server
 import os
 import shutil
 import sys
+import tempfile
+import test_util
 import unittest
-import uuid
 
 class ServerTestCase(unittest.TestCase):
     def setUp(self):
-        self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", 
-                str(uuid.uuid1()))
-        os.makedirs(self._root_dir)
+        self.root_dir = tempfile.mkdtemp()
 
     def test_server(self):
-        s = server.Server(self._root_dir)
+        s = server.Server(self.root_dir)
         s.run()
-        c = server.Client(self._root_dir)
+        c = server.Client(self.root_dir)
         c.send_msg("Hola")
         reply = c.read_reply()
         self.assertTrue(reply == "Reply to: Hola")
@@ -26,9 +26,9 @@ class ServerTestCase(unittest.TestCase):
         self.assertTrue(reply == "Stopping server")
 
     def test_server_long_message(self):
-        s = server.Server(self._root_dir)
+        s = server.Server(self.root_dir)
         s.run()
-        c = server.Client(self._root_dir)
+        c = server.Client(self.root_dir)
         c.send_msg("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         reply = c.read_reply()
         self.assertTrue(reply == "Reply to: 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -36,8 +36,25 @@ class ServerTestCase(unittest.TestCase):
         reply = c.read_reply()
         self.assertTrue(reply == "Stopping server")
 
+    def test_ssh_server(self):
+        env = test_util.test_environment()
+        user = getpass.getuser()
+        # launch server
+        python_code = "from nepi.util import server;s=server.Server('%s');\
+                s.run()" % self.root_dir
+        server.popen_ssh_subprocess(python_code, host = "localhost", 
+                port = env.port, user = user, agent = True)
+        c = server.Client(self.root_dir, host = "localhost", port = env.port,
+                user = user, agent = True)
+        c.send_msg("Hola")
+        reply = c.read_reply()
+        self.assertTrue(reply == "Reply to: Hola")
+        c.send_stop()
+        reply = c.read_reply()
+        self.assertTrue(reply == "Stopping server")
+
     def tearDown(self):
-        shutil.rmtree(self._root_dir)
+        shutil.rmtree(self.root_dir)
 
 if __name__ == '__main__':
     unittest.main()