From: Alina Quereilhac Date: Wed, 16 Mar 2011 22:15:20 +0000 (+0100) Subject: server daemon launched over ssh connection. X-Git-Tag: nepi_v2~184 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=b1761d355dd90f719da39eb4265cd136b11fe175;p=nepi.git server daemon launched over ssh connection. --- diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index e3d6edbc..c3b98b47 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -5,7 +5,9 @@ import base64 from nepi.core.attributes import AttributesMap, Attribute from nepi.util import server, validation from nepi.util.constants import TIME_NOW +import getpass import sys +import time # PROTOCOL REPLIES OK = 0 @@ -48,7 +50,7 @@ FLOAT = 103 # EXPERIMENT CONTROLER PROTOCOL MESSAGES controller_messages = dict({ XML: "%d" % XML, - ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"), + ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"), TRACE: "%d|%s" % (TRACE, "%d|%d|%s"), FINISHED: "%d|%s" % (FINISHED, "%d"), START: "%d" % START, @@ -153,6 +155,38 @@ def log_reply(server, reply): server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, code_txt, txt)) +def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent): + # launch daemon + proc = server.popen_ssh_subprocess(python_code, host = host, + port = port, user = user, agent = agent) + #while not proc.poll(): + # time.sleep(0.5) + if proc.poll(): + err = proc.stderr.read() + raise RuntimeError("Client could not be executed: %s" % \ + err) + # create client + return server.Client(root_dir, host = host, port = port, user = user, + agent = agent) + +def to_server_log_level(log_level): + return server.DEBUG_LEVEL \ + if log_level == AccessConfiguration.DEBUG_LEVEL \ + else server.ERROR_LEVEL + +def get_access_config_params(access_config): + root_dir = access_config.get_attribute_value("rootDirectory") + log_level = access_config.get_attribute_value("logLevel") + log_level = to_server_log_level(log_level) + user = host = port = agent = None + communication = access_config.get_attribute_value("communication") + if communication == AccessConfiguration.ACCESS_SSH: + user = access_config.get_attribute_value("user") + host = access_config.get_attribute_value("host") + port = access_config.get_attribute_value("port") + agent = access_config.get_attribute_value("useAgent") + return (root_dir, log_level, user, host, port, agent) + class AccessConfiguration(AttributesMap): MODE_SINGLE_PROCESS = "SINGLE" MODE_DAEMON = "DAEMON" @@ -185,6 +219,7 @@ class AccessConfiguration(AttributesMap): self.add_attribute(name = "user", help = "User on the Host to execute the testbed", type = Attribute.STRING, + value = getpass.getuser(), validation_function = validation.is_string) self.add_attribute(name = "port", help = "Port on the Host", @@ -210,28 +245,32 @@ class AccessConfiguration(AttributesMap): validation_function = validation.is_enum) def create_controller(xml, access_config = None): - mode = None if not access_config else access_config.get_attribute_value("mode") + mode = None if not access_config else \ + access_config.get_attribute_value("mode") if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS: from nepi.core.execute import ExperimentController return ExperimentController(xml) elif mode == AccessConfiguration.MODE_DAEMON: - root_dir = access_config.get_attribute_value("rootDirectory") - log_level = access_config.get_attribute_value("logLevel") - return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml) + (root_dir, log_level, user, host, port, agent) = \ + get_access_config_params(access_config) + return ExperimentControllerProxy(root_dir, log_level, + experiment_xml = xml, host = host, port = port, user = user, + agent = agent) raise RuntimeError("Unsupported access configuration 'mode'" % mode) def create_testbed_instance(testbed_id, testbed_version, access_config): mode = None if not access_config else access_config.get_attribute_value("mode") if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS: - return _build_testbed_testbed(testbed_id, testbed_version) + return _build_testbed_instance(testbed_id, testbed_version) elif mode == AccessConfiguration.MODE_DAEMON: - root_dir = access_config.get_attribute_value("rootDirectory") - log_level = access_config.get_attribute_value("logLevel") + (root_dir, log_level, user, host, port, agent) = \ + get_access_config_params(access_config) return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id, - testbed_version = testbed_version) + testbed_version = testbed_version, host = host, port = port, + user = user, agent = agent) raise RuntimeError("Unsupported access configuration 'mode'" % mode) -def _build_testbed_testbed(testbed_id, testbed_version): +def _build_testbed_instance(testbed_id, testbed_version): mod_name = "nepi.testbeds.%s" % (testbed_id.lower()) if not mod_name in sys.modules: __import__(mod_name) @@ -239,14 +278,14 @@ def _build_testbed_testbed(testbed_id, testbed_version): return module.TestbedInstance(testbed_version) class TestbedInstanceServer(server.Server): - def __init__(self, root_dir, testbed_id, testbed_version): - super(TestbedInstanceServer, self).__init__(root_dir) + def __init__(self, root_dir, log_level, testbed_id, testbed_version): + super(TestbedInstanceServer, self).__init__(root_dir, log_level) self._testbed_id = testbed_id self._testbed_version = testbed_version self._testbed = None def post_daemonize(self): - self._testbed = _build_testbed_testbed(self._testbed_id, + self._testbed = _build_testbed_instance(self._testbed_id, self._testbed_version) def reply_action(self, msg): @@ -468,8 +507,8 @@ class TestbedInstanceServer(server.Server): return "%d|%s" % (OK, result) class ExperimentControllerServer(server.Server): - def __init__(self, root_dir, experiment_xml): - super(ExperimentControllerServer, self).__init__(root_dir) + def __init__(self, root_dir, log_level, experiment_xml): + super(ExperimentControllerServer, self).__init__(root_dir, log_level) self._experiment_xml = experiment_xml self._controller = None @@ -522,6 +561,7 @@ class ExperimentControllerServer(server.Server): port = int(params[6]) root_dir = params[7] use_agent = params[8] == "True" + log_level = params[9] access_config = AccessConfiguration() access_config.set_attribute_value("mode", mode) access_config.set_attribute_value("communication", communication) @@ -530,6 +570,7 @@ class ExperimentControllerServer(server.Server): access_config.set_attribute_value("port", port) access_config.set_attribute_value("rootDirectory", root_dir) access_config.set_attribute_value("useAgent", use_agent) + access_config.set_attribute_value("logLevel", log_level) self._controller.set_access_configuration(testbed_guid, access_config) return "%d|%s" % (OK, "") @@ -562,18 +603,28 @@ class ExperimentControllerServer(server.Server): class TestbedIntanceProxy(object): def __init__(self, root_dir, log_level, testbed_id = None, - testbed_version = None, launch = True): + testbed_version = None, launch = True, host = None, + port = None, user = None, agent = None): if launch: if testbed_id == None or testbed_version == None: raise RuntimeError("To launch a TesbedInstance server a \ testbed_id and testbed_version are required") - # launch daemon - s = TestbedInstanceServer(root_dir, testbed_id, testbed_version) - if log_level == AccessConfiguration.DEBUG_LEVEL: - s.set_debug_log_level() - s.run() - # create_client - self._client = server.Client(root_dir) + # ssh + if host != None: + python_code = "from nepi.util.proxy import \ + TesbedInstanceServer;\ + s = TestbedInstanceServer('%s', %d, '%s', '%s');\ + s.run()" % (root_dir, log_level, testbed_id, + testbed_version) + self._client = launch_ssh_daemon_client(root_dir, python_code, + host, port, user, agent) + else: + # launch daemon + s = TestbedInstanceServer(root_dir, log_level, testbed_id, + testbed_version) + s.run() + # create client + self._client = server.Client(root_dir) @property def guids(self): @@ -850,18 +901,31 @@ class TestbedIntanceProxy(object): self._client.send_stop() class ExperimentControllerProxy(object): - def __init__(self, root_dir, log_level, experiment_xml = None, launch = True): + def __init__(self, root_dir, log_level, experiment_xml = None, + launch = True, host = None, port = None, user = None, + agent = None): if launch: + # launch server if experiment_xml == None: raise RuntimeError("To launch a ExperimentControllerServer a \ xml description of the experiment is required") - # launch daemon - s = ExperimentControllerServer(root_dir, experiment_xml) - if log_level == AccessConfiguration.DEBUG_LEVEL: - s.set_debug_log_level() - s.run() - # create_client - self._client = server.Client(root_dir) + # ssh + if host != None: + xml = experiment_xml + xml = xml.replace("'", r"\'") + xml = xml.replace("\"", r"\'") + xml = xml.replace("\n", r"") + python_code = "from nepi.util.proxy import ExperimentControllerServer;\ + s = ExperimentControllerServer('%s', %d, '%s');\ + s.run()" % (root_dir, log_level, xml) + self._client = launch_ssh_daemon_client(root_dir, python_code, + host, port, user, agent) + else: + # launch daemon + s = ExperimentControllerServer(root_dir, log_level, experiment_xml) + s.run() + # create client + self._client = server.Client(root_dir) @property def experiment_xml(self): @@ -883,9 +947,10 @@ class ExperimentControllerProxy(object): port = access_config.get_attribute_value("port") root_dir = access_config.get_attribute_value("rootDirectory") use_agent = access_config.get_attribute_value("useAgent") + log_level = access_config.get_attribute_value("logLevel") msg = controller_messages[ACCESS] msg = msg % (testbed_guid, mode, communication, host, user, port, - root_dir, use_agent) + root_dir, use_agent, log_level) self._client.send_msg(msg) reply = self._client.read_reply() result = reply.split("|") diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 8a3dbed7..2e02c74a 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -4,12 +4,13 @@ import base64 import errno import os +import resource import select import socket import sys import subprocess import threading -from time import strftime +import time import traceback CTRL_SOCK = "ctrl.sock" @@ -21,13 +22,17 @@ STOP_MSG = "STOP" ERROR_LEVEL = 0 DEBUG_LEVEL = 1 +if hasattr(os, "devnull"): + DEV_NULL = os.devnull +else: + DEV_NULL = "/dev/null" + class Server(object): - def __init__(self, root_dir = "."): + def __init__(self, root_dir = ".", log_level = ERROR_LEVEL): self._root_dir = root_dir self._stop = False self._ctrl_sock = None - self._stderr = None - self._log_level = ERROR_LEVEL + self._log_level = log_level def run(self): try: @@ -77,7 +82,10 @@ class Server(object): os._exit(0) # close all open file descriptors. - for fd in range(3, MAX_FD): + max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if (max_fd == resource.RLIM_INFINITY): + max_fd = MAX_FD + for fd in range(3, max_fd): if fd != w: try: os.close(fd) @@ -85,11 +93,14 @@ class Server(object): pass # Redirect standard file descriptors. - self._stderr = stdout = file(STD_ERR, "a", 0) - stdin = open('/dev/null', 'r') + stdin = open(DEV_NULL, "r") + stderr = stdout = open(STD_ERR, "a", 0) os.dup2(stdin.fileno(), sys.stdin.fileno()) + # NOTE: sys.stdout.write will still be buffered, even if the file + # was opened with 0 buffer os.dup2(stdout.fileno(), sys.stdout.fileno()) - os.dup2(self._stderr.fileno(), sys.stderr.fileno()) + os.dup2(stderr.fileno(), sys.stderr.fileno()) + # let the parent process know that the daemonization is finished os.write(w, "\n") os.close(w) @@ -152,22 +163,16 @@ class Server(object): def reply_action(self, msg): return "Reply to: %s" % msg - def set_error_log_level(self): - self._log_level = ERROR_LEVEL - - def set_debug_log_level(self): - self._log_level = DEBUG_LEVEL - def log_error(self, text = None): if text == None: text = traceback.format_exc() - date = strftime("%Y-%m-%d %H:%M:%S") + date = time.strftime("%Y-%m-%d %H:%M:%S") sys.stderr.write("ERROR: %s\n%s\n" % (date, text)) return text def log_debug(self, text): if self._log_level == DEBUG_LEVEL: - date = strftime("%Y-%m-%d %H:%M:%S") + date = time.strftime("%Y-%m-%d %H:%M:%S") sys.stderr.write("DEBUG: %s\n%s\n" % (date, text)) class Forwarder(object): @@ -190,6 +195,7 @@ class Forwarder(object): def write_data(self, data): sys.stdout.write(data) + # sys.stdout.write is buffered, for this we need to do a flush() sys.stdout.flush() def send_to_server(self, data): @@ -234,14 +240,20 @@ class Forwarder(object): pass class Client(object): - def __init__(self, root_dir = "."): - self._process = subprocess.Popen( - ["python", "-c", - "from nepi.util import server;c=server.Forwarder('%s');\ - c.forward()" % root_dir - ], - stdin = subprocess.PIPE, - stdout = subprocess.PIPE) + def __init__(self, root_dir = ".", host = None, port = None, user = None, + agent = None): + python_code = "from nepi.util import server;c=server.Forwarder('%s');\ + c.forward()" % root_dir + if host != None: + self._process = popen_ssh_subprocess(python_code, host, port, + user, agent) + else: + self._process = subprocess.Popen( + ["python", "-c", python_code], + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE + ) def send_msg(self, msg): encoded = base64.b64encode(msg) @@ -256,3 +268,51 @@ class Client(object): encoded = data.rstrip() return base64.b64decode(encoded) +def popen_ssh_subprocess(python_code, host, port, user, agent, + python_path = None): + if python_path: + python_path.replace("'", r"'\''") + cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path + else: + cmd = "" + # Uncomment for debug (to run everything under strace) + # We had to verify if strace works (cannot nest them) + #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n" + #cmd += "$CMD " + #if self.mode == MODE_SSH: + # cmd += "strace -f -tt -s 200 -o strace$$.out " + cmd += "python -c '" + cmd += "import base64, os\n" + cmd += "cmd = \"\"\n" + cmd += "while True:\n" + cmd += " cmd += os.read(0, 1)\n" # one byte from stdin + cmd += " if cmd[-1] == \"\\n\": break\n" + cmd += "cmd = base64.b64decode(cmd)\n" + # Uncomment for debug + #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n" + cmd += "os.write(1, \"OK\\n\")\n" # send a sync message + cmd += "exec(cmd)\n'" + + args = ['ssh', + # Don't bother with localhost. Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes', + '-l', user, host] + if agent: + args.append('-A') + if port: + args.append('-p%d' % port) + args.append(cmd) + + # connects to the remote host and starts a remote rpyc connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + # send the command to execute + os.write(proc.stdin.fileno(), + base64.b64encode(python_code) + "\n") + msg = os.read(proc.stdout.fileno(), 3) + if msg != "OK\n": + raise RuntimeError("Failed to start remote python interpreter") + return proc + diff --git a/test/core/integration.py b/test/core/integration.py index 069d5c55..3830f03d 100755 --- a/test/core/integration.py +++ b/test/core/integration.py @@ -9,17 +9,16 @@ import mock.metadata_v01 import os import shutil import sys +import tempfile +import test_util import time import unittest -import uuid class ExecuteTestCase(unittest.TestCase): def setUp(self): sys.modules["nepi.testbeds.mock.metadata_v01"] = mock.metadata_v01 sys.modules["nepi.testbeds.mock"] = mock - self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", - str(uuid.uuid1())) - os.makedirs(self._root_dir) + self.root_dir = tempfile.mkdtemp() def test_single_process_integration(self): exp_desc = ExperimentDescription() @@ -81,7 +80,7 @@ class ExecuteTestCase(unittest.TestCase): access_config = proxy.AccessConfiguration() access_config.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) - access_config.set_attribute_value("rootDirectory", self._root_dir) + access_config.set_attribute_value("rootDirectory", self.root_dir) controller = proxy.create_controller(xml, access_config) controller.start() @@ -122,7 +121,7 @@ class ExecuteTestCase(unittest.TestCase): access_config = proxy.AccessConfiguration() access_config.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) - access_config.set_attribute_value("rootDirectory", self._root_dir) + access_config.set_attribute_value("rootDirectory", self.root_dir) controller.set_access_configuration(desc.guid, access_config) controller.start() @@ -162,13 +161,13 @@ class ExecuteTestCase(unittest.TestCase): access_config = proxy.AccessConfiguration() access_config.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) - access_config.set_attribute_value("rootDirectory", self._root_dir) + access_config.set_attribute_value("rootDirectory", self.root_dir) controller = proxy.create_controller(xml, access_config) access_config2 = proxy.AccessConfiguration() access_config2.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) - inst_root_dir = os.path.join(self._root_dir, "instance") + inst_root_dir = os.path.join(self.root_dir, "instance") os.mkdir(inst_root_dir) access_config2.set_attribute_value("rootDirectory", inst_root_dir) controller.set_access_configuration(desc.guid, access_config2) @@ -186,8 +185,68 @@ class ExecuteTestCase(unittest.TestCase): controller.stop() controller.shutdown() + def TODO_test_ssh_daemonized_all_integration(self): + # This test doesn't run because + # sys.modules["nepi.testbeds.mock"] = mock + # is not set in the ssh process + exp_desc = ExperimentDescription() + testbed_version = "01" + testbed_id = "mock" + env = test_util.test_environment() + provider = FactoriesProvider(testbed_id, testbed_version) + desc = exp_desc.add_testbed_description(provider) + desc.set_attribute_value("fake", True) + node1 = desc.create("Node") + node2 = desc.create("Node") + iface1 = desc.create("Interface") + iface1.set_attribute_value("fake", True) + node1.connector("devs").connect(iface1.connector("node")) + iface2 = desc.create("Interface") + iface2.set_attribute_value("fake", True) + node2.connector("devs").connect(iface2.connector("node")) + iface1.connector("iface").connect(iface2.connector("iface")) + app = desc.create("Application") + app.connector("node").connect(node1.connector("apps")) + app.enable_trace("fake") + + xml = exp_desc.to_xml() + access_config = proxy.AccessConfiguration() + access_config.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + access_config.set_attribute_value("rootDirectory", self.root_dir) + access_config.set_attribute_value("communication", + proxy.AccessConfiguration.ACCESS_SSH) + access_config.set_attribute_value("port", env.port) + access_config.set_attribute_value("useAgent", True) + controller = proxy.create_controller(xml, access_config) + + access_config2 = proxy.AccessConfiguration() + access_config2.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + access_config2.set_attribute_value("rootDirectory", inst_root_dir) + access_config2.set_attribute_value("communication", + proxy.AccessConfiguration.ACCESS_SSH) + access_config2.set_attribute_value("port", env.port) + access_config2.set_attribute_value("useAgent", True) + controller.set_access_configuration(desc.guid, access_config2) + + controller.start() + while not controller.is_finished(app.guid): + time.sleep(0.5) + fake_result = controller.trace(desc.guid, app.guid, "fake") + comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. + +--- 10.0.0.2 ping statistics --- +1 packets transmitted, 1 received, 0% packet loss, time 0ms +""" + self.assertTrue(fake_result.startswith(comp_result)) + controller.stop() + controller.shutdown() + def tearDown(self): - shutil.rmtree(self._root_dir) + shutil.rmtree(self.root_dir) if __name__ == '__main__': unittest.main() diff --git a/test/lib/test_util.py b/test/lib/test_util.py index 0181c49b..bdedb9bb 100644 --- a/test/lib/test_util.py +++ b/test/lib/test_util.py @@ -19,6 +19,32 @@ def skipUnless(cond, text): def skipIf(cond, text): return (lambda f: _bannerwrap(f, text)) if cond else lambda f: f +def find_bin(name, extra_path = None): + search = [] + if "PATH" in os.environ: + search += os.environ["PATH"].split(":") + for pref in ("/", "/usr/", "/usr/local/"): + for d in ("bin", "sbin"): + search.append(pref + d) + if extra_path: + search += extra_path + + for d in search: + try: + os.stat(d + "/" + name) + return d + "/" + name + except OSError, e: + if e.errno != os.errno.ENOENT: + raise + return None + +def find_bin_or_die(name, extra_path = None): + r = find_bin(name) + if not r: + raise RuntimeError(("Cannot find `%s' command, impossible to " + + "continue.") % name) + return r + # SSH stuff import os, os.path, re, signal, shutil, socket, subprocess, tempfile @@ -101,4 +127,35 @@ def stop_ssh_agent(data): for k in data: del os.environ[k] +class test_environment(object): + def __init__(self): + sshd = find_bin_or_die("sshd") + environ = {} + if 'PYTHONPATH' in os.environ: + environ['PYTHONPATH'] = ":".join(map(os.path.realpath, + os.environ['PYTHONPATH'].split(":"))) + + self.dir = tempfile.mkdtemp() + self.server_keypair = gen_ssh_keypair( + os.path.join(self.dir, "server_key")) + self.client_keypair = gen_ssh_keypair( + os.path.join(self.dir, "client_key")) + self.authorized_keys = gen_auth_keys(self.client_keypair[1], + os.path.join(self.dir, "authorized_keys"), environ) + self.port = get_free_port() + self.sshd_conf = gen_sshd_config( + os.path.join(self.dir, "sshd_config"), + self.port, self.server_keypair[0], self.authorized_keys) + + self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf]) + self.ssh_agent_vars = start_ssh_agent() + add_key_to_agent(self.client_keypair[0]) + + def __del__(self): + if self.sshd: + os.kill(self.sshd.pid, signal.SIGTERM) + self.sshd.wait() + if self.ssh_agent_vars: + stop_ssh_agent(self.ssh_agent_vars) + shutil.rmtree(self.dir) diff --git a/test/testbeds/netns/execute.py b/test/testbeds/netns/execute.py index 1e4d4c12..cf3579e5 100755 --- a/test/testbeds/netns/execute.py +++ b/test/testbeds/netns/execute.py @@ -6,23 +6,21 @@ from nepi.util.constants import AF_INET, STATUS_FINISHED from nepi.testbeds import netns import os import shutil +import tempfile import test_util import time import unittest -import uuid class NetnsExecuteTestCase(unittest.TestCase): def setUp(self): - self._home_dir = os.path.join(os.getenv("HOME"), ".nepi", - str(uuid.uuid1())) - os.makedirs(self._home_dir) + self.root_dir = tempfile.mkdtemp() @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") def test_run_ping_if(self): user = getpass.getuser() testbed_version = "01" instance = netns.TestbedInstance(testbed_version) - instance.configure("homeDirectory", self._home_dir) + instance.configure("homeDirectory", self.root_dir) instance.create(2, "Node") instance.create(3, "Node") instance.create(4, "NodeInterface") @@ -65,7 +63,7 @@ class NetnsExecuteTestCase(unittest.TestCase): user = getpass.getuser() testbed_version = "01" instance = netns.TestbedInstance(testbed_version) - instance.configure("homeDirectory", self._home_dir) + instance.configure("homeDirectory", self.root_dir) instance.create(2, "Node") instance.create(3, "Node") instance.create(4, "P2PNodeInterface") @@ -105,7 +103,7 @@ class NetnsExecuteTestCase(unittest.TestCase): user = getpass.getuser() testbed_version = "01" instance = netns.TestbedInstance(testbed_version) - instance.configure("homeDirectory", self._home_dir) + instance.configure("homeDirectory", self.root_dir) instance.create(2, "Node") instance.create(3, "Node") instance.create(4, "Node") @@ -160,7 +158,7 @@ class NetnsExecuteTestCase(unittest.TestCase): instance.shutdown() def tearDown(self): - shutil.rmtree(self._home_dir) + shutil.rmtree(self.root_dir) if __name__ == '__main__': unittest.main() diff --git a/test/testbeds/netns/integration.py b/test/testbeds/netns/integration.py index b7a08224..543a6352 100755 --- a/test/testbeds/netns/integration.py +++ b/test/testbeds/netns/integration.py @@ -7,16 +7,14 @@ from nepi.core.execute import ExperimentController from nepi.util import proxy import os import shutil +import tempfile import test_util import time import unittest -import uuid class NetnsIntegrationTestCase(unittest.TestCase): def setUp(self): - self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", - str(uuid.uuid1())) - os.makedirs(self._root_dir) + self.root_dir = tempfile.mkdtemp() @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") def test_local_if(self): @@ -26,7 +24,7 @@ class NetnsIntegrationTestCase(unittest.TestCase): user = getpass.getuser() netns_provider = FactoriesProvider(testbed_id, testbed_version) netns_desc = exp_desc.add_testbed_description(netns_provider) - netns_desc.set_attribute_value("homeDirectory", self._root_dir) + netns_desc.set_attribute_value("homeDirectory", self.root_dir) #netns_desc.set_attribute_value("enableDebug", True) node1 = netns_desc.create("Node") node2 = netns_desc.create("Node") @@ -73,7 +71,7 @@ class NetnsIntegrationTestCase(unittest.TestCase): user = getpass.getuser() netns_provider = FactoriesProvider(testbed_id, testbed_version) netns_desc = exp_desc.add_testbed_description(netns_provider) - netns_desc.set_attribute_value("homeDirectory", self._root_dir) + netns_desc.set_attribute_value("homeDirectory", self.root_dir) #netns_desc.set_attribute_value("enableDebug", True) node1 = netns_desc.create("Node") node2 = netns_desc.create("Node") @@ -101,7 +99,7 @@ class NetnsIntegrationTestCase(unittest.TestCase): access_config = proxy.AccessConfiguration() access_config.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) - access_config.set_attribute_value("rootDirectory", self._root_dir) + access_config.set_attribute_value("rootDirectory", self.root_dir) access_config.set_attribute_value("logLevel", proxy.AccessConfiguration.DEBUG_LEVEL) controller = proxy.create_controller(xml, access_config) @@ -109,7 +107,76 @@ class NetnsIntegrationTestCase(unittest.TestCase): access_config2 = proxy.AccessConfiguration() access_config2.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) - inst_root_dir = os.path.join(self._root_dir, "instance") + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + access_config2.set_attribute_value("rootDirectory", inst_root_dir) + access_config2.set_attribute_value("logLevel", + proxy.AccessConfiguration.DEBUG_LEVEL) + controller.set_access_configuration(netns_desc.guid, access_config2) + + controller.start() + while not controller.is_finished(app.guid): + time.sleep(0.5) + ping_result = controller.trace(netns_desc.guid, app.guid, "stdout") + comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. + +--- 10.0.0.2 ping statistics --- +1 packets transmitted, 1 received, 0% packet loss, time 0ms +""" + self.assertTrue(ping_result.startswith(comp_result)) + controller.stop() + controller.shutdown() + + @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") + def test_all_ssh_daemonized_if(self): + exp_desc = ExperimentDescription() + testbed_version = "01" + testbed_id = "netns" + env = test_util.test_environment() + user = getpass.getuser() + netns_provider = FactoriesProvider(testbed_id, testbed_version) + netns_desc = exp_desc.add_testbed_description(netns_provider) + netns_desc.set_attribute_value("homeDirectory", self.root_dir) + #netns_desc.set_attribute_value("enableDebug", True) + node1 = netns_desc.create("Node") + node2 = netns_desc.create("Node") + iface1 = netns_desc.create("NodeInterface") + iface1.set_attribute_value("up", True) + node1.connector("devs").connect(iface1.connector("node")) + ip1 = iface1.add_address() + ip1.set_attribute_value("Address", "10.0.0.1") + iface2 = netns_desc.create("NodeInterface") + iface2.set_attribute_value("up", True) + node2.connector("devs").connect(iface2.connector("node")) + ip2 = iface2.add_address() + ip2.set_attribute_value("Address", "10.0.0.2") + switch = netns_desc.create("Switch") + switch.set_attribute_value("up", True) + iface1.connector("switch").connect(switch.connector("devs")) + iface2.connector("switch").connect(switch.connector("devs")) + app = netns_desc.create("Application") + app.set_attribute_value("command", "ping -qc1 10.0.0.2") + app.set_attribute_value("user", user) + app.connector("node").connect(node1.connector("apps")) + app.enable_trace("stdout") + xml = exp_desc.to_xml() + + access_config = proxy.AccessConfiguration() + access_config.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + access_config.set_attribute_value("rootDirectory", self.root_dir) + access_config.set_attribute_value("logLevel", + proxy.AccessConfiguration.DEBUG_LEVEL) + access_config.set_attribute_value("communication", + proxy.AccessConfiguration.ACCESS_SSH) + access_config.set_attribute_value("port", env.port) + access_config.set_attribute_value("useAgent", True) + controller = proxy.create_controller(xml, access_config) + + access_config2 = proxy.AccessConfiguration() + access_config2.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + inst_root_dir = os.path.join(self.root_dir, "instance") os.mkdir(inst_root_dir) access_config2.set_attribute_value("rootDirectory", inst_root_dir) access_config2.set_attribute_value("logLevel", @@ -130,7 +197,7 @@ class NetnsIntegrationTestCase(unittest.TestCase): controller.shutdown() def tearDown(self): - shutil.rmtree(self._root_dir) + shutil.rmtree(self.root_dir) if __name__ == '__main__': unittest.main() diff --git a/test/util/server.py b/test/util/server.py index 3766520e..615b1ef1 100755 --- a/test/util/server.py +++ b/test/util/server.py @@ -1,23 +1,23 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import getpass from nepi.util import server import os import shutil import sys +import tempfile +import test_util import unittest -import uuid class ServerTestCase(unittest.TestCase): def setUp(self): - self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", - str(uuid.uuid1())) - os.makedirs(self._root_dir) + self.root_dir = tempfile.mkdtemp() def test_server(self): - s = server.Server(self._root_dir) + s = server.Server(self.root_dir) s.run() - c = server.Client(self._root_dir) + c = server.Client(self.root_dir) c.send_msg("Hola") reply = c.read_reply() self.assertTrue(reply == "Reply to: Hola") @@ -26,9 +26,9 @@ class ServerTestCase(unittest.TestCase): self.assertTrue(reply == "Stopping server") def test_server_long_message(self): - s = server.Server(self._root_dir) + s = server.Server(self.root_dir) s.run() - c = server.Client(self._root_dir) + c = server.Client(self.root_dir) c.send_msgreply = c.read_reply() self.assertTrue(reply == "Reply to: 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") @@ -36,8 +36,25 @@ class ServerTestCase(unittest.TestCase): reply = c.read_reply() self.assertTrue(reply == "Stopping server") + def test_ssh_server(self): + env = test_util.test_environment() + user = getpass.getuser() + # launch server + python_code = "from nepi.util import server;s=server.Server('%s');\ + s.run()" % self.root_dir + server.popen_ssh_subprocess(python_code, host = "localhost", + port = env.port, user = user, agent = True) + c = server.Client(self.root_dir, host = "localhost", port = env.port, + user = user, agent = True) + c.send_msg("Hola") + reply = c.read_reply() + self.assertTrue(reply == "Reply to: Hola") + c.send_stop() + reply = c.read_reply() + self.assertTrue(reply == "Stopping server") + def tearDown(self): - shutil.rmtree(self._root_dir) + shutil.rmtree(self.root_dir) if __name__ == '__main__': unittest.main()