from nepi.core.attributes import AttributesMap, Attribute
from nepi.util import server, validation
from nepi.util.constants import TIME_NOW
+import getpass
import sys
+import time
# PROTOCOL REPLIES
OK = 0
# EXPERIMENT CONTROLER PROTOCOL MESSAGES
controller_messages = dict({
XML: "%d" % XML,
- ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r"),
+ ACCESS: "%d|%s" % (ACCESS, "%d|%s|%s|%s|%s|%d|%s|%r|%s"),
TRACE: "%d|%s" % (TRACE, "%d|%d|%s"),
FINISHED: "%d|%s" % (FINISHED, "%d"),
START: "%d" % START,
server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
code_txt, txt))
+def launch_ssh_daemon_client(root_dir, python_code, host, port, user, agent):
+ # launch daemon
+ proc = server.popen_ssh_subprocess(python_code, host = host,
+ port = port, user = user, agent = agent)
+ #while not proc.poll():
+ # time.sleep(0.5)
+ if proc.poll():
+ err = proc.stderr.read()
+ raise RuntimeError("Client could not be executed: %s" % \
+ err)
+ # create client
+ return server.Client(root_dir, host = host, port = port, user = user,
+ agent = agent)
+
+def to_server_log_level(log_level):
+ return server.DEBUG_LEVEL \
+ if log_level == AccessConfiguration.DEBUG_LEVEL \
+ else server.ERROR_LEVEL
+
+def get_access_config_params(access_config):
+ root_dir = access_config.get_attribute_value("rootDirectory")
+ log_level = access_config.get_attribute_value("logLevel")
+ log_level = to_server_log_level(log_level)
+ user = host = port = agent = None
+ communication = access_config.get_attribute_value("communication")
+ if communication == AccessConfiguration.ACCESS_SSH:
+ user = access_config.get_attribute_value("user")
+ host = access_config.get_attribute_value("host")
+ port = access_config.get_attribute_value("port")
+ agent = access_config.get_attribute_value("useAgent")
+ return (root_dir, log_level, user, host, port, agent)
+
class AccessConfiguration(AttributesMap):
MODE_SINGLE_PROCESS = "SINGLE"
MODE_DAEMON = "DAEMON"
self.add_attribute(name = "user",
help = "User on the Host to execute the testbed",
type = Attribute.STRING,
+ value = getpass.getuser(),
validation_function = validation.is_string)
self.add_attribute(name = "port",
help = "Port on the Host",
validation_function = validation.is_enum)
def create_controller(xml, access_config = None):
- mode = None if not access_config else access_config.get_attribute_value("mode")
+ mode = None if not access_config else \
+ access_config.get_attribute_value("mode")
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
from nepi.core.execute import ExperimentController
return ExperimentController(xml)
elif mode == AccessConfiguration.MODE_DAEMON:
- root_dir = access_config.get_attribute_value("rootDirectory")
- log_level = access_config.get_attribute_value("logLevel")
- return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml)
+ (root_dir, log_level, user, host, port, agent) = \
+ get_access_config_params(access_config)
+ return ExperimentControllerProxy(root_dir, log_level,
+ experiment_xml = xml, host = host, port = port, user = user,
+ agent = agent)
raise RuntimeError("Unsupported access configuration 'mode'" % mode)
def create_testbed_instance(testbed_id, testbed_version, access_config):
mode = None if not access_config else access_config.get_attribute_value("mode")
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
- return _build_testbed_testbed(testbed_id, testbed_version)
+ return _build_testbed_instance(testbed_id, testbed_version)
elif mode == AccessConfiguration.MODE_DAEMON:
- root_dir = access_config.get_attribute_value("rootDirectory")
- log_level = access_config.get_attribute_value("logLevel")
+ (root_dir, log_level, user, host, port, agent) = \
+ get_access_config_params(access_config)
return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id,
- testbed_version = testbed_version)
+ testbed_version = testbed_version, host = host, port = port,
+ user = user, agent = agent)
raise RuntimeError("Unsupported access configuration 'mode'" % mode)
-def _build_testbed_testbed(testbed_id, testbed_version):
+def _build_testbed_instance(testbed_id, testbed_version):
mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
if not mod_name in sys.modules:
__import__(mod_name)
return module.TestbedInstance(testbed_version)
class TestbedInstanceServer(server.Server):
- def __init__(self, root_dir, testbed_id, testbed_version):
- super(TestbedInstanceServer, self).__init__(root_dir)
+ def __init__(self, root_dir, log_level, testbed_id, testbed_version):
+ super(TestbedInstanceServer, self).__init__(root_dir, log_level)
self._testbed_id = testbed_id
self._testbed_version = testbed_version
self._testbed = None
def post_daemonize(self):
- self._testbed = _build_testbed_testbed(self._testbed_id,
+ self._testbed = _build_testbed_instance(self._testbed_id,
self._testbed_version)
def reply_action(self, msg):
return "%d|%s" % (OK, result)
class ExperimentControllerServer(server.Server):
- def __init__(self, root_dir, experiment_xml):
- super(ExperimentControllerServer, self).__init__(root_dir)
+ def __init__(self, root_dir, log_level, experiment_xml):
+ super(ExperimentControllerServer, self).__init__(root_dir, log_level)
self._experiment_xml = experiment_xml
self._controller = None
port = int(params[6])
root_dir = params[7]
use_agent = params[8] == "True"
+ log_level = params[9]
access_config = AccessConfiguration()
access_config.set_attribute_value("mode", mode)
access_config.set_attribute_value("communication", communication)
access_config.set_attribute_value("port", port)
access_config.set_attribute_value("rootDirectory", root_dir)
access_config.set_attribute_value("useAgent", use_agent)
+ access_config.set_attribute_value("logLevel", log_level)
self._controller.set_access_configuration(testbed_guid,
access_config)
return "%d|%s" % (OK, "")
class TestbedIntanceProxy(object):
def __init__(self, root_dir, log_level, testbed_id = None,
- testbed_version = None, launch = True):
+ testbed_version = None, launch = True, host = None,
+ port = None, user = None, agent = None):
if launch:
if testbed_id == None or testbed_version == None:
raise RuntimeError("To launch a TesbedInstance server a \
testbed_id and testbed_version are required")
- # launch daemon
- s = TestbedInstanceServer(root_dir, testbed_id, testbed_version)
- if log_level == AccessConfiguration.DEBUG_LEVEL:
- s.set_debug_log_level()
- s.run()
- # create_client
- self._client = server.Client(root_dir)
+ # ssh
+ if host != None:
+ python_code = "from nepi.util.proxy import \
+ TesbedInstanceServer;\
+ s = TestbedInstanceServer('%s', %d, '%s', '%s');\
+ s.run()" % (root_dir, log_level, testbed_id,
+ testbed_version)
+ self._client = launch_ssh_daemon_client(root_dir, python_code,
+ host, port, user, agent)
+ else:
+ # launch daemon
+ s = TestbedInstanceServer(root_dir, log_level, testbed_id,
+ testbed_version)
+ s.run()
+ # create client
+ self._client = server.Client(root_dir)
@property
def guids(self):
self._client.send_stop()
class ExperimentControllerProxy(object):
- def __init__(self, root_dir, log_level, experiment_xml = None, launch = True):
+ def __init__(self, root_dir, log_level, experiment_xml = None,
+ launch = True, host = None, port = None, user = None,
+ agent = None):
if launch:
+ # launch server
if experiment_xml == None:
raise RuntimeError("To launch a ExperimentControllerServer a \
xml description of the experiment is required")
- # launch daemon
- s = ExperimentControllerServer(root_dir, experiment_xml)
- if log_level == AccessConfiguration.DEBUG_LEVEL:
- s.set_debug_log_level()
- s.run()
- # create_client
- self._client = server.Client(root_dir)
+ # ssh
+ if host != None:
+ xml = experiment_xml
+ xml = xml.replace("'", r"\'")
+ xml = xml.replace("\"", r"\'")
+ xml = xml.replace("\n", r"")
+ python_code = "from nepi.util.proxy import ExperimentControllerServer;\
+ s = ExperimentControllerServer('%s', %d, '%s');\
+ s.run()" % (root_dir, log_level, xml)
+ self._client = launch_ssh_daemon_client(root_dir, python_code,
+ host, port, user, agent)
+ else:
+ # launch daemon
+ s = ExperimentControllerServer(root_dir, log_level, experiment_xml)
+ s.run()
+ # create client
+ self._client = server.Client(root_dir)
@property
def experiment_xml(self):
port = access_config.get_attribute_value("port")
root_dir = access_config.get_attribute_value("rootDirectory")
use_agent = access_config.get_attribute_value("useAgent")
+ log_level = access_config.get_attribute_value("logLevel")
msg = controller_messages[ACCESS]
msg = msg % (testbed_guid, mode, communication, host, user, port,
- root_dir, use_agent)
+ root_dir, use_agent, log_level)
self._client.send_msg(msg)
reply = self._client.read_reply()
result = reply.split("|")
import base64
import errno
import os
+import resource
import select
import socket
import sys
import subprocess
import threading
-from time import strftime
+import time
import traceback
CTRL_SOCK = "ctrl.sock"
ERROR_LEVEL = 0
DEBUG_LEVEL = 1
+if hasattr(os, "devnull"):
+ DEV_NULL = os.devnull
+else:
+ DEV_NULL = "/dev/null"
+
class Server(object):
- def __init__(self, root_dir = "."):
+ def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
self._root_dir = root_dir
self._stop = False
self._ctrl_sock = None
- self._stderr = None
- self._log_level = ERROR_LEVEL
+ self._log_level = log_level
def run(self):
try:
os._exit(0)
# close all open file descriptors.
- for fd in range(3, MAX_FD):
+ max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+ if (max_fd == resource.RLIM_INFINITY):
+ max_fd = MAX_FD
+ for fd in range(3, max_fd):
if fd != w:
try:
os.close(fd)
pass
# Redirect standard file descriptors.
- self._stderr = stdout = file(STD_ERR, "a", 0)
- stdin = open('/dev/null', 'r')
+ stdin = open(DEV_NULL, "r")
+ stderr = stdout = open(STD_ERR, "a", 0)
os.dup2(stdin.fileno(), sys.stdin.fileno())
+ # NOTE: sys.stdout.write will still be buffered, even if the file
+ # was opened with 0 buffer
os.dup2(stdout.fileno(), sys.stdout.fileno())
- os.dup2(self._stderr.fileno(), sys.stderr.fileno())
+ os.dup2(stderr.fileno(), sys.stderr.fileno())
+
# let the parent process know that the daemonization is finished
os.write(w, "\n")
os.close(w)
def reply_action(self, msg):
return "Reply to: %s" % msg
- def set_error_log_level(self):
- self._log_level = ERROR_LEVEL
-
- def set_debug_log_level(self):
- self._log_level = DEBUG_LEVEL
-
def log_error(self, text = None):
if text == None:
text = traceback.format_exc()
- date = strftime("%Y-%m-%d %H:%M:%S")
+ date = time.strftime("%Y-%m-%d %H:%M:%S")
sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
return text
def log_debug(self, text):
if self._log_level == DEBUG_LEVEL:
- date = strftime("%Y-%m-%d %H:%M:%S")
+ date = time.strftime("%Y-%m-%d %H:%M:%S")
sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
class Forwarder(object):
def write_data(self, data):
sys.stdout.write(data)
+ # sys.stdout.write is buffered, for this we need to do a flush()
sys.stdout.flush()
def send_to_server(self, data):
pass
class Client(object):
- def __init__(self, root_dir = "."):
- self._process = subprocess.Popen(
- ["python", "-c",
- "from nepi.util import server;c=server.Forwarder('%s');\
- c.forward()" % root_dir
- ],
- stdin = subprocess.PIPE,
- stdout = subprocess.PIPE)
+ def __init__(self, root_dir = ".", host = None, port = None, user = None,
+ agent = None):
+ python_code = "from nepi.util import server;c=server.Forwarder('%s');\
+ c.forward()" % root_dir
+ if host != None:
+ self._process = popen_ssh_subprocess(python_code, host, port,
+ user, agent)
+ else:
+ self._process = subprocess.Popen(
+ ["python", "-c", python_code],
+ stdin = subprocess.PIPE,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE
+ )
def send_msg(self, msg):
encoded = base64.b64encode(msg)
encoded = data.rstrip()
return base64.b64decode(encoded)
+def popen_ssh_subprocess(python_code, host, port, user, agent,
+ python_path = None):
+ if python_path:
+ python_path.replace("'", r"'\''")
+ cmd = """PYTHONPATH="$PYTHONPATH":'%s' """ % python_path
+ else:
+ cmd = ""
+ # Uncomment for debug (to run everything under strace)
+ # We had to verify if strace works (cannot nest them)
+ #cmd += "if strace echo >/dev/null 2>&1; then CMD='strace -ff -tt -s 200 -o strace.out'; else CMD=''; fi\n"
+ #cmd += "$CMD "
+ #if self.mode == MODE_SSH:
+ # cmd += "strace -f -tt -s 200 -o strace$$.out "
+ cmd += "python -c '"
+ cmd += "import base64, os\n"
+ cmd += "cmd = \"\"\n"
+ cmd += "while True:\n"
+ cmd += " cmd += os.read(0, 1)\n" # one byte from stdin
+ cmd += " if cmd[-1] == \"\\n\": break\n"
+ cmd += "cmd = base64.b64decode(cmd)\n"
+ # Uncomment for debug
+ #cmd += "os.write(2, \"Executing python code: %s\\n\" % cmd)\n"
+ cmd += "os.write(1, \"OK\\n\")\n" # send a sync message
+ cmd += "exec(cmd)\n'"
+
+ args = ['ssh',
+ # Don't bother with localhost. Makes test easier
+ '-o', 'NoHostAuthenticationForLocalhost=yes',
+ '-l', user, host]
+ if agent:
+ args.append('-A')
+ if port:
+ args.append('-p%d' % port)
+ args.append(cmd)
+
+ # connects to the remote host and starts a remote rpyc connection
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+ # send the command to execute
+ os.write(proc.stdin.fileno(),
+ base64.b64encode(python_code) + "\n")
+ msg = os.read(proc.stdout.fileno(), 3)
+ if msg != "OK\n":
+ raise RuntimeError("Failed to start remote python interpreter")
+ return proc
+
import os
import shutil
import sys
+import tempfile
+import test_util
import time
import unittest
-import uuid
class ExecuteTestCase(unittest.TestCase):
def setUp(self):
sys.modules["nepi.testbeds.mock.metadata_v01"] = mock.metadata_v01
sys.modules["nepi.testbeds.mock"] = mock
- self._root_dir = os.path.join(os.getenv("HOME"), ".nepi",
- str(uuid.uuid1()))
- os.makedirs(self._root_dir)
+ self.root_dir = tempfile.mkdtemp()
def test_single_process_integration(self):
exp_desc = ExperimentDescription()
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
- access_config.set_attribute_value("rootDirectory", self._root_dir)
+ access_config.set_attribute_value("rootDirectory", self.root_dir)
controller = proxy.create_controller(xml, access_config)
controller.start()
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
- access_config.set_attribute_value("rootDirectory", self._root_dir)
+ access_config.set_attribute_value("rootDirectory", self.root_dir)
controller.set_access_configuration(desc.guid, access_config)
controller.start()
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
- access_config.set_attribute_value("rootDirectory", self._root_dir)
+ access_config.set_attribute_value("rootDirectory", self.root_dir)
controller = proxy.create_controller(xml, access_config)
access_config2 = proxy.AccessConfiguration()
access_config2.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
- inst_root_dir = os.path.join(self._root_dir, "instance")
+ inst_root_dir = os.path.join(self.root_dir, "instance")
os.mkdir(inst_root_dir)
access_config2.set_attribute_value("rootDirectory", inst_root_dir)
controller.set_access_configuration(desc.guid, access_config2)
controller.stop()
controller.shutdown()
+ def TODO_test_ssh_daemonized_all_integration(self):
+ # This test doesn't run because
+ # sys.modules["nepi.testbeds.mock"] = mock
+ # is not set in the ssh process
+ exp_desc = ExperimentDescription()
+ testbed_version = "01"
+ testbed_id = "mock"
+ env = test_util.test_environment()
+ provider = FactoriesProvider(testbed_id, testbed_version)
+ desc = exp_desc.add_testbed_description(provider)
+ desc.set_attribute_value("fake", True)
+ node1 = desc.create("Node")
+ node2 = desc.create("Node")
+ iface1 = desc.create("Interface")
+ iface1.set_attribute_value("fake", True)
+ node1.connector("devs").connect(iface1.connector("node"))
+ iface2 = desc.create("Interface")
+ iface2.set_attribute_value("fake", True)
+ node2.connector("devs").connect(iface2.connector("node"))
+ iface1.connector("iface").connect(iface2.connector("iface"))
+ app = desc.create("Application")
+ app.connector("node").connect(node1.connector("apps"))
+ app.enable_trace("fake")
+
+ xml = exp_desc.to_xml()
+ access_config = proxy.AccessConfiguration()
+ access_config.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ access_config.set_attribute_value("rootDirectory", self.root_dir)
+ access_config.set_attribute_value("communication",
+ proxy.AccessConfiguration.ACCESS_SSH)
+ access_config.set_attribute_value("port", env.port)
+ access_config.set_attribute_value("useAgent", True)
+ controller = proxy.create_controller(xml, access_config)
+
+ access_config2 = proxy.AccessConfiguration()
+ access_config2.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+ access_config2.set_attribute_value("communication",
+ proxy.AccessConfiguration.ACCESS_SSH)
+ access_config2.set_attribute_value("port", env.port)
+ access_config2.set_attribute_value("useAgent", True)
+ controller.set_access_configuration(desc.guid, access_config2)
+
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ fake_result = controller.trace(desc.guid, app.guid, "fake")
+ comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+ self.assertTrue(fake_result.startswith(comp_result))
+ controller.stop()
+ controller.shutdown()
+
def tearDown(self):
- shutil.rmtree(self._root_dir)
+ shutil.rmtree(self.root_dir)
if __name__ == '__main__':
unittest.main()
def skipIf(cond, text):
return (lambda f: _bannerwrap(f, text)) if cond else lambda f: f
+def find_bin(name, extra_path = None):
+ search = []
+ if "PATH" in os.environ:
+ search += os.environ["PATH"].split(":")
+ for pref in ("/", "/usr/", "/usr/local/"):
+ for d in ("bin", "sbin"):
+ search.append(pref + d)
+ if extra_path:
+ search += extra_path
+
+ for d in search:
+ try:
+ os.stat(d + "/" + name)
+ return d + "/" + name
+ except OSError, e:
+ if e.errno != os.errno.ENOENT:
+ raise
+ return None
+
+def find_bin_or_die(name, extra_path = None):
+ r = find_bin(name)
+ if not r:
+ raise RuntimeError(("Cannot find `%s' command, impossible to " +
+ "continue.") % name)
+ return r
+
# SSH stuff
import os, os.path, re, signal, shutil, socket, subprocess, tempfile
for k in data:
del os.environ[k]
+class test_environment(object):
+ def __init__(self):
+ sshd = find_bin_or_die("sshd")
+ environ = {}
+ if 'PYTHONPATH' in os.environ:
+ environ['PYTHONPATH'] = ":".join(map(os.path.realpath,
+ os.environ['PYTHONPATH'].split(":")))
+
+ self.dir = tempfile.mkdtemp()
+ self.server_keypair = gen_ssh_keypair(
+ os.path.join(self.dir, "server_key"))
+ self.client_keypair = gen_ssh_keypair(
+ os.path.join(self.dir, "client_key"))
+ self.authorized_keys = gen_auth_keys(self.client_keypair[1],
+ os.path.join(self.dir, "authorized_keys"), environ)
+ self.port = get_free_port()
+ self.sshd_conf = gen_sshd_config(
+ os.path.join(self.dir, "sshd_config"),
+ self.port, self.server_keypair[0], self.authorized_keys)
+
+ self.sshd = subprocess.Popen([sshd, '-q', '-D', '-f', self.sshd_conf])
+ self.ssh_agent_vars = start_ssh_agent()
+ add_key_to_agent(self.client_keypair[0])
+
+ def __del__(self):
+ if self.sshd:
+ os.kill(self.sshd.pid, signal.SIGTERM)
+ self.sshd.wait()
+ if self.ssh_agent_vars:
+ stop_ssh_agent(self.ssh_agent_vars)
+ shutil.rmtree(self.dir)
from nepi.testbeds import netns
import os
import shutil
+import tempfile
import test_util
import time
import unittest
-import uuid
class NetnsExecuteTestCase(unittest.TestCase):
def setUp(self):
- self._home_dir = os.path.join(os.getenv("HOME"), ".nepi",
- str(uuid.uuid1()))
- os.makedirs(self._home_dir)
+ self.root_dir = tempfile.mkdtemp()
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_run_ping_if(self):
user = getpass.getuser()
testbed_version = "01"
instance = netns.TestbedInstance(testbed_version)
- instance.configure("homeDirectory", self._home_dir)
+ instance.configure("homeDirectory", self.root_dir)
instance.create(2, "Node")
instance.create(3, "Node")
instance.create(4, "NodeInterface")
user = getpass.getuser()
testbed_version = "01"
instance = netns.TestbedInstance(testbed_version)
- instance.configure("homeDirectory", self._home_dir)
+ instance.configure("homeDirectory", self.root_dir)
instance.create(2, "Node")
instance.create(3, "Node")
instance.create(4, "P2PNodeInterface")
user = getpass.getuser()
testbed_version = "01"
instance = netns.TestbedInstance(testbed_version)
- instance.configure("homeDirectory", self._home_dir)
+ instance.configure("homeDirectory", self.root_dir)
instance.create(2, "Node")
instance.create(3, "Node")
instance.create(4, "Node")
instance.shutdown()
def tearDown(self):
- shutil.rmtree(self._home_dir)
+ shutil.rmtree(self.root_dir)
if __name__ == '__main__':
unittest.main()
from nepi.util import proxy
import os
import shutil
+import tempfile
import test_util
import time
import unittest
-import uuid
class NetnsIntegrationTestCase(unittest.TestCase):
def setUp(self):
- self._root_dir = os.path.join(os.getenv("HOME"), ".nepi",
- str(uuid.uuid1()))
- os.makedirs(self._root_dir)
+ self.root_dir = tempfile.mkdtemp()
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_local_if(self):
user = getpass.getuser()
netns_provider = FactoriesProvider(testbed_id, testbed_version)
netns_desc = exp_desc.add_testbed_description(netns_provider)
- netns_desc.set_attribute_value("homeDirectory", self._root_dir)
+ netns_desc.set_attribute_value("homeDirectory", self.root_dir)
#netns_desc.set_attribute_value("enableDebug", True)
node1 = netns_desc.create("Node")
node2 = netns_desc.create("Node")
user = getpass.getuser()
netns_provider = FactoriesProvider(testbed_id, testbed_version)
netns_desc = exp_desc.add_testbed_description(netns_provider)
- netns_desc.set_attribute_value("homeDirectory", self._root_dir)
+ netns_desc.set_attribute_value("homeDirectory", self.root_dir)
#netns_desc.set_attribute_value("enableDebug", True)
node1 = netns_desc.create("Node")
node2 = netns_desc.create("Node")
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
- access_config.set_attribute_value("rootDirectory", self._root_dir)
+ access_config.set_attribute_value("rootDirectory", self.root_dir)
access_config.set_attribute_value("logLevel",
proxy.AccessConfiguration.DEBUG_LEVEL)
controller = proxy.create_controller(xml, access_config)
access_config2 = proxy.AccessConfiguration()
access_config2.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
- inst_root_dir = os.path.join(self._root_dir, "instance")
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+ access_config2.set_attribute_value("logLevel",
+ proxy.AccessConfiguration.DEBUG_LEVEL)
+ controller.set_access_configuration(netns_desc.guid, access_config2)
+
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ ping_result = controller.trace(netns_desc.guid, app.guid, "stdout")
+ comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+ self.assertTrue(ping_result.startswith(comp_result))
+ controller.stop()
+ controller.shutdown()
+
+ @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+ def test_all_ssh_daemonized_if(self):
+ exp_desc = ExperimentDescription()
+ testbed_version = "01"
+ testbed_id = "netns"
+ env = test_util.test_environment()
+ user = getpass.getuser()
+ netns_provider = FactoriesProvider(testbed_id, testbed_version)
+ netns_desc = exp_desc.add_testbed_description(netns_provider)
+ netns_desc.set_attribute_value("homeDirectory", self.root_dir)
+ #netns_desc.set_attribute_value("enableDebug", True)
+ node1 = netns_desc.create("Node")
+ node2 = netns_desc.create("Node")
+ iface1 = netns_desc.create("NodeInterface")
+ iface1.set_attribute_value("up", True)
+ node1.connector("devs").connect(iface1.connector("node"))
+ ip1 = iface1.add_address()
+ ip1.set_attribute_value("Address", "10.0.0.1")
+ iface2 = netns_desc.create("NodeInterface")
+ iface2.set_attribute_value("up", True)
+ node2.connector("devs").connect(iface2.connector("node"))
+ ip2 = iface2.add_address()
+ ip2.set_attribute_value("Address", "10.0.0.2")
+ switch = netns_desc.create("Switch")
+ switch.set_attribute_value("up", True)
+ iface1.connector("switch").connect(switch.connector("devs"))
+ iface2.connector("switch").connect(switch.connector("devs"))
+ app = netns_desc.create("Application")
+ app.set_attribute_value("command", "ping -qc1 10.0.0.2")
+ app.set_attribute_value("user", user)
+ app.connector("node").connect(node1.connector("apps"))
+ app.enable_trace("stdout")
+ xml = exp_desc.to_xml()
+
+ access_config = proxy.AccessConfiguration()
+ access_config.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ access_config.set_attribute_value("rootDirectory", self.root_dir)
+ access_config.set_attribute_value("logLevel",
+ proxy.AccessConfiguration.DEBUG_LEVEL)
+ access_config.set_attribute_value("communication",
+ proxy.AccessConfiguration.ACCESS_SSH)
+ access_config.set_attribute_value("port", env.port)
+ access_config.set_attribute_value("useAgent", True)
+ controller = proxy.create_controller(xml, access_config)
+
+ access_config2 = proxy.AccessConfiguration()
+ access_config2.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
os.mkdir(inst_root_dir)
access_config2.set_attribute_value("rootDirectory", inst_root_dir)
access_config2.set_attribute_value("logLevel",
controller.shutdown()
def tearDown(self):
- shutil.rmtree(self._root_dir)
+ shutil.rmtree(self.root_dir)
if __name__ == '__main__':
unittest.main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import getpass
from nepi.util import server
import os
import shutil
import sys
+import tempfile
+import test_util
import unittest
-import uuid
class ServerTestCase(unittest.TestCase):
def setUp(self):
- self._root_dir = os.path.join(os.getenv("HOME"), ".nepi",
- str(uuid.uuid1()))
- os.makedirs(self._root_dir)
+ self.root_dir = tempfile.mkdtemp()
def test_server(self):
- s = server.Server(self._root_dir)
+ s = server.Server(self.root_dir)
s.run()
- c = server.Client(self._root_dir)
+ c = server.Client(self.root_dir)
c.send_msg("Hola")
reply = c.read_reply()
self.assertTrue(reply == "Reply to: Hola")
self.assertTrue(reply == "Stopping server")
def test_server_long_message(self):
- s = server.Server(self._root_dir)
+ s = server.Server(self.root_dir)
s.run()
- c = server.Client(self._root_dir)
+ c = server.Client(self.root_dir)
c.send_msg("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
reply = c.read_reply()
self.assertTrue(reply == "Reply to: 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
reply = c.read_reply()
self.assertTrue(reply == "Stopping server")
+ def test_ssh_server(self):
+ env = test_util.test_environment()
+ user = getpass.getuser()
+ # launch server
+ python_code = "from nepi.util import server;s=server.Server('%s');\
+ s.run()" % self.root_dir
+ server.popen_ssh_subprocess(python_code, host = "localhost",
+ port = env.port, user = user, agent = True)
+ c = server.Client(self.root_dir, host = "localhost", port = env.port,
+ user = user, agent = True)
+ c.send_msg("Hola")
+ reply = c.read_reply()
+ self.assertTrue(reply == "Reply to: Hola")
+ c.send_stop()
+ reply = c.read_reply()
+ self.assertTrue(reply == "Stopping server")
+
def tearDown(self):
- shutil.rmtree(self._root_dir)
+ shutil.rmtree(self.root_dir)
if __name__ == '__main__':
unittest.main()