From 657ddfbe6bc3bafb2a03ea8d8bfdd3abfe67fdf3 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Mon, 14 Mar 2011 11:23:35 +0100 Subject: [PATCH] tests with daemonized controller + remote daeminized instance working --- src/nepi/core/testbed_impl.py | 4 +- src/nepi/util/constants.py | 1 + src/nepi/util/proxy.py | 120 ++++++++++++++++++++-------------- test/core/integration.py | 48 +++++++++++++- 4 files changed, 119 insertions(+), 54 deletions(-) diff --git a/src/nepi/core/testbed_impl.py b/src/nepi/core/testbed_impl.py index 5f60c19b..1472cca5 100644 --- a/src/nepi/core/testbed_impl.py +++ b/src/nepi/core/testbed_impl.py @@ -4,9 +4,7 @@ from nepi.core import execute from nepi.core.metadata import Metadata from nepi.util import validation -from nepi.util.constants import AF_INET, AF_INET6, STATUS_UNDETERMINED - -TIME_NOW = "0s" +from nepi.util.constants import AF_INET, AF_INET6, STATUS_UNDETERMINED, TIME_NOW class TestbedInstance(execute.TestbedInstance): def __init__(self, testbed_id, testbed_version): diff --git a/src/nepi/util/constants.py b/src/nepi/util/constants.py index 413a6776..8998d615 100644 --- a/src/nepi/util/constants.py +++ b/src/nepi/util/constants.py @@ -9,4 +9,5 @@ STATUS_RUNNING = 1 STATUS_FINISHED = 2 STATUS_UNDETERMINED = 3 +TIME_NOW = "0s" diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index afcee74b..e8ca4652 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -3,8 +3,8 @@ import base64 from nepi.core.attributes import AttributesMap, Attribute -from nepi.util import server -from nepi.util import validation +from nepi.util import server, validation +from nepi.util.constants import TIME_NOW import sys # PROTOCOL REPLIES @@ -37,6 +37,7 @@ GET = 23 SET = 24 ACTION = 25 STATUS = 26 +GUIDS = 27 # PARAMETER TYPE STRING = 100 @@ -79,6 +80,7 @@ testbed_messages = dict({ SET: "%d|%s" % (SET, "%s|%d|%s|%s|%d"), ACTION: "%d|%s" % (ACTION, "%s|%d|%s"), STATUS: "%d|%s" % (STATUS, "%d"), + GUIDS: "%d" % GUIDS, }) def get_type(value): @@ -125,12 +127,12 @@ class AccessConfiguration(AttributesMap): AccessConfiguration.ACCESS_SSH], validation_function = validation.is_enum) self.add_attribute(name = "host", - help = "Host where the instance will be executed", + help = "Host where the testbed will be executed", type = Attribute.STRING, value = "localhost", validation_function = validation.is_string) self.add_attribute(name = "user", - help = "User on the Host to execute the instance", + help = "User on the Host to execute the testbed", type = Attribute.STRING, validation_function = validation.is_string) self.add_attribute(name = "port", @@ -162,13 +164,13 @@ def create_controller(xml, access_config = None): def create_testbed_instance(testbed_id, testbed_version, access_config): mode = None if not access_config else access_config.get_attribute_value("mode") if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS: - return _build_testbed_instance(testbed_id, testbed_version) + return _build_testbed_testbed(testbed_id, testbed_version) elif mode == AccessConfiguration.MODE_DAEMON: root_dir = access_config.get_attribute_value("rootDirectory") return TestbedIntanceProxy(testbed_id, testbed_version, root_dir) raise RuntimeError("Unsupported access configuration 'mode'" % mode) -def _build_testbed_instance(testbed_id, testbed_version): +def _build_testbed_testbed(testbed_id, testbed_version): mod_name = "nepi.testbeds.%s" % (testbed_id.lower()) if not mod_name in sys.modules: __import__(mod_name) @@ -180,10 +182,10 @@ class TestbedInstanceServer(server.Server): super(TestbedInstanceServer, self).__init__(root_dir) self._testbed_id = testbed_id self._testbed_version = testbed_version - self._instance = None + self._testbed = None def post_daemonize(self): - self._instance = _build_testbed_instance(self._testbed_id, + self._testbed = _build_testbed_testbed(self._testbed_id, self._testbed_version) def reply_action(self, msg): @@ -234,6 +236,8 @@ class TestbedInstanceServer(server.Server): return self.action(params) elif instruction == STATUS: return self.status(params) + elif instruction == GUIDS: + return self.guids(params) else: error = "Invalid instruction %s" % instruction self.log_error(error) @@ -244,31 +248,37 @@ class TestbedInstanceServer(server.Server): result = base64.b64encode(error) return "%d|%s" % (ERROR, result) + def guids(self, params): + guids = self._testbed.guids + guids = ",".join(map(str, guids)) + result = base64.b64encode(guids) + return "%d|%s" % (OK, result) + def create(self, params): guid = int(params[1]) factory_id = params[2] - self._instance.create(guid, factory_id) + self._testbed.create(guid, factory_id) return "%d|%s" % (OK, "") def trace(self, params): guid = int(params[1]) trace_id = params[2] - trace = self._instance.trace(guid, trace_id) + trace = self._testbed.trace(guid, trace_id) result = base64.b64encode(trace) return "%d|%s" % (OK, result) def start(self, params): time = params[1] - self._instance.start(time) + self._testbed.start(time) return "%d|%s" % (OK, "") def stop(self, params): time = params[1] - self._instance.stop(time) + self._testbed.stop(time) return "%d|%s" % (OK, "") def shutdown(self, params): - self._instance.shutdown() + self._testbed.shutdown() return "%d|%s" % (OK, "") def configure(self, params): @@ -276,21 +286,16 @@ class TestbedInstanceServer(server.Server): value = base64.b64decode(params[2]) type = int(params[3]) value = set_type(type, value) - self._instance.configure(name, value) - return "%d|%s" % (OK, "") - - def trace(self, params): - guid = int(params[1]) - factory_id = params[2] - self._instance.trace(guid, factory_id) + self._testbed.configure(name, value) return "%d|%s" % (OK, "") def create_set(self, params): - name = base64.b64decode(params[1]) - value = base64.b64decode(params[2]) - type = int(params[3]) + guid = int(params[1]) + name = base64.b64decode(params[2]) + value = base64.b64decode(params[3]) + type = int(params[4]) value = set_type(type, value) - self._instance.create_set(name, value) + self._testbed.create_set(guid, name, value) return "%d|%s" % (OK, "") def factory_set(self, params): @@ -298,7 +303,7 @@ class TestbedInstanceServer(server.Server): value = base64.b64decode(params[2]) type = int(params[3]) value = set_type(type, value) - self._instance.factory_set(name, value) + self._testbed.factory_set(name, value) return "%d|%s" % (OK, "") def connect(self, params): @@ -306,7 +311,7 @@ class TestbedInstanceServer(server.Server): connector_type_name1 = params[2] guid2 = int(params[3]) connector_type_name2 = params[4] - self._instance.connect(guid1, connector_type_name1, guid2, + self._testbed.connect(guid1, connector_type_name1, guid2, connector_type_name2) return "%d|%s" % (OK, "") @@ -319,14 +324,14 @@ class TestbedInstanceServer(server.Server): cross_testbed_id = params[6] cross_factory_id = params[7] cross_connector_type_name = params[8] - self._instance.cross_connect(guid, connector_type_name, cross_guid, + self._testbed.cross_connect(guid, connector_type_name, cross_guid, cross_testbed_id, cross_factory_id, cross_connector_type_name) return "%d|%s" % (OK, "") def add_trace(self, params): guid = int(params[1]) trace_id = params[2] - self._instance.add_trace(guid, trace_id) + self._testbed.add_trace(guid, trace_id) return "%d|%s" % (OK, "") def add_address(self, params): @@ -335,7 +340,7 @@ class TestbedInstanceServer(server.Server): address = params[3] netprefix = int(params[4]) broadcast = params[5] - self._instance.add_address(guid, family, address, netprefix, + self._testbed.add_address(guid, family, address, netprefix, broadcast) return "%d|%s" % (OK, "") @@ -344,34 +349,34 @@ class TestbedInstanceServer(server.Server): destination = params[2] netprefix = int(params[3]) nexthop = params[4] - self._instance.add_route(guid, destination, netprefix, nexthop) + self._testbed.add_route(guid, destination, netprefix, nexthop) return "%d|%s" % (OK, "") def do_setup(self, params): - self._instance.do_setup() + self._testbed.do_setup() return "%d|%s" % (OK, "") def do_create(self, params): - self._instance.do_create() + self._testbed.do_create() return "%d|%s" % (OK, "") def do_connect(self, params): - self._instance.do_connect() + self._testbed.do_connect() return "%d|%s" % (OK, "") def do_configure(self, params): - self._instance.do_configure() + self._testbed.do_configure() return "%d|%s" % (OK, "") def do_cross_connect(self, params): - self._instance.do_cross_connect() + self._testbed.do_cross_connect() return "%d|%s" % (OK, "") def get(self, params): time = params[1] guid = int(param[2] ) name = base64.b64decode(params[3]) - value = self._instance.get(time, guid, name) + value = self._testbed.get(time, guid, name) result = base64.b64encode(str(value)) return "%d|%s" % (OK, result) @@ -382,20 +387,21 @@ class TestbedInstanceServer(server.Server): value = base64.b64decode(params[4]) type = int(params[3]) value = set_type(type, value) - self._instance.set(time, guid, name, value) + self._testbed.set(time, guid, name, value) return "%d|%s" % (OK, "") def action(self, params): time = params[1] guid = int(params[2]) command = base64.b64decode(params[3]) - self._instance.action(time, guid, command) + self._testbed.action(time, guid, command) return "%d|%s" % (OK, "") def status(self, params): guid = int(params[1]) - status = self._instance.status(guid) - return "%d|%s" % (OK, "%d" % status) + status = self._testbed.status(guid) + result = base64.b64encode(str(status)) + return "%d|%s" % (OK, result) class ExperimentControllerServer(server.Server): def __init__(self, experiment_xml, root_dir): @@ -435,12 +441,12 @@ class ExperimentControllerServer(server.Server): result = base64.b64encode(error) return "%d|%s" % (ERROR, result) - def experiment_xml(self, parameters): + def experiment_xml(self, params): xml = self._controller.experiment_xml result = base64.b64encode(xml) return "%d|%s" % (OK, result) - def set_access_configuration(self, parameters): + def set_access_configuration(self, params): testbed_guid = int(params[1]) mode = params[2] communication = params[3] @@ -461,7 +467,7 @@ class ExperimentControllerServer(server.Server): access_config) return "%d|%s" % (OK, "") - def trace(self, parameters): + def trace(self, params): testbed_guid = int(params[1]) guid = int(params[2]) trace_id = params[3] @@ -469,20 +475,20 @@ class ExperimentControllerServer(server.Server): result = base64.b64encode(trace) return "%d|%s" % (OK, "%s" % result) - def is_finished(self, parameters): + def is_finished(self, params): guid = int(params[1]) result = self._controller.is_finished(guid) return "%d|%s" % (OK, "%r" % result) - def start(self, parameters): + def start(self, params): self._controller.start() return "%d|%s" % (OK, "") - def stop(self, parameters): + def stop(self, params): self._controller.stop() return "%d|%s" % (OK, "") - def shutdown(self, parameters): + def shutdown(self, params): self._controller.shutdown() return "%d|%s" % (OK, "") @@ -495,6 +501,18 @@ class TestbedIntanceProxy(object): # create_client self._client = server.Client(root_dir) + @property + def guids(self): + msg = testbed_messages[GUIDS] + self._client.send_msg(msg) + reply = self._client.read_reply() + result = reply.split("|") + code = int(result[0]) + text = base64.b64decode(result[1]) + if code == ERROR: + raise RuntimeError(text) + return map(int, text.split(",")) + def configure(self, name, value): msg = testbed_messages[CONFIGURE] type = get_type(value) @@ -633,6 +651,7 @@ class TestbedIntanceProxy(object): def do_connect(self): msg = testbed_messages[DO_CONNECT] self._client.send_msg(msg) + reply = self._client.read_reply() result = reply.split("|") code = int(result[0]) text = base64.b64decode(result[1]) @@ -659,7 +678,7 @@ class TestbedIntanceProxy(object): if code == ERROR: raise RuntimeError(text) - def start(self, time): + def start(self, time = TIME_NOW): msg = testbed_messages[START] msg = msg % (time) self._client.send_msg(msg) @@ -670,7 +689,7 @@ class TestbedIntanceProxy(object): if code == ERROR: raise RuntimeError(text) - def stop(self, time): + def stop(self, time = TIME_NOW): msg = testbed_messages[STOP] msg = msg % (time) self._client.send_msg(msg) @@ -731,7 +750,7 @@ class TestbedIntanceProxy(object): text = base64.b64decode(result[1]) if code == ERROR: raise RuntimeError(text) - return bool(text) + return int(text) def trace(self, guid, trace_id): msg = testbed_messages[TRACE] @@ -754,6 +773,7 @@ class TestbedIntanceProxy(object): text = base64.b64decode(result[1]) if code == ERROR: raise RuntimeError(text) + self._client.send_stop() class ExperimentControllerProxy(object): def __init__(self, experiment_xml, root_dir): diff --git a/test/core/integration.py b/test/core/integration.py index 8c52af37..3febe57e 100755 --- a/test/core/integration.py +++ b/test/core/integration.py @@ -96,7 +96,7 @@ class ExecuteTestCase(unittest.TestCase): controller.stop() controller.shutdown() - def test_daemonized_controller_integration(self): + def test_daemonized_testbed_integration(self): exp_desc = ExperimentDescription() testbed_version = "01" testbed_id = "mock" @@ -129,6 +129,52 @@ class ExecuteTestCase(unittest.TestCase): fake_result = controller.trace(desc.guid, app.guid, "fake") comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. +--- 10.0.0.2 ping statistics --- +1 packets transmitted, 1 received, 0% packet loss, time 0ms +""" + self.assertTrue(fake_result.startswith(comp_result)) + controller.stop() + controller.shutdown() + + def test_daemonized_all_integration(self): + exp_desc = ExperimentDescription() + testbed_version = "01" + testbed_id = "mock" + provider = FactoriesProvider(testbed_id, testbed_version) + desc = exp_desc.add_testbed_description(provider) + desc.set_attribute_value("fake", True) + node1 = desc.create("Node") + node2 = desc.create("Node") + iface1 = desc.create("Interface") + iface1.set_attribute_value("fake", True) + node1.connector("devs").connect(iface1.connector("node")) + iface2 = desc.create("Interface") + iface2.set_attribute_value("fake", True) + node2.connector("devs").connect(iface2.connector("node")) + iface1.connector("iface").connect(iface2.connector("iface")) + app = desc.create("Application") + app.connector("node").connect(node1.connector("apps")) + app.enable_trace("fake") + + xml = exp_desc.to_xml() + access_config = proxy.AccessConfiguration() + access_config.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + access_config.set_attribute_value("rootDirectory", self._root_dir) + controller = proxy.create_controller(xml, access_config = None) + access_config = proxy.AccessConfiguration() + access_config.set_attribute_value("mode", + proxy.AccessConfiguration.MODE_DAEMON) + inst_root_dir = os.path.join(self._root_dir, "instance") + os.mkdir(inst_root_dir) + access_config.set_attribute_value("rootDirectory", inst_root_dir) + controller.set_access_configuration(desc.guid, access_config) + controller.start() + while not controller.is_finished(app.guid): + time.sleep(0.5) + fake_result = controller.trace(desc.guid, app.guid, "fake") + comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. + --- 10.0.0.2 ping statistics --- 1 packets transmitted, 1 received, 0% packet loss, time 0ms """ -- 2.43.0