self._box_attributes.add_attribute(name, help, type, value, range,
allowed, flags, validation_function)
-class TestbedInstance(object):
+class TestbedController(object):
def __init__(self, testbed_id, testbed_version):
self._testbed_id = testbed_id
self._testbed_version = testbed_version
from nepi.util import validation
from nepi.util.constants import AF_INET, AF_INET6, STATUS_UNDETERMINED, TIME_NOW
-class TestbedInstance(execute.TestbedInstance):
+class TestbedController(execute.TestbedController):
def __init__(self, testbed_id, testbed_version):
- super(TestbedInstance, self).__init__(testbed_id, testbed_version)
+ super(TestbedController, self).__init__(testbed_id, testbed_version)
self._started = False
# testbed attributes for validation
self._attributes = None
def trace_filename(self, guid, trace_id):
"""
- Return a trace's file path, for TestbedInstance's default
+ Return a trace's file path, for TestbedController's default
implementation of trace()
"""
raise NotImplementedError
# -*- coding: utf-8 -*-
from constants import TESTBED_ID
-from execute import TestbedInstance
+from execute import TestbedController
from nepi.core import testbed_impl
import os
-class TestbedInstance(testbed_impl.TestbedInstance):
+class TestbedController(testbed_impl.TestbedController):
def __init__(self, testbed_version):
- super(TestbedInstance, self).__init__(TESTBED_ID, testbed_version)
+ super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
self._netns = None
self._home_directory = None
self._traces = dict()
self._netns = self._load_netns_module()
def set(self, time, guid, name, value):
- super(TestbedInstance, self).set(time, guid, name, value)
+ super(TestbedController, self).set(time, guid, name, value)
# TODO: take on account schedule time for the task
element = self._elements.get(guid)
# -*- coding: utf-8 -*-
from constants import TESTBED_ID
-from execute import TestbedInstance
+from execute import TestbedController
import sys
import threading
-class TestbedInstance(testbed_impl.TestbedInstance):
+class TestbedController(testbed_impl.TestbedController):
def __init__(self, testbed_version):
- super(TestbedInstance, self).__init__(TESTBED_ID, testbed_version)
+ super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
self._ns3 = None
self._home_directory = None
self._traces = dict()
self._ns3 = self._load_ns3_module()
def start(self):
- super(TestbedInstance, self).start()
+ super(TestbedController, self).start()
self._condition = threading.Condition()
self._simulator_thread = threading.Thread(target = self._simulator_run,
args = [self._condition])
self._simulator_thread.start()
def set(self, time, guid, name, value):
- super(TestbedInstance, self).set(time, guid, name, value)
+ super(TestbedController, self).set(time, guid, name, value)
# TODO: take on account schedule time for the task
element = self._elements[guid]
ns3_value = self._to_ns3_value(guid, name, value)
# -*- coding: utf-8 -*-
from constants import TESTBED_ID
-from execute import TestbedInstance
+from execute import TestbedController
from nepi.core import testbed_impl
import os
-class TestbedInstance(testbed_impl.TestbedInstance):
+class TestbedController(testbed_impl.TestbedController):
def __init__(self, testbed_version):
- super(TestbedInstance, self).__init__(TESTBED_ID, testbed_version)
+ super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
self._home_directory = None
self._traces = dict()
get_attribute_value("homeDirectory")
def set(self, time, guid, name, value):
- super(TestbedInstance, self).set(time, guid, name, value)
+ super(TestbedController, self).set(time, guid, name, value)
# TODO: take on account schedule time for the task
element = self._elements[guid]
if element:
elif mode == AccessConfiguration.MODE_DAEMON:
(root_dir, log_level, user, host, port, agent) = \
get_access_config_params(access_config)
- return TestbedInstanceProxy(root_dir, log_level, testbed_id = testbed_id,
+ return TestbedControllerProxy(root_dir, log_level, testbed_id = testbed_id,
testbed_version = testbed_version, host = host, port = port,
user = user, agent = agent, launch = launch)
raise RuntimeError("Unsupported access configuration '%s'" % mode)
if not mod_name in sys.modules:
__import__(mod_name)
module = sys.modules[mod_name]
- return module.TestbedInstance(testbed_version)
+ return module.TestbedController(testbed_version)
-class TestbedInstanceServer(server.Server):
+class TestbedControllerServer(server.Server):
def __init__(self, root_dir, log_level, testbed_id, testbed_version):
- super(TestbedInstanceServer, self).__init__(root_dir, log_level)
+ super(TestbedControllerServer, self).__init__(root_dir, log_level)
self._testbed_id = testbed_id
self._testbed_version = testbed_version
self._testbed = None
self._controller.shutdown()
return "%d|%s" % (OK, "")
-class TestbedInstanceProxy(object):
+class TestbedControllerProxy(object):
def __init__(self, root_dir, log_level, testbed_id = None,
testbed_version = None, launch = True, host = None,
port = None, user = None, agent = None):
if host != None:
python_code = "from nepi.util.proxy import \
TesbedInstanceServer;\
- s = TestbedInstanceServer('%s', %d, '%s', '%s');\
+ s = TestbedControllerServer('%s', %d, '%s', '%s');\
s.run()" % (root_dir, log_level, testbed_id,
testbed_version)
proc = server.popen_ssh_subprocess(python_code, host = host,
err)
else:
# launch daemon
- s = TestbedInstanceServer(root_dir, log_level, testbed_id,
+ s = TestbedControllerServer(root_dir, log_level, testbed_id,
testbed_version)
s.run()
def test_execute(self):
testbed_version = "01"
testbed_id = "mock"
- instance = mock.TestbedInstance(testbed_version)
+ instance = mock.TestbedController(testbed_version)
instance.defer_configure("fake", True)
instance.defer_create(2, "Node")
instance.defer_create(3, "Node")
# -*- coding: utf-8 -*-
from constants import TESTBED_ID
-from execute import TestbedInstance
+from execute import TestbedController
from constants import TESTBED_ID
from nepi.core import testbed_impl
-class TestbedInstance(testbed_impl.TestbedInstance):
+class TestbedController(testbed_impl.TestbedController):
def __init__(self, testbed_version):
- super(TestbedInstance, self).__init__(TESTBED_ID, testbed_version)
+ super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
def do_setup(self):
pass
pass
def set(self, time, guid, name, value):
- super(TestbedInstance, self).set(time, guid, name, value)
+ super(TestbedController, self).set(time, guid, name, value)
def get(self, time, guid, name):
try:
def test_run_ping_if(self):
user = getpass.getuser()
testbed_version = "01"
- instance = netns.TestbedInstance(testbed_version)
+ instance = netns.TestbedController(testbed_version)
instance.defer_configure("homeDirectory", self.root_dir)
instance.defer_create(2, "Node")
instance.defer_create(3, "Node")
def test_run_ping_p2pif(self):
user = getpass.getuser()
testbed_version = "01"
- instance = netns.TestbedInstance(testbed_version)
+ instance = netns.TestbedController(testbed_version)
instance.defer_configure("homeDirectory", self.root_dir)
instance.defer_create(2, "Node")
instance.defer_create(3, "Node")
def test_run_ping_routing(self):
user = getpass.getuser()
testbed_version = "01"
- instance = netns.TestbedInstance(testbed_version)
+ instance = netns.TestbedController(testbed_version)
instance.defer_configure("homeDirectory", self.root_dir)
instance.defer_create(2, "Node")
instance.defer_create(3, "Node")
"Test requires working ns-3 bindings")
def test_run_ping_if(self):
testbed_version = "3_9_RC3"
- instance = ns3.TestbedInstance(testbed_version)
+ instance = ns3.TestbedController(testbed_version)
instance.defer_configure("homeDirectory", self.root_dir)
instance.defer_create(2, "ns3::Node")
instance.defer_create(3, "ns3::Ipv4L3Protocol")
"Test requires working ns-3 bindings")
def test_run_ping_routing(self):
testbed_version = "3_9_RC3"
- instance = ns3.TestbedInstance(testbed_version)
+ instance = ns3.TestbedController(testbed_version)
instance.defer_configure("homeDirectory", self.root_dir)
instance.defer_create(2, "ns3::Node")
instance.defer_create(3, "ns3::Ipv4L3Protocol")