bugfixing
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 15 Mar 2011 16:36:45 +0000 (17:36 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 15 Mar 2011 16:36:45 +0000 (17:36 +0100)
12 files changed:
Makefile
setup.py
src/nepi/core/execute.py
src/nepi/core/testbed_impl.py
src/nepi/testbeds/netns/execute.py
src/nepi/util/parser/_xml.py
src/nepi/util/proxy.py
src/nepi/util/server.py
test/core/integration.py
test/testbeds/netns/execute.py
test/testbeds/netns/integration.py
test/util/server.py

index f3b1e06..bd3c0b1 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -17,8 +17,7 @@ else
 BUILDDIR := $(BUILDDIR)/lib
 endif
 
-#PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH)
-PYPATH = "../nepi2/src:../nepi2/test/lib:../netns/src"
+PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH)
 COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \
           coverage)
 
index 38d64e0..0fea373 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -16,6 +16,7 @@ setup(
             "nepi.testbeds",
             "nepi.testbeds.netns",
             "nepi.core",
+            "nepi.util.parser",
             "nepi.util" ],
         package_dir = {"": "src"}
     )
index 37c6350..97402c9 100644 (file)
@@ -260,31 +260,31 @@ class ExperimentController(object):
 
     def start(self):
         self._create_testbed_instances()
-        for instance in self._testbeds.values():
-            instance.do_setup()
-        for instance in self._testbeds.values():
-            instance.do_create()
-            instance.do_connect()
-            instance.do_configure()
-        for instances in self._testbeds.values():
-            instance.do_cross_connect()
-        for instances in self._testbeds.values():
-            instance.start()
+        for testbed in self._testbeds.values():
+            testbed.do_setup()
+        for testbed in self._testbeds.values():
+            testbed.do_create()
+            testbed.do_connect()
+            testbed.do_configure()
+        for testbed in self._testbeds.values():
+            testbed.do_cross_connect()
+        for testbed in self._testbeds.values():
+            testbed.start()
 
     def stop(self):
-       for instance in self._testbeds.values():
-           instance.stop()
+       for testbed in self._testbeds.values():
+           testbed.stop()
 
     def is_finished(self, guid):
-        for instance in self._testbeds.values():
-            for guid_ in instance.guids:
+        for testbed in self._testbeds.values():
+            for guid_ in testbed.guids:
                 if guid_ == guid:
-                    return instance.status(guid) == STATUS_FINISHED
+                    return testbed.status(guid) == STATUS_FINISHED
         raise RuntimeError("No element exists with guid %d" % guid)    
 
     def shutdown(self):
-       for instance in self._testbeds.values():
-           instance.shutdown()
+       for testbed in self._testbeds.values():
+           testbed.shutdown()
 
     def _create_testbed_instances(self):
         parser = XmlExperimentParser()
@@ -295,11 +295,11 @@ class ExperimentController(object):
                 (testbed_id, testbed_version) = data.get_testbed_data(guid)
                 access_config = None if guid not in self._access_config else\
                         self._access_config[guid]
-                instance = proxy.create_testbed_instance(testbed_id, 
+                testbed = proxy.create_testbed_instance(testbed_id, 
                         testbed_version, access_config)
                 for (name, value) in data.get_attribute_data(guid):
-                    instance.configure(name, value)
-                self._testbeds[guid] = instance
+                    testbed.configure(name, value)
+                self._testbeds[guid] = testbed
             else:
                 element_guids.append(guid)
         self._program_testbed_instances(element_guids, data)
@@ -307,33 +307,33 @@ class ExperimentController(object):
     def _program_testbed_instances(self, element_guids, data):
         for guid in element_guids:
             (testbed_guid, factory_id) = data.get_box_data(guid)
-            instance = self._testbeds[testbed_guid]
-            instance.create(guid, factory_id)
+            testbed = self._testbeds[testbed_guid]
+            testbed.create(guid, factory_id)
             for (name, value) in data.get_attribute_data(guid):
-                instance.create_set(guid, name, value)
+                testbed.create_set(guid, name, value)
 
         for guid in element_guids: 
             (testbed_guid, factory_id) = data.get_box_data(guid)
-            instance = self._testbeds[testbed_guid]
+            testbed = self._testbeds[testbed_guid]
             for (connector_type_name, other_guid, other_connector_type_name) \
                     in data.get_connection_data(guid):
                 (testbed_guid, factory_id) = data.get_box_data(guid)
                 (other_testbed_guid, other_factory_id) = data.get_box_data(
                         other_guid)
                 if testbed_guid == other_testbed_guid:
-                    instance.connect(guid, connector_type_name, other_guid, 
+                    testbed.connect(guid, connector_type_name, other_guid, 
                         other_connector_type_name)
                 else:
-                    instance.cross_connect(guid, connector_type_name, other_guid, 
+                    testbed.cross_connect(guid, connector_type_name, other_guid, 
                         other_testbed_id, other_factory_id, other_connector_type_name)
             for trace_id in data.get_trace_data(guid):
-                instance.add_trace(guid, trace_id)
+                testbed.add_trace(guid, trace_id)
             for (autoconf, address, family, netprefix, broadcast) in \
                     data.get_address_data(guid):
                 if address != None:
-                    instance.add_adddress(guid, family, address, netprefix,
+                    testbed.add_address(guid, family, address, netprefix,
                         broadcast)
             for (family, destination, netprefix, nexthop) in \
                     data.get_route_data(guid):
-                instance.add_route(guid, destination, netprefix, nexthop)
+                testbed.add_route(guid, destination, netprefix, nexthop)
 
index 1472cca..5452531 100644 (file)
@@ -137,7 +137,7 @@ class TestbedInstance(execute.TestbedInstance):
             self._add_trace[guid] = list()
         self._add_trace[guid].append(trace_id)
 
-    def add_adddress(self, guid, family, address, netprefix, broadcast):
+    def add_address(self, guid, family, address, netprefix, broadcast):
         if not guid in self._create:
             raise RuntimeError("Element guid %d doesn't exist" % guid)
         factory_id = self._create[guid]
index 8636677..e19f737 100644 (file)
@@ -59,9 +59,9 @@ class TestbedInstance(testbed_impl.TestbedInstance):
         raise NotImplementedError
 
     def trace(self, guid, trace_id):
-        f = open(self.trace_filename(guid, trace_id), "r")
-        content = f.read()
-        f.close()
+        fd = open("%s" % self.trace_filename(guid, trace_id), "r")
+        content = fd.read()
+        fd.close()
         return content
 
     def shutdown(self):
index 9e35692..4f47e93 100644 (file)
@@ -244,7 +244,7 @@ class XmlExperimentParser(ExperimentParser):
                 getElementsByTagName("address")
         for address_tag in address_tag_list:
             if address_tag.nodeType == tag.ELEMENT_NODE:
-                autoconf = bool(address_tag.getAttribute("AutoConfigure")) \
+                autoconf = address_tag.getAttribute("AutoConfigure") == "True" \
                        if address_tag.hasAttribute("AutoConfigure") else None
                 address = str(address_tag.getAttribute("Address")) \
                        if address_tag.hasAttribute("Address") else None
@@ -290,20 +290,20 @@ class XmlExperimentParser(ExperimentParser):
                          other_guid, other_connector_type_name)
 
     def type_to_standard(self, value):
-        if type(value) == str:
+        if isinstance(value, str):
             return Attribute.STRING
-        if type(value) == bool:
+        if isinstance(value, bool):
             return Attribute.BOOL
-        if type(value) == int:
+        if isinstance(value, int):
             return Attribute.INTEGER
-        if type(value) == float:
+        if isinstance(value, float):
             return Attribute.DOUBLE
     
     def type_from_standard(self, type, value):
         if type == Attribute.STRING:
             return str(value)
         if type == Attribute.BOOL:
-            return bool(value)
+            return value == "True"
         if type == Attribute.INTEGER:
             return int(value)
         if type == Attribute.DOUBLE:
index e8ca465..e3d6edb 100644 (file)
@@ -83,6 +83,41 @@ testbed_messages = dict({
     GUIDS:  "%d" % GUIDS,
     })
 
+instruction_text = dict({
+    OK:     "OK",
+    ERROR:  "ERROR",
+    XML:    "XML",
+    ACCESS: "ACCESS",
+    TRACE:  "TRACE",
+    FINISHED:   "FINISHED",
+    START:  "START",
+    STOP:   "STOP",
+    SHUTDOWN:   "SHUTDOWN",
+    CONFIGURE:  "CONFIGURE",
+    CREATE: "CREATE",
+    CREATE_SET: "CREATE_SET",
+    FACTORY_SET:    "FACTORY_SET",
+    CONNECT:    "CONNECT",
+    CROSS_CONNECT: "CROSS_CONNECT",
+    ADD_TRACE:  "ADD_TRACE",
+    ADD_ADDRESS:    "ADD_ADDRESS",
+    ADD_ROUTE:  "ADD_ROUTE",
+    DO_SETUP:   "DO_SETUP",
+    DO_CREATE:  "DO_CREATE",
+    DO_CONNECT: "DO_CONNECT",
+    DO_CONFIGURE:   "DO_CONFIGURE",
+    DO_CROSS_CONNECT:   "DO_CROSS_CONNECT",
+    GET:    "GET",
+    SET:    "SET",
+    ACTION: "ACTION",
+    STATUS: "STATUS",
+    GUIDS:  "GUIDS",
+    STRING: "STRING",
+    INTEGER:    "INTEGER",
+    BOOL:   "BOOL",
+    FLOAT:  "FLOAT"
+    })
+
 def get_type(value):
     if isinstance(value, bool):
         return BOOL
@@ -99,16 +134,32 @@ def set_type(type, value):
     elif type == FLOAT:
         value = float(value)
     elif type == BOOL:
-        value = bool(value)
+        value = value == "True"
     else:
         value = str(value)
     return value
 
+def log_msg(server, params):
+    instr = int(params[0])
+    instr_txt = instruction_text[instr]
+    server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__, 
+        instr_txt, ", ".join(map(str, params[1:]))))
+
+def log_reply(server, reply):
+    res = reply.split("|")
+    code = int(res[0])
+    code_txt = instruction_text[code]
+    txt = base64.b64decode(res[1])
+    server.log_debug("%s - reply: %s %s" % (server.__class__.__name__, 
+            code_txt, txt))
+
 class AccessConfiguration(AttributesMap):
     MODE_SINGLE_PROCESS = "SINGLE"
     MODE_DAEMON = "DAEMON"
     ACCESS_SSH = "SSH"
     ACCESS_LOCAL = "LOCAL"
+    ERROR_LEVEL = "Error"
+    DEBUG_LEVEL = "Debug"
 
     def __init__(self):
         super(AccessConfiguration, self).__init__()
@@ -150,15 +201,23 @@ class AccessConfiguration(AttributesMap):
                 type = Attribute.BOOL,
                 value = False,
                 validation_function = validation.is_bool)
+        self.add_attribute(name = "logLevel",
+                help = "Log level for instance",
+                type = Attribute.ENUM,
+                value = AccessConfiguration.ERROR_LEVEL,
+                allowed = [AccessConfiguration.ERROR_LEVEL,
+                    AccessConfiguration.DEBUG_LEVEL],
+                validation_function = validation.is_enum)
 
 def create_controller(xml, access_config = None):
     mode = None if not access_config else access_config.get_attribute_value("mode")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
         from nepi.core.execute import ExperimentController
         return ExperimentController(xml)
-    elif mode ==AccessConfiguration.MODE_DAEMON:
+    elif mode == AccessConfiguration.MODE_DAEMON:
         root_dir = access_config.get_attribute_value("rootDirectory")
-        return ExperimentControllerProxy(xml, root_dir)
+        log_level = access_config.get_attribute_value("logLevel")
+        return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml)
     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
 
 def create_testbed_instance(testbed_id, testbed_version, access_config):
@@ -166,8 +225,10 @@ def create_testbed_instance(testbed_id, testbed_version, access_config):
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
         return  _build_testbed_testbed(testbed_id, testbed_version)
     elif mode == AccessConfiguration.MODE_DAEMON:
-                root_dir = access_config.get_attribute_value("rootDirectory")
-                return TestbedIntanceProxy(testbed_id, testbed_version, root_dir)
+        root_dir = access_config.get_attribute_value("rootDirectory")
+        log_level = access_config.get_attribute_value("logLevel")
+        return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id, 
+                testbed_version = testbed_version)
     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
 
 def _build_testbed_testbed(testbed_id, testbed_version):
@@ -178,7 +239,7 @@ def _build_testbed_testbed(testbed_id, testbed_version):
     return module.TestbedInstance(testbed_version)
 
 class TestbedInstanceServer(server.Server):
-    def __init__(self, testbed_id, testbed_version, root_dir):
+    def __init__(self, root_dir, testbed_id, testbed_version):
         super(TestbedInstanceServer, self).__init__(root_dir)
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
@@ -191,62 +252,65 @@ class TestbedInstanceServer(server.Server):
     def reply_action(self, msg):
         params = msg.split("|")
         instruction = int(params[0])
+        log_msg(self, params)
         try:
             if instruction == TRACE:
-                return self.trace(params)
+                reply = self.trace(params)
             elif instruction == START:
-                return self.start(params)
+                reply = self.start(params)
             elif instruction == STOP:
-                return self.stop(params)
+                reply = self.stop(params)
             elif instruction == SHUTDOWN:
-                return self.shutdown(params)
+                reply = self.shutdown(params)
             elif instruction == CONFIGURE:
-                return self.configure(params)
+                reply = self.configure(params)
             elif instruction == CREATE:
-                return self.create(params)
+                reply = self.create(params)
             elif instruction == CREATE_SET:
-                return self.create_set(params)
+                reply = self.create_set(params)
             elif instruction == FACTORY_SET:
-                return self.factory_set(params)
+                reply = self.factory_set(params)
             elif instruction == CONNECT:
-                return self.connect(params)
+                reply = self.connect(params)
             elif instruction == CROSS_CONNECT:
-                return self.cross_connect(params)
+                reply = self.cross_connect(params)
             elif instruction == ADD_TRACE:
-                return self.add_trace(params)
+                reply = self.add_trace(params)
             elif instruction == ADD_ADDRESS:
-                return self.add_address(params)
+                reply = self.add_address(params)
             elif instruction == ADD_ROUTE:
-                return self.add_route(params)
+                reply = self.add_route(params)
             elif instruction == DO_SETUP:
-                return self.do_setup(params)
+                reply = self.do_setup(params)
             elif instruction == DO_CREATE:
-                return self.do_create(params)
+                reply = self.do_create(params)
             elif instruction == DO_CONNECT:
-                return self.do_connect(params)
+                reply = self.do_connect(params)
             elif instruction == DO_CONFIGURE:
-                return self.do_configure(params)
+                reply = self.do_configure(params)
             elif instruction == DO_CROSS_CONNECT:
-                return self.do_cross_connect(params)
+                reply = self.do_cross_connect(params)
             elif instruction == GET:
-                return self.get(params)
+                reply = self.get(params)
             elif instruction == SET:
-                return self.set(params)
+                reply = self.set(params)
             elif instruction == ACTION:
-                return self.action(params)
+                reply = self.action(params)
             elif instruction == STATUS:
-                return self.status(params)
+                reply = self.status(params)
             elif instruction == GUIDS:
-                return self.guids(params)
+                reply = self.guids(params)
             else:
                 error = "Invalid instruction %s" % instruction
                 self.log_error(error)
                 result = base64.b64encode(error)
-                return "%d|%s" % (ERROR, result)
+                reply = "%d|%s" % (ERROR, result)
         except:
             error = self.log_error()
             result = base64.b64encode(error)
-            return "%d|%s" % (ERROR, result)
+            reply = "%d|%s" % (ERROR, result)
+        log_reply(self, reply)
+        return reply
 
     def guids(self, params):
         guids = self._testbed.guids
@@ -404,7 +468,7 @@ class TestbedInstanceServer(server.Server):
         return "%d|%s" % (OK, result)
  
 class ExperimentControllerServer(server.Server):
-    def __init__(self, experiment_xml, root_dir):
+    def __init__(self, root_dir, experiment_xml):
         super(ExperimentControllerServer, self).__init__(root_dir)
         self._experiment_xml = experiment_xml
         self._controller = None
@@ -416,30 +480,33 @@ class ExperimentControllerServer(server.Server):
     def reply_action(self, msg):
         params = msg.split("|")
         instruction = int(params[0])
+        log_msg(self, params)
         try:
             if instruction == XML:
-                return self.experiment_xml(params)
+                reply = self.experiment_xml(params)
             elif instruction == ACCESS:
-                return self.set_access_configuration(params)
+                reply = self.set_access_configuration(params)
             elif instruction == TRACE:
-                return self.trace(params)
+                reply = self.trace(params)
             elif instruction == FINISHED:
-                return self.is_finished(params)
+                reply = self.is_finished(params)
             elif instruction == START:
-                return self.start(params)
+                reply = self.start(params)
             elif instruction == STOP:
-                return self.stop(params)
+                reply = self.stop(params)
             elif instruction == SHUTDOWN:
-                return self.shutdown(params)
+                reply = self.shutdown(params)
             else:
                 error = "Invalid instruction %s" % instruction
                 self.log_error(error)
                 result = base64.b64encode(error)
-                return "%d|%s" % (ERROR, result)
+                reply = "%d|%s" % (ERROR, result)
         except:
             error = self.log_error()
             result = base64.b64encode(error)
-            return "%d|%s" % (ERROR, result)
+            reply = "%d|%s" % (ERROR, result)
+        log_reply(self, reply)
+        return reply
 
     def experiment_xml(self, params):
         xml = self._controller.experiment_xml
@@ -454,7 +521,7 @@ class ExperimentControllerServer(server.Server):
         user = params[5]
         port = int(params[6])
         root_dir = params[7]
-        use_agent = bool(params[8])
+        use_agent = params[8] == "True"
         access_config = AccessConfiguration()
         access_config.set_attribute_value("mode", mode)
         access_config.set_attribute_value("communication", communication)
@@ -473,12 +540,13 @@ class ExperimentControllerServer(server.Server):
         trace_id = params[3]
         trace = self._controller.trace(testbed_guid, guid, trace_id)
         result = base64.b64encode(trace)
-        return "%d|%s" % (OK, "%s" % result)
+        return "%d|%s" % (OK, result)
 
     def is_finished(self, params):
         guid = int(params[1])
-        result = self._controller.is_finished(guid)
-        return "%d|%s" % (OK, "%r" % result)
+        status = self._controller.is_finished(guid)
+        result = base64.b64encode(str(status))
+        return "%d|%s" % (OK, result)
 
     def start(self, params):
         self._controller.start()
@@ -493,11 +561,17 @@ class ExperimentControllerServer(server.Server):
         return "%d|%s" % (OK, "")
 
 class TestbedIntanceProxy(object):
-    def __init__(self, testbed_id, testbed_version, root_dir):
-        # launch daemon
-        s = TestbedInstanceServer(testbed_id, testbed_version, 
-                root_dir)
-        s.run()
+    def __init__(self, root_dir, log_level, testbed_id = None, 
+            testbed_version = None, launch = True):
+        if launch:
+            if testbed_id == None or testbed_version == None:
+                raise RuntimeError("To launch a TesbedInstance server a \
+                        testbed_id and testbed_version are required")
+            # launch daemon
+            s = TestbedInstanceServer(root_dir, testbed_id, testbed_version)
+            if log_level == AccessConfiguration.DEBUG_LEVEL:
+                s.set_debug_log_level()
+            s.run()
         # create_client
         self._client = server.Client(root_dir)
 
@@ -776,10 +850,16 @@ class TestbedIntanceProxy(object):
         self._client.send_stop()
 
 class ExperimentControllerProxy(object):
-    def __init__(self, experiment_xml, root_dir):
-        # launch daemon
-        s = ExperimentControllerServer(experiment_xml, root_dir)
-        s.run()
+    def __init__(self, root_dir, log_level, experiment_xml = None, launch = True):
+        if launch:
+            if experiment_xml == None:
+                raise RuntimeError("To launch a ExperimentControllerServer a \
+                        xml description of the experiment is required")
+            # launch daemon
+            s = ExperimentControllerServer(root_dir, experiment_xml)
+            if log_level == AccessConfiguration.DEBUG_LEVEL:
+                s.set_debug_log_level()
+            s.run()
         # create_client
         self._client = server.Client(root_dir)
 
@@ -790,10 +870,10 @@ class ExperimentControllerProxy(object):
         reply = self._client.read_reply()
         result = reply.split("|")
         code = int(result[0])
-        text =  base64.b64decode(result[1])
-        if code == OK:
-            return text
-        raise RuntimeError(text)
+        text = base64.b64decode(result[1])
+        if code == ERROR:
+            raise RuntimeError(text)
+        return text
 
     def set_access_configuration(self, testbed_guid, access_config):
         mode = access_config.get_attribute_value("mode")
@@ -853,10 +933,10 @@ class ExperimentControllerProxy(object):
         reply = self._client.read_reply()
         result = reply.split("|")
         code = int(result[0])
-        if code == OK:
-            return bool(result[1])
-        error =  base64.b64decode(result[1])
-        raise RuntimeError(error)
+        text = base64.b64decode(result[1])
+        if code == ERROR:
+            raise RuntimeError(text)
+        return text == "True"
 
     def shutdown(self):
         msg = controller_messages[SHUTDOWN]
index 1a432ef..8a3dbed 100644 (file)
@@ -9,6 +9,8 @@ import socket
 import sys
 import subprocess
 import threading
+from time import strftime
+import traceback
 
 CTRL_SOCK = "ctrl.sock"
 STD_ERR = "stderr.log"
@@ -16,12 +18,16 @@ MAX_FD = 1024
 
 STOP_MSG = "STOP"
 
+ERROR_LEVEL = 0
+DEBUG_LEVEL = 1
+
 class Server(object):
     def __init__(self, root_dir = "."):
         self._root_dir = root_dir
         self._stop = False
         self._ctrl_sock = None
-        self._stderr = None 
+        self._stderr = None
+        self._log_level = ERROR_LEVEL
 
     def run(self):
         try:
@@ -38,16 +44,26 @@ class Server(object):
                 os._exit(0)
         except:
             self.log_error()
-            raise
+            self.cleanup()
+            os._exit(0)
 
     def daemonize(self):
+        # pipes for process synchronization
+        (r, w) = os.pipe()
+
         pid1 = os.fork()
         if pid1 > 0:
-            # we do os.waitpid to avoid leaving a <defunc> (zombie) process
-            os.waitpid(pid1, 0)
+            os.close(w)
+            os.read(r, 1)
+            os.close(r)
+            # os.waitpid avoids leaving a <defunc> (zombie) process
+            st = os.waitpid(pid1, 0)[1]
+            if st:
+                raise RuntimeError("Daemonization failed")
             # return 0 to inform the caller method that this is not the 
             # daemonized process
             return 0
+        os.close(r)
 
         # Decouple from parent environment.
         os.chdir(self._root_dir)
@@ -61,11 +77,12 @@ class Server(object):
             os._exit(0)
 
         # close all open file descriptors.
-        for fd in range(2, MAX_FD):
-            try:
-                os.close(fd)
-            except OSError:
-                pass
+        for fd in range(3, MAX_FD):
+            if fd != w:
+                try:
+                    os.close(fd)
+                except OSError:
+                    pass
 
         # Redirect standard file descriptors.
         self._stderr = stdout = file(STD_ERR, "a", 0)
@@ -73,6 +90,9 @@ class Server(object):
         os.dup2(stdin.fileno(), sys.stdin.fileno())
         os.dup2(stdout.fileno(), sys.stdout.fileno())
         os.dup2(self._stderr.fileno(), sys.stderr.fileno())
+        # let the parent process know that the daemonization is finished
+        os.write(w, "\n")
+        os.close(w)
         return 1
 
     def post_daemonize(self):
@@ -93,34 +113,37 @@ class Server(object):
                     
                 if msg == STOP_MSG:
                     self._stop = True
-                    try:
-                        reply = self.stop_action()
-                    except:
-                        self.log_error()
-                    self.send_reply(conn, reply)
-                    break
+                    reply = self.stop_action()
                 else:
-                    try:
-                        reply = self.reply_action(msg)
-                    except:
-                        self.log_error()
-                    self.send_reply(conn, reply)
+                    reply = self.reply_action(msg)
+                self.send_reply(conn, reply)
             conn.close()
 
     def recv_msg(self, conn):
-       data = conn.recv(1024)
-       decoded = base64.b64decode(data)
-       return decoded.rstrip()
+        data = ""
+        while True:
+            try:
+                chunk = conn.recv(1024)
+            except OSError, e:
+                if e.errno != errno.EINTR:
+                    raise
+                if chunk == '':
+                    continue
+            data += chunk
+            if chunk[-1] == "\n":
+                break
+        decoded = base64.b64decode(data)
+        return decoded.rstrip()
 
     def send_reply(self, conn, reply):
-       encoded = base64.b64encode(reply)
-       conn.send("%s\n" % encoded)
+        encoded = base64.b64encode(reply)
+        conn.send("%s\n" % encoded)
        
     def cleanup(self):
         try:
             self._ctrl_sock.close()
             os.remove(CTRL_SOCK)
-        except e:
+        except:
             self.log_error()
 
     def stop_action(self):
@@ -129,12 +152,23 @@ class Server(object):
     def reply_action(self, msg):
         return "Reply to: %s" % msg
 
-    def log_error(self, error = None):
-        if error == None:
-            import traceback
-            error = "%s\n" %  traceback.format_exc()
-        sys.stderr.write(error)
-        return error
+    def set_error_log_level(self):
+        self._log_level = ERROR_LEVEL
+
+    def set_debug_log_level(self):
+        self._log_level = DEBUG_LEVEL
+
+    def log_error(self, text = None):
+        if text == None:
+            text = traceback.format_exc()
+        date = strftime("%Y-%m-%d %H:%M:%S")
+        sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
+        return text
+
+    def log_debug(self, text):
+        if self._log_level == DEBUG_LEVEL:
+            date = strftime("%Y-%m-%d %H:%M:%S")
+            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
 
 class Forwarder(object):
     def __init__(self, root_dir = "."):
@@ -173,7 +207,19 @@ class Forwarder(object):
             self._stop = True
 
     def recv_from_server(self):
-        return self._ctrl_sock.recv(1024)
+        data = ""
+        while True:
+            try:
+                chunk = self._ctrl_sock.recv(1024)
+            except OSError, e:
+                if e.errno != errno.EINTR:
+                    raise
+                if chunk == '':
+                    continue
+            data += chunk
+            if chunk[-1] == "\n":
+                break
+        return data
  
     def connect(self):
         self.disconnect()
@@ -195,8 +241,7 @@ class Client(object):
                         c.forward()" % root_dir
                 ],
                 stdin = subprocess.PIPE, 
-                stdout = subprocess.PIPE, 
-                env = os.environ)
+                stdout = subprocess.PIPE)
 
     def send_msg(self, msg):
         encoded = base64.b64encode(msg)
index 3febe57..069d5c5 100755 (executable)
@@ -83,6 +83,7 @@ class ExecuteTestCase(unittest.TestCase):
                 proxy.AccessConfiguration.MODE_DAEMON)
         access_config.set_attribute_value("rootDirectory", self._root_dir)
         controller = proxy.create_controller(xml, access_config)
+
         controller.start()
         while not controller.is_finished(app.guid):
             time.sleep(0.5)
@@ -123,6 +124,7 @@ class ExecuteTestCase(unittest.TestCase):
                 proxy.AccessConfiguration.MODE_DAEMON)
         access_config.set_attribute_value("rootDirectory", self._root_dir)
         controller.set_access_configuration(desc.guid, access_config)
+
         controller.start()
         while not controller.is_finished(app.guid):
             time.sleep(0.5)
@@ -161,14 +163,16 @@ class ExecuteTestCase(unittest.TestCase):
         access_config.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
         access_config.set_attribute_value("rootDirectory", self._root_dir)
-        controller = proxy.create_controller(xml, access_config = None)
-        access_config = proxy.AccessConfiguration()
-        access_config.set_attribute_value("mode", 
+        controller = proxy.create_controller(xml, access_config)
+
+        access_config2 = proxy.AccessConfiguration()
+        access_config2.set_attribute_value("mode", 
                 proxy.AccessConfiguration.MODE_DAEMON)
         inst_root_dir = os.path.join(self._root_dir, "instance")
         os.mkdir(inst_root_dir)
-        access_config.set_attribute_value("rootDirectory", inst_root_dir)
-        controller.set_access_configuration(desc.guid, access_config)
+        access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+        controller.set_access_configuration(desc.guid, access_config2)
+
         controller.start()
         while not controller.is_finished(app.guid):
             time.sleep(0.5)
index 884a57d..1e4d4c1 100755 (executable)
@@ -28,11 +28,11 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.create(4, "NodeInterface")
         instance.create_set(4, "up", True)
         instance.connect(2, "devs", 4, "node")
-        instance.add_adddress(4, AF_INET, "10.0.0.1", 24, None)
+        instance.add_address(4, AF_INET, "10.0.0.1", 24, None)
         instance.create(5, "NodeInterface")
         instance.create_set(5, "up", True)
         instance.connect(3, "devs", 5, "node")
-        instance.add_adddress(5, AF_INET, "10.0.0.2", 24, None)
+        instance.add_address(5, AF_INET, "10.0.0.2", 24, None)
         instance.create(6, "Switch")
         instance.create_set(6, "up", True)
         instance.connect(4, "switch", 6, "devs")
@@ -71,11 +71,11 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.create(4, "P2PNodeInterface")
         instance.create_set(4, "up", True)
         instance.connect(2, "devs", 4, "node")
-        instance.add_adddress(4, AF_INET, "10.0.0.1", 24, None)
+        instance.add_address(4, AF_INET, "10.0.0.1", 24, None)
         instance.create(5, "P2PNodeInterface")
         instance.create_set(5, "up", True)
         instance.connect(3, "devs", 5, "node")
-        instance.add_adddress(5, AF_INET, "10.0.0.2", 24, None)
+        instance.add_address(5, AF_INET, "10.0.0.2", 24, None)
         instance.connect(4, "p2p", 5, "p2p")
         instance.create(6, "Application")
         instance.create_set(6, "command", "ping -qc1 10.0.0.2")
@@ -112,19 +112,19 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.create(5, "NodeInterface")
         instance.create_set(5, "up", True)
         instance.connect(2, "devs", 5, "node")
-        instance.add_adddress(5, AF_INET, "10.0.0.1", 24, None)
+        instance.add_address(5, AF_INET, "10.0.0.1", 24, None)
         instance.create(6, "NodeInterface")
         instance.create_set(6, "up", True)
         instance.connect(3, "devs", 6, "node")
-        instance.add_adddress(6, AF_INET, "10.0.0.2", 24, None)
+        instance.add_address(6, AF_INET, "10.0.0.2", 24, None)
         instance.create(7, "NodeInterface")
         instance.create_set(7, "up", True)
         instance.connect(3, "devs", 7, "node")
-        instance.add_adddress(7, AF_INET, "10.0.1.1", 24, None)
+        instance.add_address(7, AF_INET, "10.0.1.1", 24, None)
         instance.create(8, "NodeInterface")
         instance.create_set(8, "up", True)
         instance.connect(4, "devs", 8, "node")
-        instance.add_adddress(8, AF_INET, "10.0.1.2", 24, None)
+        instance.add_address(8, AF_INET, "10.0.1.2", 24, None)
         instance.create(9, "Switch")
         instance.create_set(9, "up", True)
         instance.connect(5, "switch", 9, "devs")
index a0244d5..b7a0822 100755 (executable)
@@ -4,6 +4,7 @@
 import getpass
 from nepi.core.design import ExperimentDescription, FactoriesProvider
 from nepi.core.execute import ExperimentController
+from nepi.util import proxy
 import os
 import shutil
 import test_util
@@ -13,9 +14,9 @@ import uuid
 
 class NetnsIntegrationTestCase(unittest.TestCase):
     def setUp(self):
-        self._home_dir = os.path.join(os.getenv("HOME"), ".nepi", 
+        self._root_dir = os.path.join(os.getenv("HOME"), ".nepi", 
                 str(uuid.uuid1()))
-        os.makedirs(self._home_dir)
+        os.makedirs(self._root_dir)
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
     def test_local_if(self):
@@ -25,7 +26,7 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         user = getpass.getuser()
         netns_provider = FactoriesProvider(testbed_id, testbed_version)
         netns_desc = exp_desc.add_testbed_description(netns_provider)
-        netns_desc.set_attribute_value("homeDirectory", self._home_dir)
+        netns_desc.set_attribute_value("homeDirectory", self._root_dir)
         #netns_desc.set_attribute_value("enableDebug", True)
         node1 = netns_desc.create("Node")
         node2 = netns_desc.create("Node")
@@ -63,9 +64,73 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         self.assertTrue(ping_result.startswith(comp_result))
         controller.stop()
         controller.shutdown()
-        
+
+    @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+    def test_all_daemonized_if(self):
+        exp_desc = ExperimentDescription()
+        testbed_version = "01"
+        testbed_id = "netns"
+        user = getpass.getuser()
+        netns_provider = FactoriesProvider(testbed_id, testbed_version)
+        netns_desc = exp_desc.add_testbed_description(netns_provider)
+        netns_desc.set_attribute_value("homeDirectory", self._root_dir)
+        #netns_desc.set_attribute_value("enableDebug", True)
+        node1 = netns_desc.create("Node")
+        node2 = netns_desc.create("Node")
+        iface1 = netns_desc.create("NodeInterface")
+        iface1.set_attribute_value("up", True)
+        node1.connector("devs").connect(iface1.connector("node"))
+        ip1 = iface1.add_address()
+        ip1.set_attribute_value("Address", "10.0.0.1")
+        iface2 = netns_desc.create("NodeInterface")
+        iface2.set_attribute_value("up", True)
+        node2.connector("devs").connect(iface2.connector("node"))
+        ip2 = iface2.add_address()
+        ip2.set_attribute_value("Address", "10.0.0.2")
+        switch = netns_desc.create("Switch")
+        switch.set_attribute_value("up", True)
+        iface1.connector("switch").connect(switch.connector("devs"))
+        iface2.connector("switch").connect(switch.connector("devs"))
+        app = netns_desc.create("Application")
+        app.set_attribute_value("command", "ping -qc1 10.0.0.2")
+        app.set_attribute_value("user", user)
+        app.connector("node").connect(node1.connector("apps"))
+        app.enable_trace("stdout")
+        xml = exp_desc.to_xml()
+
+        access_config = proxy.AccessConfiguration()
+        access_config.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        access_config.set_attribute_value("rootDirectory", self._root_dir)
+        access_config.set_attribute_value("logLevel", 
+                proxy.AccessConfiguration.DEBUG_LEVEL)
+        controller = proxy.create_controller(xml, access_config)
+
+        access_config2 = proxy.AccessConfiguration()
+        access_config2.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        inst_root_dir = os.path.join(self._root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+        access_config2.set_attribute_value("logLevel", 
+                proxy.AccessConfiguration.DEBUG_LEVEL)
+        controller.set_access_configuration(netns_desc.guid, access_config2)
+
+        controller.start()
+        while not controller.is_finished(app.guid):
+            time.sleep(0.5)
+        ping_result = controller.trace(netns_desc.guid, app.guid, "stdout")
+        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+        self.assertTrue(ping_result.startswith(comp_result))
+        controller.stop()
+        controller.shutdown()
+
     def tearDown(self):
-        shutil.rmtree(self._home_dir)
+        shutil.rmtree(self._root_dir)
 
 if __name__ == '__main__':
     unittest.main()
index 4e0fbaf..3766520 100755 (executable)
@@ -25,6 +25,17 @@ class ServerTestCase(unittest.TestCase):
         reply = c.read_reply()
         self.assertTrue(reply == "Stopping server")
 
+    def test_server_long_message(self):
+        s = server.Server(self._root_dir)
+        s.run()
+        c = server.Client(self._root_dir)
+        c.send_msg("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+        reply = c.read_reply()
+        self.assertTrue(reply == "Reply to: 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+        c.send_stop()
+        reply = c.read_reply()
+        self.assertTrue(reply == "Stopping server")
+
     def tearDown(self):
         shutil.rmtree(self._root_dir)