tests with daemonized controller + remote daeminized instance working
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 14 Mar 2011 10:23:35 +0000 (11:23 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 14 Mar 2011 10:23:35 +0000 (11:23 +0100)
src/nepi/core/testbed_impl.py
src/nepi/util/constants.py
src/nepi/util/proxy.py
test/core/integration.py

index 5f60c19..1472cca 100644 (file)
@@ -4,9 +4,7 @@
 from nepi.core import execute
 from nepi.core.metadata import Metadata
 from nepi.util import validation
-from nepi.util.constants import AF_INET, AF_INET6, STATUS_UNDETERMINED
-
-TIME_NOW = "0s"
+from nepi.util.constants import AF_INET, AF_INET6, STATUS_UNDETERMINED, TIME_NOW
 
 class TestbedInstance(execute.TestbedInstance):
     def __init__(self, testbed_id, testbed_version):
index 413a677..8998d61 100644 (file)
@@ -9,4 +9,5 @@ STATUS_RUNNING = 1
 STATUS_FINISHED = 2
 STATUS_UNDETERMINED = 3
 
+TIME_NOW = "0s"
 
index afcee74..e8ca465 100644 (file)
@@ -3,8 +3,8 @@
 
 import base64
 from nepi.core.attributes import AttributesMap, Attribute
-from nepi.util import server
-from nepi.util import validation
+from nepi.util import server, validation
+from nepi.util.constants import TIME_NOW
 import sys
 
 # PROTOCOL REPLIES
@@ -37,6 +37,7 @@ GET = 23
 SET = 24
 ACTION  = 25
 STATUS  = 26
+GUIDS  = 27
 
 # PARAMETER TYPE
 STRING  =  100
@@ -79,6 +80,7 @@ testbed_messages = dict({
     SET:    "%d|%s" % (SET, "%s|%d|%s|%s|%d"),
     ACTION: "%d|%s" % (ACTION, "%s|%d|%s"),
     STATUS: "%d|%s" % (STATUS, "%d"),
+    GUIDS:  "%d" % GUIDS,
     })
 
 def get_type(value):
@@ -125,12 +127,12 @@ class AccessConfiguration(AttributesMap):
                     AccessConfiguration.ACCESS_SSH],
                 validation_function = validation.is_enum)
         self.add_attribute(name = "host",
-                help = "Host where the instance will be executed",
+                help = "Host where the testbed will be executed",
                 type = Attribute.STRING,
                 value = "localhost",
                 validation_function = validation.is_string)
         self.add_attribute(name = "user",
-                help = "User on the Host to execute the instance",
+                help = "User on the Host to execute the testbed",
                 type = Attribute.STRING,
                 validation_function = validation.is_string)
         self.add_attribute(name = "port",
@@ -162,13 +164,13 @@ def create_controller(xml, access_config = None):
 def create_testbed_instance(testbed_id, testbed_version, access_config):
     mode = None if not access_config else access_config.get_attribute_value("mode")
     if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
-        return  _build_testbed_instance(testbed_id, testbed_version)
+        return  _build_testbed_testbed(testbed_id, testbed_version)
     elif mode == AccessConfiguration.MODE_DAEMON:
                 root_dir = access_config.get_attribute_value("rootDirectory")
                 return TestbedIntanceProxy(testbed_id, testbed_version, root_dir)
     raise RuntimeError("Unsupported access configuration 'mode'" % mode)
 
-def _build_testbed_instance(testbed_id, testbed_version):
+def _build_testbed_testbed(testbed_id, testbed_version):
     mod_name = "nepi.testbeds.%s" % (testbed_id.lower())
     if not mod_name in sys.modules:
         __import__(mod_name)
@@ -180,10 +182,10 @@ class TestbedInstanceServer(server.Server):
         super(TestbedInstanceServer, self).__init__(root_dir)
         self._testbed_id = testbed_id
         self._testbed_version = testbed_version
-        self._instance = None
+        self._testbed = None
 
     def post_daemonize(self):
-        self._instance = _build_testbed_instance(self._testbed_id, 
+        self._testbed = _build_testbed_testbed(self._testbed_id, 
                 self._testbed_version)
 
     def reply_action(self, msg):
@@ -234,6 +236,8 @@ class TestbedInstanceServer(server.Server):
                 return self.action(params)
             elif instruction == STATUS:
                 return self.status(params)
+            elif instruction == GUIDS:
+                return self.guids(params)
             else:
                 error = "Invalid instruction %s" % instruction
                 self.log_error(error)
@@ -244,31 +248,37 @@ class TestbedInstanceServer(server.Server):
             result = base64.b64encode(error)
             return "%d|%s" % (ERROR, result)
 
+    def guids(self, params):
+        guids = self._testbed.guids
+        guids = ",".join(map(str, guids))
+        result = base64.b64encode(guids)
+        return "%d|%s" % (OK, result)
+
     def create(self, params):
         guid = int(params[1])
         factory_id = params[2]
-        self._instance.create(guid, factory_id)
+        self._testbed.create(guid, factory_id)
         return "%d|%s" % (OK, "")
 
     def trace(self, params):
         guid = int(params[1])
         trace_id = params[2]
-        trace = self._instance.trace(guid, trace_id)
+        trace = self._testbed.trace(guid, trace_id)
         result = base64.b64encode(trace)
         return "%d|%s" % (OK, result)
 
     def start(self, params):
         time = params[1]
-        self._instance.start(time)
+        self._testbed.start(time)
         return "%d|%s" % (OK, "")
 
     def stop(self, params):
         time = params[1]
-        self._instance.stop(time)
+        self._testbed.stop(time)
         return "%d|%s" % (OK, "")
 
     def shutdown(self, params):
-        self._instance.shutdown()
+        self._testbed.shutdown()
         return "%d|%s" % (OK, "")
 
     def configure(self, params):
@@ -276,21 +286,16 @@ class TestbedInstanceServer(server.Server):
         value = base64.b64decode(params[2])
         type = int(params[3])
         value = set_type(type, value)
-        self._instance.configure(name, value)
-        return "%d|%s" % (OK, "")
-
-    def trace(self, params):
-        guid = int(params[1])
-        factory_id = params[2]
-        self._instance.trace(guid, factory_id)
+        self._testbed.configure(name, value)
         return "%d|%s" % (OK, "")
 
     def create_set(self, params):
-        name = base64.b64decode(params[1])
-        value = base64.b64decode(params[2])
-        type = int(params[3])
+        guid = int(params[1])
+        name = base64.b64decode(params[2])
+        value = base64.b64decode(params[3])
+        type = int(params[4])
         value = set_type(type, value)
-        self._instance.create_set(name, value)
+        self._testbed.create_set(guid, name, value)
         return "%d|%s" % (OK, "")
 
     def factory_set(self, params):
@@ -298,7 +303,7 @@ class TestbedInstanceServer(server.Server):
         value = base64.b64decode(params[2])
         type = int(params[3])
         value = set_type(type, value)
-        self._instance.factory_set(name, value)
+        self._testbed.factory_set(name, value)
         return "%d|%s" % (OK, "")
 
     def connect(self, params):
@@ -306,7 +311,7 @@ class TestbedInstanceServer(server.Server):
         connector_type_name1 = params[2]
         guid2 = int(params[3])
         connector_type_name2 = params[4]
-        self._instance.connect(guid1, connector_type_name1, guid2, 
+        self._testbed.connect(guid1, connector_type_name1, guid2, 
             connector_type_name2)
         return "%d|%s" % (OK, "")
 
@@ -319,14 +324,14 @@ class TestbedInstanceServer(server.Server):
         cross_testbed_id = params[6]
         cross_factory_id = params[7]
         cross_connector_type_name = params[8]
-        self._instance.cross_connect(guid, connector_type_name, cross_guid, 
+        self._testbed.cross_connect(guid, connector_type_name, cross_guid, 
             cross_testbed_id, cross_factory_id, cross_connector_type_name)
         return "%d|%s" % (OK, "")
 
     def add_trace(self, params):
         guid = int(params[1])
         trace_id = params[2]
-        self._instance.add_trace(guid, trace_id)
+        self._testbed.add_trace(guid, trace_id)
         return "%d|%s" % (OK, "")
 
     def add_address(self, params):
@@ -335,7 +340,7 @@ class TestbedInstanceServer(server.Server):
         address = params[3]
         netprefix = int(params[4])
         broadcast = params[5]
-        self._instance.add_address(guid, family, address, netprefix,
+        self._testbed.add_address(guid, family, address, netprefix,
                 broadcast)
         return "%d|%s" % (OK, "")
 
@@ -344,34 +349,34 @@ class TestbedInstanceServer(server.Server):
         destination = params[2]
         netprefix = int(params[3])
         nexthop = params[4]
-        self._instance.add_route(guid, destination, netprefix, nexthop)
+        self._testbed.add_route(guid, destination, netprefix, nexthop)
         return "%d|%s" % (OK, "")
 
     def do_setup(self, params):
-        self._instance.do_setup()
+        self._testbed.do_setup()
         return "%d|%s" % (OK, "")
 
     def do_create(self, params):
-        self._instance.do_create()
+        self._testbed.do_create()
         return "%d|%s" % (OK, "")
 
     def do_connect(self, params):
-        self._instance.do_connect()
+        self._testbed.do_connect()
         return "%d|%s" % (OK, "")
 
     def do_configure(self, params):
-        self._instance.do_configure()
+        self._testbed.do_configure()
         return "%d|%s" % (OK, "")
 
     def do_cross_connect(self, params):
-        self._instance.do_cross_connect()
+        self._testbed.do_cross_connect()
         return "%d|%s" % (OK, "")
 
     def get(self, params):
         time = params[1]
         guid = int(param[2] )
         name = base64.b64decode(params[3])
-        value = self._instance.get(time, guid, name)
+        value = self._testbed.get(time, guid, name)
         result = base64.b64encode(str(value))
         return "%d|%s" % (OK, result)
 
@@ -382,20 +387,21 @@ class TestbedInstanceServer(server.Server):
         value = base64.b64decode(params[4])
         type = int(params[3])
         value = set_type(type, value)
-        self._instance.set(time, guid, name, value)
+        self._testbed.set(time, guid, name, value)
         return "%d|%s" % (OK, "")
 
     def action(self, params):
         time = params[1]
         guid = int(params[2])
         command = base64.b64decode(params[3])
-        self._instance.action(time, guid, command)
+        self._testbed.action(time, guid, command)
         return "%d|%s" % (OK, "")
 
     def status(self, params):
         guid = int(params[1])
-        status = self._instance.status(guid)
-        return "%d|%s" % (OK, "%d" % status)
+        status = self._testbed.status(guid)
+        result = base64.b64encode(str(status))
+        return "%d|%s" % (OK, result)
  
 class ExperimentControllerServer(server.Server):
     def __init__(self, experiment_xml, root_dir):
@@ -435,12 +441,12 @@ class ExperimentControllerServer(server.Server):
             result = base64.b64encode(error)
             return "%d|%s" % (ERROR, result)
 
-    def experiment_xml(self, parameters):
+    def experiment_xml(self, params):
         xml = self._controller.experiment_xml
         result = base64.b64encode(xml)
         return "%d|%s" % (OK, result)
 
-    def set_access_configuration(self, parameters):
+    def set_access_configuration(self, params):
         testbed_guid = int(params[1])
         mode = params[2]
         communication = params[3]
@@ -461,7 +467,7 @@ class ExperimentControllerServer(server.Server):
                 access_config)
         return "%d|%s" % (OK, "")
 
-    def trace(self, parameters):
+    def trace(self, params):
         testbed_guid = int(params[1])
         guid = int(params[2])
         trace_id = params[3]
@@ -469,20 +475,20 @@ class ExperimentControllerServer(server.Server):
         result = base64.b64encode(trace)
         return "%d|%s" % (OK, "%s" % result)
 
-    def is_finished(self, parameters):
+    def is_finished(self, params):
         guid = int(params[1])
         result = self._controller.is_finished(guid)
         return "%d|%s" % (OK, "%r" % result)
 
-    def start(self, parameters):
+    def start(self, params):
         self._controller.start()
         return "%d|%s" % (OK, "")
 
-    def stop(self, parameters):
+    def stop(self, params):
         self._controller.stop()
         return "%d|%s" % (OK, "")
 
-    def shutdown(self, parameters):
+    def shutdown(self, params):
         self._controller.shutdown()
         return "%d|%s" % (OK, "")
 
@@ -495,6 +501,18 @@ class TestbedIntanceProxy(object):
         # create_client
         self._client = server.Client(root_dir)
 
+    @property
+    def guids(self):
+        msg = testbed_messages[GUIDS]
+        self._client.send_msg(msg)
+        reply = self._client.read_reply()
+        result = reply.split("|")
+        code = int(result[0])
+        text = base64.b64decode(result[1])
+        if code == ERROR:
+            raise RuntimeError(text)
+        return map(int, text.split(","))
+
     def configure(self, name, value):
         msg = testbed_messages[CONFIGURE]
         type = get_type(value)
@@ -633,6 +651,7 @@ class TestbedIntanceProxy(object):
     def do_connect(self):
         msg = testbed_messages[DO_CONNECT]
         self._client.send_msg(msg)
+        reply = self._client.read_reply()
         result = reply.split("|")
         code = int(result[0])
         text = base64.b64decode(result[1])
@@ -659,7 +678,7 @@ class TestbedIntanceProxy(object):
         if code == ERROR:
             raise RuntimeError(text)
 
-    def start(self, time):
+    def start(self, time = TIME_NOW):
         msg = testbed_messages[START]
         msg = msg % (time)
         self._client.send_msg(msg)
@@ -670,7 +689,7 @@ class TestbedIntanceProxy(object):
         if code == ERROR:
             raise RuntimeError(text)
 
-    def stop(self, time):
+    def stop(self, time = TIME_NOW):
         msg = testbed_messages[STOP]
         msg = msg % (time)
         self._client.send_msg(msg)
@@ -731,7 +750,7 @@ class TestbedIntanceProxy(object):
         text = base64.b64decode(result[1])
         if code == ERROR:
             raise RuntimeError(text)
-        return bool(text)
+        return int(text)
 
     def trace(self, guid, trace_id):
         msg = testbed_messages[TRACE]
@@ -754,6 +773,7 @@ class TestbedIntanceProxy(object):
         text = base64.b64decode(result[1])
         if code == ERROR:
             raise RuntimeError(text)
+        self._client.send_stop()
 
 class ExperimentControllerProxy(object):
     def __init__(self, experiment_xml, root_dir):
index 8c52af3..3febe57 100755 (executable)
@@ -96,7 +96,7 @@ class ExecuteTestCase(unittest.TestCase):
         controller.stop()
         controller.shutdown()
 
-    def test_daemonized_controller_integration(self):
+    def test_daemonized_testbed_integration(self):
         exp_desc = ExperimentDescription()
         testbed_version = "01"
         testbed_id = "mock"
@@ -129,6 +129,52 @@ class ExecuteTestCase(unittest.TestCase):
         fake_result = controller.trace(desc.guid, app.guid, "fake")
         comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
 
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+        self.assertTrue(fake_result.startswith(comp_result))
+        controller.stop()
+        controller.shutdown()
+
+    def test_daemonized_all_integration(self):
+        exp_desc = ExperimentDescription()
+        testbed_version = "01"
+        testbed_id = "mock"
+        provider = FactoriesProvider(testbed_id, testbed_version)
+        desc = exp_desc.add_testbed_description(provider)
+        desc.set_attribute_value("fake", True)
+        node1 = desc.create("Node")
+        node2 = desc.create("Node")
+        iface1 = desc.create("Interface")
+        iface1.set_attribute_value("fake", True)
+        node1.connector("devs").connect(iface1.connector("node"))
+        iface2 = desc.create("Interface")
+        iface2.set_attribute_value("fake", True)
+        node2.connector("devs").connect(iface2.connector("node"))
+        iface1.connector("iface").connect(iface2.connector("iface"))
+        app = desc.create("Application")
+        app.connector("node").connect(node1.connector("apps"))
+        app.enable_trace("fake")
+
+        xml = exp_desc.to_xml()
+        access_config = proxy.AccessConfiguration()
+        access_config.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        access_config.set_attribute_value("rootDirectory", self._root_dir)
+        controller = proxy.create_controller(xml, access_config = None)
+        access_config = proxy.AccessConfiguration()
+        access_config.set_attribute_value("mode", 
+                proxy.AccessConfiguration.MODE_DAEMON)
+        inst_root_dir = os.path.join(self._root_dir, "instance")
+        os.mkdir(inst_root_dir)
+        access_config.set_attribute_value("rootDirectory", inst_root_dir)
+        controller.set_access_configuration(desc.guid, access_config)
+        controller.start()
+        while not controller.is_finished(app.guid):
+            time.sleep(0.5)
+        fake_result = controller.trace(desc.guid, app.guid, "fake")
+        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
 --- 10.0.0.2 ping statistics ---
 1 packets transmitted, 1 received, 0% packet loss, time 0ms
 """