From: Alina Quereilhac Date: Tue, 15 Mar 2011 16:36:45 +0000 (+0100) Subject: bugfixing X-Git-Tag: nepi_v2~185 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=9b57a0addc5aa3b57bad9f419a6d35e2d9dd1887;p=nepi.git bugfixing --- diff --git a/Makefile b/Makefile index f3b1e061..bd3c0b11 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,7 @@ else BUILDDIR := $(BUILDDIR)/lib endif -#PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH) -PYPATH = "../nepi2/src:../nepi2/test/lib:../netns/src" +PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH) COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \ coverage) diff --git a/setup.py b/setup.py index 38d64e00..0fea3730 100755 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ setup( "nepi.testbeds", "nepi.testbeds.netns", "nepi.core", + "nepi.util.parser", "nepi.util" ], package_dir = {"": "src"} ) diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index 37c63500..97402c95 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -260,31 +260,31 @@ class ExperimentController(object): def start(self): self._create_testbed_instances() - for instance in self._testbeds.values(): - instance.do_setup() - for instance in self._testbeds.values(): - instance.do_create() - instance.do_connect() - instance.do_configure() - for instances in self._testbeds.values(): - instance.do_cross_connect() - for instances in self._testbeds.values(): - instance.start() + for testbed in self._testbeds.values(): + testbed.do_setup() + for testbed in self._testbeds.values(): + testbed.do_create() + testbed.do_connect() + testbed.do_configure() + for testbed in self._testbeds.values(): + testbed.do_cross_connect() + for testbed in self._testbeds.values(): + testbed.start() def stop(self): - for instance in self._testbeds.values(): - instance.stop() + for testbed in self._testbeds.values(): + testbed.stop() def is_finished(self, guid): - for instance in self._testbeds.values(): - for guid_ in instance.guids: + for testbed in self._testbeds.values(): + for guid_ in testbed.guids: if guid_ == guid: - return instance.status(guid) == STATUS_FINISHED + return testbed.status(guid) == STATUS_FINISHED raise RuntimeError("No element exists with guid %d" % guid) def shutdown(self): - for instance in self._testbeds.values(): - instance.shutdown() + for testbed in self._testbeds.values(): + testbed.shutdown() def _create_testbed_instances(self): parser = XmlExperimentParser() @@ -295,11 +295,11 @@ class ExperimentController(object): (testbed_id, testbed_version) = data.get_testbed_data(guid) access_config = None if guid not in self._access_config else\ self._access_config[guid] - instance = proxy.create_testbed_instance(testbed_id, + testbed = proxy.create_testbed_instance(testbed_id, testbed_version, access_config) for (name, value) in data.get_attribute_data(guid): - instance.configure(name, value) - self._testbeds[guid] = instance + testbed.configure(name, value) + self._testbeds[guid] = testbed else: element_guids.append(guid) self._program_testbed_instances(element_guids, data) @@ -307,33 +307,33 @@ class ExperimentController(object): def _program_testbed_instances(self, element_guids, data): for guid in element_guids: (testbed_guid, factory_id) = data.get_box_data(guid) - instance = self._testbeds[testbed_guid] - instance.create(guid, factory_id) + testbed = self._testbeds[testbed_guid] + testbed.create(guid, factory_id) for (name, value) in data.get_attribute_data(guid): - instance.create_set(guid, name, value) + testbed.create_set(guid, name, value) for guid in element_guids: (testbed_guid, factory_id) = data.get_box_data(guid) - instance = self._testbeds[testbed_guid] + testbed = self._testbeds[testbed_guid] for (connector_type_name, other_guid, other_connector_type_name) \ in data.get_connection_data(guid): (testbed_guid, factory_id) = data.get_box_data(guid) (other_testbed_guid, other_factory_id) = data.get_box_data( other_guid) if testbed_guid == other_testbed_guid: - instance.connect(guid, connector_type_name, other_guid, + testbed.connect(guid, connector_type_name, other_guid, other_connector_type_name) else: - instance.cross_connect(guid, connector_type_name, other_guid, + testbed.cross_connect(guid, connector_type_name, other_guid, other_testbed_id, other_factory_id, other_connector_type_name) for trace_id in data.get_trace_data(guid): - instance.add_trace(guid, trace_id) + testbed.add_trace(guid, trace_id) for (autoconf, address, family, netprefix, broadcast) in \ data.get_address_data(guid): if address != None: - instance.add_adddress(guid, family, address, netprefix, + testbed.add_address(guid, family, address, netprefix, broadcast) for (family, destination, netprefix, nexthop) in \ data.get_route_data(guid): - instance.add_route(guid, destination, netprefix, nexthop) + testbed.add_route(guid, destination, netprefix, nexthop) diff --git a/src/nepi/core/testbed_impl.py b/src/nepi/core/testbed_impl.py index 1472cca5..54525319 100644 --- a/src/nepi/core/testbed_impl.py +++ b/src/nepi/core/testbed_impl.py @@ -137,7 +137,7 @@ class TestbedInstance(execute.TestbedInstance): self._add_trace[guid] = list() self._add_trace[guid].append(trace_id) - def add_adddress(self, guid, family, address, netprefix, broadcast): + def add_address(self, guid, family, address, netprefix, broadcast): if not guid in self._create: raise RuntimeError("Element guid %d doesn't exist" % guid) factory_id = self._create[guid] diff --git a/src/nepi/testbeds/netns/execute.py b/src/nepi/testbeds/netns/execute.py index 8636677b..e19f737e 100644 --- a/src/nepi/testbeds/netns/execute.py +++ b/src/nepi/testbeds/netns/execute.py @@ -59,9 +59,9 @@ class TestbedInstance(testbed_impl.TestbedInstance): raise NotImplementedError def trace(self, guid, trace_id): - f = open(self.trace_filename(guid, trace_id), "r") - content = f.read() - f.close() + fd = open("%s" % self.trace_filename(guid, trace_id), "r") + content = fd.read() + fd.close() return content def shutdown(self): diff --git a/src/nepi/util/parser/_xml.py b/src/nepi/util/parser/_xml.py index 9e356920..4f47e93f 100644 --- a/src/nepi/util/parser/_xml.py +++ b/src/nepi/util/parser/_xml.py @@ -244,7 +244,7 @@ class XmlExperimentParser(ExperimentParser): getElementsByTagName("address") for address_tag in address_tag_list: if address_tag.nodeType == tag.ELEMENT_NODE: - autoconf = bool(address_tag.getAttribute("AutoConfigure")) \ + autoconf = address_tag.getAttribute("AutoConfigure") == "True" \ if address_tag.hasAttribute("AutoConfigure") else None address = str(address_tag.getAttribute("Address")) \ if address_tag.hasAttribute("Address") else None @@ -290,20 +290,20 @@ class XmlExperimentParser(ExperimentParser): other_guid, other_connector_type_name) def type_to_standard(self, value): - if type(value) == str: + if isinstance(value, str): return Attribute.STRING - if type(value) == bool: + if isinstance(value, bool): return Attribute.BOOL - if type(value) == int: + if isinstance(value, int): return Attribute.INTEGER - if type(value) == float: + if isinstance(value, float): return Attribute.DOUBLE def type_from_standard(self, type, value): if type == Attribute.STRING: return str(value) if type == Attribute.BOOL: - return bool(value) + return value == "True" if type == Attribute.INTEGER: return int(value) if type == Attribute.DOUBLE: diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index e8ca4652..e3d6edbc 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -83,6 +83,41 @@ testbed_messages = dict({ GUIDS: "%d" % GUIDS, }) +instruction_text = dict({ + OK: "OK", + ERROR: "ERROR", + XML: "XML", + ACCESS: "ACCESS", + TRACE: "TRACE", + FINISHED: "FINISHED", + START: "START", + STOP: "STOP", + SHUTDOWN: "SHUTDOWN", + CONFIGURE: "CONFIGURE", + CREATE: "CREATE", + CREATE_SET: "CREATE_SET", + FACTORY_SET: "FACTORY_SET", + CONNECT: "CONNECT", + CROSS_CONNECT: "CROSS_CONNECT", + ADD_TRACE: "ADD_TRACE", + ADD_ADDRESS: "ADD_ADDRESS", + ADD_ROUTE: "ADD_ROUTE", + DO_SETUP: "DO_SETUP", + DO_CREATE: "DO_CREATE", + DO_CONNECT: "DO_CONNECT", + DO_CONFIGURE: "DO_CONFIGURE", + DO_CROSS_CONNECT: "DO_CROSS_CONNECT", + GET: "GET", + SET: "SET", + ACTION: "ACTION", + STATUS: "STATUS", + GUIDS: "GUIDS", + STRING: "STRING", + INTEGER: "INTEGER", + BOOL: "BOOL", + FLOAT: "FLOAT" + }) + def get_type(value): if isinstance(value, bool): return BOOL @@ -99,16 +134,32 @@ def set_type(type, value): elif type == FLOAT: value = float(value) elif type == BOOL: - value = bool(value) + value = value == "True" else: value = str(value) return value +def log_msg(server, params): + instr = int(params[0]) + instr_txt = instruction_text[instr] + server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, + instr_txt, ", ".join(map(str, params[1:])))) + +def log_reply(server, reply): + res = reply.split("|") + code = int(res[0]) + code_txt = instruction_text[code] + txt = base64.b64decode(res[1]) + server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, + code_txt, txt)) + class AccessConfiguration(AttributesMap): MODE_SINGLE_PROCESS = "SINGLE" MODE_DAEMON = "DAEMON" ACCESS_SSH = "SSH" ACCESS_LOCAL = "LOCAL" + ERROR_LEVEL = "Error" + DEBUG_LEVEL = "Debug" def __init__(self): super(AccessConfiguration, self).__init__() @@ -150,15 +201,23 @@ class AccessConfiguration(AttributesMap): type = Attribute.BOOL, value = False, validation_function = validation.is_bool) + self.add_attribute(name = "logLevel", + help = "Log level for instance", + type = Attribute.ENUM, + value = AccessConfiguration.ERROR_LEVEL, + allowed = [AccessConfiguration.ERROR_LEVEL, + AccessConfiguration.DEBUG_LEVEL], + validation_function = validation.is_enum) def create_controller(xml, access_config = None): mode = None if not access_config else access_config.get_attribute_value("mode") if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS: from nepi.core.execute import ExperimentController return ExperimentController(xml) - elif mode ==AccessConfiguration.MODE_DAEMON: + elif mode == AccessConfiguration.MODE_DAEMON: root_dir = access_config.get_attribute_value("rootDirectory") - return ExperimentControllerProxy(xml, root_dir) + log_level = access_config.get_attribute_value("logLevel") + return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml) raise RuntimeError("Unsupported access configuration 'mode'" % mode) def create_testbed_instance(testbed_id, testbed_version, access_config): @@ -166,8 +225,10 @@ def create_testbed_instance(testbed_id, testbed_version, access_config): if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS: return _build_testbed_testbed(testbed_id, testbed_version) elif mode == AccessConfiguration.MODE_DAEMON: - root_dir = access_config.get_attribute_value("rootDirectory") - return TestbedIntanceProxy(testbed_id, testbed_version, root_dir) + root_dir = access_config.get_attribute_value("rootDirectory") + log_level = access_config.get_attribute_value("logLevel") + return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id, + testbed_version = testbed_version) raise RuntimeError("Unsupported access configuration 'mode'" % mode) def _build_testbed_testbed(testbed_id, testbed_version): @@ -178,7 +239,7 @@ def _build_testbed_testbed(testbed_id, testbed_version): return module.TestbedInstance(testbed_version) class TestbedInstanceServer(server.Server): - def __init__(self, testbed_id, testbed_version, root_dir): + def __init__(self, root_dir, testbed_id, testbed_version): super(TestbedInstanceServer, self).__init__(root_dir) self._testbed_id = testbed_id self._testbed_version = testbed_version @@ -191,62 +252,65 @@ class TestbedInstanceServer(server.Server): def reply_action(self, msg): params = msg.split("|") instruction = int(params[0]) + log_msg(self, params) try: if instruction == TRACE: - return self.trace(params) + reply = self.trace(params) elif instruction == START: - return self.start(params) + reply = self.start(params) elif instruction == STOP: - return self.stop(params) + reply = self.stop(params) elif instruction == SHUTDOWN: - return self.shutdown(params) + reply = self.shutdown(params) elif instruction == CONFIGURE: - return self.configure(params) + reply = self.configure(params) elif instruction == CREATE: - return self.create(params) + reply = self.create(params) elif instruction == CREATE_SET: - return self.create_set(params) + reply = self.create_set(params) elif instruction == FACTORY_SET: - return self.factory_set(params) + reply = self.factory_set(params) elif instruction == CONNECT: - return self.connect(params) + reply = self.connect(params) elif instruction == CROSS_CONNECT: - return self.cross_connect(params) + reply = self.cross_connect(params) elif instruction == ADD_TRACE: - return self.add_trace(params) + reply = self.add_trace(params) elif instruction == ADD_ADDRESS: - return self.add_address(params) + reply = self.add_address(params) elif instruction == ADD_ROUTE: - return self.add_route(params) + reply = self.add_route(params) elif instruction == DO_SETUP: - return self.do_setup(params) + reply = self.do_setup(params) elif instruction == DO_CREATE: - return self.do_create(params) + reply = self.do_create(params) elif instruction == DO_CONNECT: - return self.do_connect(params) + reply = self.do_connect(params) elif instruction == DO_CONFIGURE: - return self.do_configure(params) + reply = self.do_configure(params) elif instruction == DO_CROSS_CONNECT: - return self.do_cross_connect(params) + reply = self.do_cross_connect(params) elif instruction == GET: - return self.get(params) + reply = self.get(params) elif instruction == SET: - return self.set(params) + reply = self.set(params) elif instruction == ACTION: - return self.action(params) + reply = self.action(params) elif instruction == STATUS: - return self.status(params) + reply = self.status(params) elif instruction == GUIDS: - return self.guids(params) + reply = self.guids(params) else: error = "Invalid instruction %s" % instruction self.log_error(error) result = base64.b64encode(error) - return "%d|%s" % (ERROR, result) + reply = "%d|%s" % (ERROR, result) except: error = self.log_error() result = base64.b64encode(error) - return "%d|%s" % (ERROR, result) + reply = "%d|%s" % (ERROR, result) + log_reply(self, reply) + return reply def guids(self, params): guids = self._testbed.guids @@ -404,7 +468,7 @@ class TestbedInstanceServer(server.Server): return "%d|%s" % (OK, result) class ExperimentControllerServer(server.Server): - def __init__(self, experiment_xml, root_dir): + def __init__(self, root_dir, experiment_xml): super(ExperimentControllerServer, self).__init__(root_dir) self._experiment_xml = experiment_xml self._controller = None @@ -416,30 +480,33 @@ class ExperimentControllerServer(server.Server): def reply_action(self, msg): params = msg.split("|") instruction = int(params[0]) + log_msg(self, params) try: if instruction == XML: - return self.experiment_xml(params) + reply = self.experiment_xml(params) elif instruction == ACCESS: - return self.set_access_configuration(params) + reply = self.set_access_configuration(params) elif instruction == TRACE: - return self.trace(params) + reply = self.trace(params) elif instruction == FINISHED: - return self.is_finished(params) + reply = self.is_finished(params) elif instruction == START: - return self.start(params) + reply = self.start(params) elif instruction == STOP: - return self.stop(params) + reply = self.stop(params) elif instruction == SHUTDOWN: - return self.shutdown(params) + reply = self.shutdown(params) else: error = "Invalid instruction %s" % instruction self.log_error(error) result = base64.b64encode(error) - return "%d|%s" % (ERROR, result) + reply = "%d|%s" % (ERROR, result) except: error = self.log_error() result = base64.b64encode(error) - return "%d|%s" % (ERROR, result) + reply = "%d|%s" % (ERROR, result) + log_reply(self, reply) + return reply def experiment_xml(self, params): xml = self._controller.experiment_xml @@ -454,7 +521,7 @@ class ExperimentControllerServer(server.Server): user = params[5] port = int(params[6]) root_dir = params[7] - use_agent = bool(params[8]) + use_agent = params[8] == "True" access_config = AccessConfiguration() access_config.set_attribute_value("mode", mode) access_config.set_attribute_value("communication", communication) @@ -473,12 +540,13 @@ class ExperimentControllerServer(server.Server): trace_id = params[3] trace = self._controller.trace(testbed_guid, guid, trace_id) result = base64.b64encode(trace) - return "%d|%s" % (OK, "%s" % result) + return "%d|%s" % (OK, result) def is_finished(self, params): guid = int(params[1]) - result = self._controller.is_finished(guid) - return "%d|%s" % (OK, "%r" % result) + status = self._controller.is_finished(guid) + result = base64.b64encode(str(status)) + return "%d|%s" % (OK, result) def start(self, params): self._controller.start() @@ -493,11 +561,17 @@ class ExperimentControllerServer(server.Server): return "%d|%s" % (OK, "") class TestbedIntanceProxy(object): - def __init__(self, testbed_id, testbed_version, root_dir): - # launch daemon - s = TestbedInstanceServer(testbed_id, testbed_version, - root_dir) - s.run() + def __init__(self, root_dir, log_level, testbed_id = None, + testbed_version = None, launch = True): + if launch: + if testbed_id == None or testbed_version == None: + raise RuntimeError("To launch a TesbedInstance server a \ + testbed_id and testbed_version are required") + # launch daemon + s = TestbedInstanceServer(root_dir, testbed_id, testbed_version) + if log_level == AccessConfiguration.DEBUG_LEVEL: + s.set_debug_log_level() + s.run() # create_client self._client = server.Client(root_dir) @@ -776,10 +850,16 @@ class TestbedIntanceProxy(object): self._client.send_stop() class ExperimentControllerProxy(object): - def __init__(self, experiment_xml, root_dir): - # launch daemon - s = ExperimentControllerServer(experiment_xml, root_dir) - s.run() + def __init__(self, root_dir, log_level, experiment_xml = None, launch = True): + if launch: + if experiment_xml == None: + raise RuntimeError("To launch a ExperimentControllerServer a \ + xml description of the experiment is required") + # launch daemon + s = ExperimentControllerServer(root_dir, experiment_xml) + if log_level == AccessConfiguration.DEBUG_LEVEL: + s.set_debug_log_level() + s.run() # create_client self._client = server.Client(root_dir) @@ -790,10 +870,10 @@ class ExperimentControllerProxy(object): reply = self._client.read_reply() result = reply.split("|") code = int(result[0]) - text = base64.b64decode(result[1]) - if code == OK: - return text - raise RuntimeError(text) + text = base64.b64decode(result[1]) + if code == ERROR: + raise RuntimeError(text) + return text def set_access_configuration(self, testbed_guid, access_config): mode = access_config.get_attribute_value("mode") @@ -853,10 +933,10 @@ class ExperimentControllerProxy(object): reply = self._client.read_reply() result = reply.split("|") code = int(result[0]) - if code == OK: - return bool(result[1]) - error = base64.b64decode(result[1]) - raise RuntimeError(error) + text = base64.b64decode(result[1]) + if code == ERROR: + raise RuntimeError(text) + return text == "True" def shutdown(self): msg = controller_messages[SHUTDOWN] diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 1a432ef8..8a3dbed7 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -9,6 +9,8 @@ import socket import sys import subprocess import threading +from time import strftime +import traceback CTRL_SOCK = "ctrl.sock" STD_ERR = "stderr.log" @@ -16,12 +18,16 @@ MAX_FD = 1024 STOP_MSG = "STOP" +ERROR_LEVEL = 0 +DEBUG_LEVEL = 1 + class Server(object): def __init__(self, root_dir = "."): self._root_dir = root_dir self._stop = False self._ctrl_sock = None - self._stderr = None + self._stderr = None + self._log_level = ERROR_LEVEL def run(self): try: @@ -38,16 +44,26 @@ class Server(object): os._exit(0) except: self.log_error() - raise + self.cleanup() + os._exit(0) def daemonize(self): + # pipes for process synchronization + (r, w) = os.pipe() + pid1 = os.fork() if pid1 > 0: - # we do os.waitpid to avoid leaving a (zombie) process - os.waitpid(pid1, 0) + os.close(w) + os.read(r, 1) + os.close(r) + # os.waitpid avoids leaving a (zombie) process + st = os.waitpid(pid1, 0)[1] + if st: + raise RuntimeError("Daemonization failed") # return 0 to inform the caller method that this is not the # daemonized process return 0 + os.close(r) # Decouple from parent environment. os.chdir(self._root_dir) @@ -61,11 +77,12 @@ class Server(object): os._exit(0) # close all open file descriptors. - for fd in range(2, MAX_FD): - try: - os.close(fd) - except OSError: - pass + for fd in range(3, MAX_FD): + if fd != w: + try: + os.close(fd) + except OSError: + pass # Redirect standard file descriptors. self._stderr = stdout = file(STD_ERR, "a", 0) @@ -73,6 +90,9 @@ class Server(object): os.dup2(stdin.fileno(), sys.stdin.fileno()) os.dup2(stdout.fileno(), sys.stdout.fileno()) os.dup2(self._stderr.fileno(), sys.stderr.fileno()) + # let the parent process know that the daemonization is finished + os.write(w, "\n") + os.close(w) return 1 def post_daemonize(self): @@ -93,34 +113,37 @@ class Server(object): if msg == STOP_MSG: self._stop = True - try: - reply = self.stop_action() - except: - self.log_error() - self.send_reply(conn, reply) - break + reply = self.stop_action() else: - try: - reply = self.reply_action(msg) - except: - self.log_error() - self.send_reply(conn, reply) + reply = self.reply_action(msg) + self.send_reply(conn, reply) conn.close() def recv_msg(self, conn): - data = conn.recv(1024) - decoded = base64.b64decode(data) - return decoded.rstrip() + data = "" + while True: + try: + chunk = conn.recv(1024) + except OSError, e: + if e.errno != errno.EINTR: + raise + if chunk == '': + continue + data += chunk + if chunk[-1] == "\n": + break + decoded = base64.b64decode(data) + return decoded.rstrip() def send_reply(self, conn, reply): - encoded = base64.b64encode(reply) - conn.send("%s\n" % encoded) + encoded = base64.b64encode(reply) + conn.send("%s\n" % encoded) def cleanup(self): try: self._ctrl_sock.close() os.remove(CTRL_SOCK) - except e: + except: self.log_error() def stop_action(self): @@ -129,12 +152,23 @@ class Server(object): def reply_action(self, msg): return "Reply to: %s" % msg - def log_error(self, error = None): - if error == None: - import traceback - error = "%s\n" % traceback.format_exc() - sys.stderr.write(error) - return error + def set_error_log_level(self): + self._log_level = ERROR_LEVEL + + def set_debug_log_level(self): + self._log_level = DEBUG_LEVEL + + def log_error(self, text = None): + if text == None: + text = traceback.format_exc() + date = strftime("%Y-%m-%d %H:%M:%S") + sys.stderr.write("ERROR: %s\n%s\n" % (date, text)) + return text + + def log_debug(self, text): + if self._log_level == DEBUG_LEVEL: + date = strftime("%Y-%m-%d %H:%M:%S") + sys.stderr.write("DEBUG: %s\n%s\n" % (date, text)) class Forwarder(object): def __init__(self, root_dir = "."): @@ -173,7 +207,19 @@ class Forwarder(object): self._stop = True def recv_from_server(self): - return self._ctrl_sock.recv(1024) + data = "" + while True: + try: + chunk = self._ctrl_sock.recv(1024) + except OSError, e: + if e.errno != errno.EINTR: + raise + if chunk == '': + continue + data += chunk + if chunk[-1] == "\n": + break + return data def connect(self): self.disconnect() @@ -195,8 +241,7 @@ class Client(object): c.forward()" % root_dir ], stdin = subprocess.PIPE, - stdout = subprocess.PIPE, - env = os.environ) + stdout = subprocess.PIPE) def send_msg(self, msg): encoded = base64.b64encode(msg) diff --git a/test/core/integration.py b/test/core/integration.py index 3febe57e..069d5c55 100755 --- a/test/core/integration.py +++ b/test/core/integration.py @@ -83,6 +83,7 @@ class ExecuteTestCase(unittest.TestCase): proxy.AccessConfiguration.MODE_DAEMON) access_config.set_attribute_value("rootDirectory", self._root_dir) controller = proxy.create_controller(xml, access_config) + controller.start() while not controller.is_finished(app.guid): time.sleep(0.5) @@ -123,6 +124,7 @@ class ExecuteTestCase(unittest.TestCase): proxy.AccessConfiguration.MODE_DAEMON) access_config.set_attribute_value("rootDirectory", self._root_dir) controller.set_access_configuration(desc.guid, access_config) + controller.start() while not controller.is_finished(app.guid): time.sleep(0.5) @@ -161,14 +163,16 @@ class ExecuteTestCase(unittest.TestCase): access_config.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) access_config.set_attribute_value("rootDirectory", self._root_dir) - controller = proxy.create_controller(xml, access_config = None) - access_config = proxy.AccessConfiguration() - access_config.set_attribute_value("mode", + controller = proxy.create_controller(xml, access_config) + + access_config2 = proxy.AccessConfiguration() + access_config2.set_attribute_value("mode", proxy.AccessConfiguration.MODE_DAEMON) inst_root_dir = os.path.join(self._root_dir, "instance") os.mkdir(inst_root_dir) - access_config.set_attribute_value("rootDirectory", inst_root_dir) - controller.set_access_configuration(desc.guid, access_config) + access_config2.set_attribute_value("rootDirectory", inst_root_dir) + controller.set_access_configuration(desc.guid, access_config2) + controller.start() while not controller.is_finished(app.guid): time.sleep(0.5) diff --git a/test/testbeds/netns/execute.py b/test/testbeds/netns/execute.py index 884a57de..1e4d4c12 100755 --- a/test/testbeds/netns/execute.py +++ b/test/testbeds/netns/execute.py @@ -28,11 +28,11 @@ class NetnsExecuteTestCase(unittest.TestCase): instance.create(4, "NodeInterface") instance.create_set(4, "up", True) instance.connect(2, "devs", 4, "node") - instance.add_adddress(4, AF_INET, "10.0.0.1", 24, None) + instance.add_address(4, AF_INET, "10.0.0.1", 24, None) instance.create(5, "NodeInterface") instance.create_set(5, "up", True) instance.connect(3, "devs", 5, "node") - instance.add_adddress(5, AF_INET, "10.0.0.2", 24, None) + instance.add_address(5, AF_INET, "10.0.0.2", 24, None) instance.create(6, "Switch") instance.create_set(6, "up", True) instance.connect(4, "switch", 6, "devs") @@ -71,11 +71,11 @@ class NetnsExecuteTestCase(unittest.TestCase): instance.create(4, "P2PNodeInterface") instance.create_set(4, "up", True) instance.connect(2, "devs", 4, "node") - instance.add_adddress(4, AF_INET, "10.0.0.1", 24, None) + instance.add_address(4, AF_INET, "10.0.0.1", 24, None) instance.create(5, "P2PNodeInterface") instance.create_set(5, "up", True) instance.connect(3, "devs", 5, "node") - instance.add_adddress(5, AF_INET, "10.0.0.2", 24, None) + instance.add_address(5, AF_INET, "10.0.0.2", 24, None) instance.connect(4, "p2p", 5, "p2p") instance.create(6, "Application") instance.create_set(6, "command", "ping -qc1 10.0.0.2") @@ -112,19 +112,19 @@ class NetnsExecuteTestCase(unittest.TestCase): instance.create(5, "NodeInterface") instance.create_set(5, "up", True) instance.connect(2, "devs", 5, "node") - instance.add_adddress(5, AF_INET, "10.0.0.1", 24, None) + instance.add_address(5, AF_INET, "10.0.0.1", 24, None) instance.create(6, "NodeInterface") instance.create_set(6, "up", True) instance.connect(3, "devs", 6, "node") - instance.add_adddress(6, AF_INET, "10.0.0.2", 24, None) + instance.add_address(6, AF_INET, "10.0.0.2", 24, None) instance.create(7, "NodeInterface") instance.create_set(7, "up", True) instance.connect(3, "devs", 7, "node") - instance.add_adddress(7, AF_INET, "10.0.1.1", 24, None) + instance.add_address(7, AF_INET, "10.0.1.1", 24, None) instance.create(8, "NodeInterface") instance.create_set(8, "up", True) instance.connect(4, "devs", 8, "node") - instance.add_adddress(8, AF_INET, "10.0.1.2", 24, None) + instance.add_address(8, AF_INET, "10.0.1.2", 24, None) instance.create(9, "Switch") instance.create_set(9, "up", True) instance.connect(5, "switch", 9, "devs") diff --git a/test/testbeds/netns/integration.py b/test/testbeds/netns/integration.py index a0244d5a..b7a08224 100755 --- a/test/testbeds/netns/integration.py +++ b/test/testbeds/netns/integration.py @@ -4,6 +4,7 @@ import getpass from nepi.core.design import ExperimentDescription, FactoriesProvider from nepi.core.execute import ExperimentController +from nepi.util import proxy import os import shutil import test_util @@ -13,9 +14,9 @@ import uuid class NetnsIntegrationTestCase(unittest.TestCase): def setUp(self): - self._home_dir = os.path.join(os.getenv("HOME"), ".nepi", + self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", str(uuid.uuid1())) - os.makedirs(self._home_dir) + os.makedirs(self._root_dir) @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") def test_local_if(self): @@ -25,7 +26,7 @@ class NetnsIntegrationTestCase(unittest.TestCase): user = getpass.getuser() netns_provider = FactoriesProvider(testbed_id, testbed_version) netns_desc = exp_desc.add_testbed_description(netns_provider) - netns_desc.set_attribute_value("homeDirectory", self._home_dir) + netns_desc.set_attribute_value("homeDirectory", self._root_dir) #netns_desc.set_attribute_value("enableDebug", True) node1 = netns_desc.create("Node") node2 = netns_desc.create("Node") @@ -63,9 +64,73 @@ class NetnsIntegrationTestCase(unittest.TestCase): self.assertTrue(ping_result.startswith(comp_result)) controller.stop() controller.shutdown() - + + @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") + def test_all_daemonized_if(self): + exp_desc = ExperimentDescription() + testbed_version = "01" + testbed_id = "netns" + user = getpass.getuser() + netns_provider = FactoriesProvider(testbed_id, testbed_version) + netns_desc = exp_desc.add_testbed_description(netns_provider) + netns_desc.set_attribute_value("homeDirectory", self._root_dir) + #netns_desc.set_attribute_value("enableDebug", True) + node1 = netns_desc.create("Node") + node2 = netns_desc.create("Node") + iface1 = netns_desc.create("NodeInterface") + iface1.set_attribute_value("up", True) + node1.connector("devs").connect(iface1.connector("node")) + ip1 = iface1.add_address() + ip1.set_attribute_value("Address", "10.0.0.1") + iface2 = netns_desc.create("NodeInterface") + iface2.set_attribute_value("up", True) + node2.connector("devs").connect(iface2.connector("node")) + ip2 = iface2.add_address() + ip2.set_attribute_value("Address", "10.0.0.2") + switch = netns_desc.create("Switch") + switch.set_attribute_value("up", True) + iface1.connector("switch").connect(switch.connector("devs")) + iface2.connector("switch").connect(switch.connector("devs")) + app = netns_desc.create("Application") + app.set_attribute_value("command", "ping -qc1 10.0.0.2") + app.set_attribute_value("user", user) + app.connector("node").connect(node1.connector("apps")) + app.enable_trace("stdout") + xml = exp_desc.to_xml() + + access_config = proxy.AccessConfiguration() + access_config.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + access_config.set_attribute_value("rootDirectory", self._root_dir) + access_config.set_attribute_value("logLevel", + proxy.AccessConfiguration.DEBUG_LEVEL) + controller = proxy.create_controller(xml, access_config) + + access_config2 = proxy.AccessConfiguration() + access_config2.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + inst_root_dir = os.path.join(self._root_dir, "instance") + os.mkdir(inst_root_dir) + access_config2.set_attribute_value("rootDirectory", inst_root_dir) + access_config2.set_attribute_value("logLevel", + proxy.AccessConfiguration.DEBUG_LEVEL) + controller.set_access_configuration(netns_desc.guid, access_config2) + + controller.start() + while not controller.is_finished(app.guid): + time.sleep(0.5) + ping_result = controller.trace(netns_desc.guid, app.guid, "stdout") + comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. + +--- 10.0.0.2 ping statistics --- +1 packets transmitted, 1 received, 0% packet loss, time 0ms +""" + self.assertTrue(ping_result.startswith(comp_result)) + controller.stop() + controller.shutdown() + def tearDown(self): - shutil.rmtree(self._home_dir) + shutil.rmtree(self._root_dir) if __name__ == '__main__': unittest.main() diff --git a/test/util/server.py b/test/util/server.py index 4e0fbaff..3766520e 100755 --- a/test/util/server.py +++ b/test/util/server.py @@ -25,6 +25,17 @@ class ServerTestCase(unittest.TestCase): reply = c.read_reply() self.assertTrue(reply == "Stopping server") + def test_server_long_message(self): + s = server.Server(self._root_dir) + s.run() + c = server.Client(self._root_dir) + c.send_msg("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") + reply = c.read_reply() + self.assertTrue(reply == "Reply to: 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") + c.send_stop() + reply = c.read_reply() + self.assertTrue(reply == "Stopping server") + def tearDown(self): shutil.rmtree(self._root_dir)