BUILDDIR := $(BUILDDIR)/lib
endif
-#PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH)
-PYPATH = "../nepi2/src:../nepi2/test/lib:../netns/src"
+PYPATH = $(BUILDDIR):$(TESTLIB):$(PYTHONPATH)
COVERAGE = $(or $(shell which coverage), $(shell which python-coverage), \
coverage)
"nepi.testbeds",
"nepi.testbeds.netns",
"nepi.core",
+ "nepi.util.parser",
"nepi.util" ],
package_dir = {"": "src"}
)
def start(self):
self._create_testbed_instances()
- for instance in self._testbeds.values():
- instance.do_setup()
- for instance in self._testbeds.values():
- instance.do_create()
- instance.do_connect()
- instance.do_configure()
- for instances in self._testbeds.values():
- instance.do_cross_connect()
- for instances in self._testbeds.values():
- instance.start()
+ for testbed in self._testbeds.values():
+ testbed.do_setup()
+ for testbed in self._testbeds.values():
+ testbed.do_create()
+ testbed.do_connect()
+ testbed.do_configure()
+ for testbed in self._testbeds.values():
+ testbed.do_cross_connect()
+ for testbed in self._testbeds.values():
+ testbed.start()
def stop(self):
- for instance in self._testbeds.values():
- instance.stop()
+ for testbed in self._testbeds.values():
+ testbed.stop()
def is_finished(self, guid):
- for instance in self._testbeds.values():
- for guid_ in instance.guids:
+ for testbed in self._testbeds.values():
+ for guid_ in testbed.guids:
if guid_ == guid:
- return instance.status(guid) == STATUS_FINISHED
+ return testbed.status(guid) == STATUS_FINISHED
raise RuntimeError("No element exists with guid %d" % guid)
def shutdown(self):
- for instance in self._testbeds.values():
- instance.shutdown()
+ for testbed in self._testbeds.values():
+ testbed.shutdown()
def _create_testbed_instances(self):
parser = XmlExperimentParser()
(testbed_id, testbed_version) = data.get_testbed_data(guid)
access_config = None if guid not in self._access_config else\
self._access_config[guid]
- instance = proxy.create_testbed_instance(testbed_id,
+ testbed = proxy.create_testbed_instance(testbed_id,
testbed_version, access_config)
for (name, value) in data.get_attribute_data(guid):
- instance.configure(name, value)
- self._testbeds[guid] = instance
+ testbed.configure(name, value)
+ self._testbeds[guid] = testbed
else:
element_guids.append(guid)
self._program_testbed_instances(element_guids, data)
def _program_testbed_instances(self, element_guids, data):
for guid in element_guids:
(testbed_guid, factory_id) = data.get_box_data(guid)
- instance = self._testbeds[testbed_guid]
- instance.create(guid, factory_id)
+ testbed = self._testbeds[testbed_guid]
+ testbed.create(guid, factory_id)
for (name, value) in data.get_attribute_data(guid):
- instance.create_set(guid, name, value)
+ testbed.create_set(guid, name, value)
for guid in element_guids:
(testbed_guid, factory_id) = data.get_box_data(guid)
- instance = self._testbeds[testbed_guid]
+ testbed = self._testbeds[testbed_guid]
for (connector_type_name, other_guid, other_connector_type_name) \
in data.get_connection_data(guid):
(testbed_guid, factory_id) = data.get_box_data(guid)
(other_testbed_guid, other_factory_id) = data.get_box_data(
other_guid)
if testbed_guid == other_testbed_guid:
- instance.connect(guid, connector_type_name, other_guid,
+ testbed.connect(guid, connector_type_name, other_guid,
other_connector_type_name)
else:
- instance.cross_connect(guid, connector_type_name, other_guid,
+ testbed.cross_connect(guid, connector_type_name, other_guid,
other_testbed_id, other_factory_id, other_connector_type_name)
for trace_id in data.get_trace_data(guid):
- instance.add_trace(guid, trace_id)
+ testbed.add_trace(guid, trace_id)
for (autoconf, address, family, netprefix, broadcast) in \
data.get_address_data(guid):
if address != None:
- instance.add_adddress(guid, family, address, netprefix,
+ testbed.add_address(guid, family, address, netprefix,
broadcast)
for (family, destination, netprefix, nexthop) in \
data.get_route_data(guid):
- instance.add_route(guid, destination, netprefix, nexthop)
+ testbed.add_route(guid, destination, netprefix, nexthop)
self._add_trace[guid] = list()
self._add_trace[guid].append(trace_id)
- def add_adddress(self, guid, family, address, netprefix, broadcast):
+ def add_address(self, guid, family, address, netprefix, broadcast):
if not guid in self._create:
raise RuntimeError("Element guid %d doesn't exist" % guid)
factory_id = self._create[guid]
raise NotImplementedError
def trace(self, guid, trace_id):
- f = open(self.trace_filename(guid, trace_id), "r")
- content = f.read()
- f.close()
+ fd = open("%s" % self.trace_filename(guid, trace_id), "r")
+ content = fd.read()
+ fd.close()
return content
def shutdown(self):
getElementsByTagName("address")
for address_tag in address_tag_list:
if address_tag.nodeType == tag.ELEMENT_NODE:
- autoconf = bool(address_tag.getAttribute("AutoConfigure")) \
+ autoconf = address_tag.getAttribute("AutoConfigure") == "True" \
if address_tag.hasAttribute("AutoConfigure") else None
address = str(address_tag.getAttribute("Address")) \
if address_tag.hasAttribute("Address") else None
other_guid, other_connector_type_name)
def type_to_standard(self, value):
- if type(value) == str:
+ if isinstance(value, str):
return Attribute.STRING
- if type(value) == bool:
+ if isinstance(value, bool):
return Attribute.BOOL
- if type(value) == int:
+ if isinstance(value, int):
return Attribute.INTEGER
- if type(value) == float:
+ if isinstance(value, float):
return Attribute.DOUBLE
def type_from_standard(self, type, value):
if type == Attribute.STRING:
return str(value)
if type == Attribute.BOOL:
- return bool(value)
+ return value == "True"
if type == Attribute.INTEGER:
return int(value)
if type == Attribute.DOUBLE:
GUIDS: "%d" % GUIDS,
})
+instruction_text = dict({
+ OK: "OK",
+ ERROR: "ERROR",
+ XML: "XML",
+ ACCESS: "ACCESS",
+ TRACE: "TRACE",
+ FINISHED: "FINISHED",
+ START: "START",
+ STOP: "STOP",
+ SHUTDOWN: "SHUTDOWN",
+ CONFIGURE: "CONFIGURE",
+ CREATE: "CREATE",
+ CREATE_SET: "CREATE_SET",
+ FACTORY_SET: "FACTORY_SET",
+ CONNECT: "CONNECT",
+ CROSS_CONNECT: "CROSS_CONNECT",
+ ADD_TRACE: "ADD_TRACE",
+ ADD_ADDRESS: "ADD_ADDRESS",
+ ADD_ROUTE: "ADD_ROUTE",
+ DO_SETUP: "DO_SETUP",
+ DO_CREATE: "DO_CREATE",
+ DO_CONNECT: "DO_CONNECT",
+ DO_CONFIGURE: "DO_CONFIGURE",
+ DO_CROSS_CONNECT: "DO_CROSS_CONNECT",
+ GET: "GET",
+ SET: "SET",
+ ACTION: "ACTION",
+ STATUS: "STATUS",
+ GUIDS: "GUIDS",
+ STRING: "STRING",
+ INTEGER: "INTEGER",
+ BOOL: "BOOL",
+ FLOAT: "FLOAT"
+ })
+
def get_type(value):
if isinstance(value, bool):
return BOOL
elif type == FLOAT:
value = float(value)
elif type == BOOL:
- value = bool(value)
+ value = value == "True"
else:
value = str(value)
return value
+def log_msg(server, params):
+ instr = int(params[0])
+ instr_txt = instruction_text[instr]
+ server.log_debug("%s - msg: %s [%s]" % (server.__class__.__name__,
+ instr_txt, ", ".join(map(str, params[1:]))))
+
+def log_reply(server, reply):
+ res = reply.split("|")
+ code = int(res[0])
+ code_txt = instruction_text[code]
+ txt = base64.b64decode(res[1])
+ server.log_debug("%s - reply: %s %s" % (server.__class__.__name__,
+ code_txt, txt))
+
class AccessConfiguration(AttributesMap):
MODE_SINGLE_PROCESS = "SINGLE"
MODE_DAEMON = "DAEMON"
ACCESS_SSH = "SSH"
ACCESS_LOCAL = "LOCAL"
+ ERROR_LEVEL = "Error"
+ DEBUG_LEVEL = "Debug"
def __init__(self):
super(AccessConfiguration, self).__init__()
type = Attribute.BOOL,
value = False,
validation_function = validation.is_bool)
+ self.add_attribute(name = "logLevel",
+ help = "Log level for instance",
+ type = Attribute.ENUM,
+ value = AccessConfiguration.ERROR_LEVEL,
+ allowed = [AccessConfiguration.ERROR_LEVEL,
+ AccessConfiguration.DEBUG_LEVEL],
+ validation_function = validation.is_enum)
def create_controller(xml, access_config = None):
mode = None if not access_config else access_config.get_attribute_value("mode")
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
from nepi.core.execute import ExperimentController
return ExperimentController(xml)
- elif mode ==AccessConfiguration.MODE_DAEMON:
+ elif mode == AccessConfiguration.MODE_DAEMON:
root_dir = access_config.get_attribute_value("rootDirectory")
- return ExperimentControllerProxy(xml, root_dir)
+ log_level = access_config.get_attribute_value("logLevel")
+ return ExperimentControllerProxy(root_dir, log_level, experiment_xml = xml)
raise RuntimeError("Unsupported access configuration 'mode'" % mode)
def create_testbed_instance(testbed_id, testbed_version, access_config):
if not mode or mode == AccessConfiguration.MODE_SINGLE_PROCESS:
return _build_testbed_testbed(testbed_id, testbed_version)
elif mode == AccessConfiguration.MODE_DAEMON:
- root_dir = access_config.get_attribute_value("rootDirectory")
- return TestbedIntanceProxy(testbed_id, testbed_version, root_dir)
+ root_dir = access_config.get_attribute_value("rootDirectory")
+ log_level = access_config.get_attribute_value("logLevel")
+ return TestbedIntanceProxy(root_dir, log_level, testbed_id = testbed_id,
+ testbed_version = testbed_version)
raise RuntimeError("Unsupported access configuration 'mode'" % mode)
def _build_testbed_testbed(testbed_id, testbed_version):
return module.TestbedInstance(testbed_version)
class TestbedInstanceServer(server.Server):
- def __init__(self, testbed_id, testbed_version, root_dir):
+ def __init__(self, root_dir, testbed_id, testbed_version):
super(TestbedInstanceServer, self).__init__(root_dir)
self._testbed_id = testbed_id
self._testbed_version = testbed_version
def reply_action(self, msg):
params = msg.split("|")
instruction = int(params[0])
+ log_msg(self, params)
try:
if instruction == TRACE:
- return self.trace(params)
+ reply = self.trace(params)
elif instruction == START:
- return self.start(params)
+ reply = self.start(params)
elif instruction == STOP:
- return self.stop(params)
+ reply = self.stop(params)
elif instruction == SHUTDOWN:
- return self.shutdown(params)
+ reply = self.shutdown(params)
elif instruction == CONFIGURE:
- return self.configure(params)
+ reply = self.configure(params)
elif instruction == CREATE:
- return self.create(params)
+ reply = self.create(params)
elif instruction == CREATE_SET:
- return self.create_set(params)
+ reply = self.create_set(params)
elif instruction == FACTORY_SET:
- return self.factory_set(params)
+ reply = self.factory_set(params)
elif instruction == CONNECT:
- return self.connect(params)
+ reply = self.connect(params)
elif instruction == CROSS_CONNECT:
- return self.cross_connect(params)
+ reply = self.cross_connect(params)
elif instruction == ADD_TRACE:
- return self.add_trace(params)
+ reply = self.add_trace(params)
elif instruction == ADD_ADDRESS:
- return self.add_address(params)
+ reply = self.add_address(params)
elif instruction == ADD_ROUTE:
- return self.add_route(params)
+ reply = self.add_route(params)
elif instruction == DO_SETUP:
- return self.do_setup(params)
+ reply = self.do_setup(params)
elif instruction == DO_CREATE:
- return self.do_create(params)
+ reply = self.do_create(params)
elif instruction == DO_CONNECT:
- return self.do_connect(params)
+ reply = self.do_connect(params)
elif instruction == DO_CONFIGURE:
- return self.do_configure(params)
+ reply = self.do_configure(params)
elif instruction == DO_CROSS_CONNECT:
- return self.do_cross_connect(params)
+ reply = self.do_cross_connect(params)
elif instruction == GET:
- return self.get(params)
+ reply = self.get(params)
elif instruction == SET:
- return self.set(params)
+ reply = self.set(params)
elif instruction == ACTION:
- return self.action(params)
+ reply = self.action(params)
elif instruction == STATUS:
- return self.status(params)
+ reply = self.status(params)
elif instruction == GUIDS:
- return self.guids(params)
+ reply = self.guids(params)
else:
error = "Invalid instruction %s" % instruction
self.log_error(error)
result = base64.b64encode(error)
- return "%d|%s" % (ERROR, result)
+ reply = "%d|%s" % (ERROR, result)
except:
error = self.log_error()
result = base64.b64encode(error)
- return "%d|%s" % (ERROR, result)
+ reply = "%d|%s" % (ERROR, result)
+ log_reply(self, reply)
+ return reply
def guids(self, params):
guids = self._testbed.guids
return "%d|%s" % (OK, result)
class ExperimentControllerServer(server.Server):
- def __init__(self, experiment_xml, root_dir):
+ def __init__(self, root_dir, experiment_xml):
super(ExperimentControllerServer, self).__init__(root_dir)
self._experiment_xml = experiment_xml
self._controller = None
def reply_action(self, msg):
params = msg.split("|")
instruction = int(params[0])
+ log_msg(self, params)
try:
if instruction == XML:
- return self.experiment_xml(params)
+ reply = self.experiment_xml(params)
elif instruction == ACCESS:
- return self.set_access_configuration(params)
+ reply = self.set_access_configuration(params)
elif instruction == TRACE:
- return self.trace(params)
+ reply = self.trace(params)
elif instruction == FINISHED:
- return self.is_finished(params)
+ reply = self.is_finished(params)
elif instruction == START:
- return self.start(params)
+ reply = self.start(params)
elif instruction == STOP:
- return self.stop(params)
+ reply = self.stop(params)
elif instruction == SHUTDOWN:
- return self.shutdown(params)
+ reply = self.shutdown(params)
else:
error = "Invalid instruction %s" % instruction
self.log_error(error)
result = base64.b64encode(error)
- return "%d|%s" % (ERROR, result)
+ reply = "%d|%s" % (ERROR, result)
except:
error = self.log_error()
result = base64.b64encode(error)
- return "%d|%s" % (ERROR, result)
+ reply = "%d|%s" % (ERROR, result)
+ log_reply(self, reply)
+ return reply
def experiment_xml(self, params):
xml = self._controller.experiment_xml
user = params[5]
port = int(params[6])
root_dir = params[7]
- use_agent = bool(params[8])
+ use_agent = params[8] == "True"
access_config = AccessConfiguration()
access_config.set_attribute_value("mode", mode)
access_config.set_attribute_value("communication", communication)
trace_id = params[3]
trace = self._controller.trace(testbed_guid, guid, trace_id)
result = base64.b64encode(trace)
- return "%d|%s" % (OK, "%s" % result)
+ return "%d|%s" % (OK, result)
def is_finished(self, params):
guid = int(params[1])
- result = self._controller.is_finished(guid)
- return "%d|%s" % (OK, "%r" % result)
+ status = self._controller.is_finished(guid)
+ result = base64.b64encode(str(status))
+ return "%d|%s" % (OK, result)
def start(self, params):
self._controller.start()
return "%d|%s" % (OK, "")
class TestbedIntanceProxy(object):
- def __init__(self, testbed_id, testbed_version, root_dir):
- # launch daemon
- s = TestbedInstanceServer(testbed_id, testbed_version,
- root_dir)
- s.run()
+ def __init__(self, root_dir, log_level, testbed_id = None,
+ testbed_version = None, launch = True):
+ if launch:
+ if testbed_id == None or testbed_version == None:
+ raise RuntimeError("To launch a TesbedInstance server a \
+ testbed_id and testbed_version are required")
+ # launch daemon
+ s = TestbedInstanceServer(root_dir, testbed_id, testbed_version)
+ if log_level == AccessConfiguration.DEBUG_LEVEL:
+ s.set_debug_log_level()
+ s.run()
# create_client
self._client = server.Client(root_dir)
self._client.send_stop()
class ExperimentControllerProxy(object):
- def __init__(self, experiment_xml, root_dir):
- # launch daemon
- s = ExperimentControllerServer(experiment_xml, root_dir)
- s.run()
+ def __init__(self, root_dir, log_level, experiment_xml = None, launch = True):
+ if launch:
+ if experiment_xml == None:
+ raise RuntimeError("To launch a ExperimentControllerServer a \
+ xml description of the experiment is required")
+ # launch daemon
+ s = ExperimentControllerServer(root_dir, experiment_xml)
+ if log_level == AccessConfiguration.DEBUG_LEVEL:
+ s.set_debug_log_level()
+ s.run()
# create_client
self._client = server.Client(root_dir)
reply = self._client.read_reply()
result = reply.split("|")
code = int(result[0])
- text = base64.b64decode(result[1])
- if code == OK:
- return text
- raise RuntimeError(text)
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return text
def set_access_configuration(self, testbed_guid, access_config):
mode = access_config.get_attribute_value("mode")
reply = self._client.read_reply()
result = reply.split("|")
code = int(result[0])
- if code == OK:
- return bool(result[1])
- error = base64.b64decode(result[1])
- raise RuntimeError(error)
+ text = base64.b64decode(result[1])
+ if code == ERROR:
+ raise RuntimeError(text)
+ return text == "True"
def shutdown(self):
msg = controller_messages[SHUTDOWN]
import sys
import subprocess
import threading
+from time import strftime
+import traceback
CTRL_SOCK = "ctrl.sock"
STD_ERR = "stderr.log"
STOP_MSG = "STOP"
+ERROR_LEVEL = 0
+DEBUG_LEVEL = 1
+
class Server(object):
def __init__(self, root_dir = "."):
self._root_dir = root_dir
self._stop = False
self._ctrl_sock = None
- self._stderr = None
+ self._stderr = None
+ self._log_level = ERROR_LEVEL
def run(self):
try:
os._exit(0)
except:
self.log_error()
- raise
+ self.cleanup()
+ os._exit(0)
def daemonize(self):
+ # pipes for process synchronization
+ (r, w) = os.pipe()
+
pid1 = os.fork()
if pid1 > 0:
- # we do os.waitpid to avoid leaving a <defunc> (zombie) process
- os.waitpid(pid1, 0)
+ os.close(w)
+ os.read(r, 1)
+ os.close(r)
+ # os.waitpid avoids leaving a <defunc> (zombie) process
+ st = os.waitpid(pid1, 0)[1]
+ if st:
+ raise RuntimeError("Daemonization failed")
# return 0 to inform the caller method that this is not the
# daemonized process
return 0
+ os.close(r)
# Decouple from parent environment.
os.chdir(self._root_dir)
os._exit(0)
# close all open file descriptors.
- for fd in range(2, MAX_FD):
- try:
- os.close(fd)
- except OSError:
- pass
+ for fd in range(3, MAX_FD):
+ if fd != w:
+ try:
+ os.close(fd)
+ except OSError:
+ pass
# Redirect standard file descriptors.
self._stderr = stdout = file(STD_ERR, "a", 0)
os.dup2(stdin.fileno(), sys.stdin.fileno())
os.dup2(stdout.fileno(), sys.stdout.fileno())
os.dup2(self._stderr.fileno(), sys.stderr.fileno())
+ # let the parent process know that the daemonization is finished
+ os.write(w, "\n")
+ os.close(w)
return 1
def post_daemonize(self):
if msg == STOP_MSG:
self._stop = True
- try:
- reply = self.stop_action()
- except:
- self.log_error()
- self.send_reply(conn, reply)
- break
+ reply = self.stop_action()
else:
- try:
- reply = self.reply_action(msg)
- except:
- self.log_error()
- self.send_reply(conn, reply)
+ reply = self.reply_action(msg)
+ self.send_reply(conn, reply)
conn.close()
def recv_msg(self, conn):
- data = conn.recv(1024)
- decoded = base64.b64decode(data)
- return decoded.rstrip()
+ data = ""
+ while True:
+ try:
+ chunk = conn.recv(1024)
+ except OSError, e:
+ if e.errno != errno.EINTR:
+ raise
+ if chunk == '':
+ continue
+ data += chunk
+ if chunk[-1] == "\n":
+ break
+ decoded = base64.b64decode(data)
+ return decoded.rstrip()
def send_reply(self, conn, reply):
- encoded = base64.b64encode(reply)
- conn.send("%s\n" % encoded)
+ encoded = base64.b64encode(reply)
+ conn.send("%s\n" % encoded)
def cleanup(self):
try:
self._ctrl_sock.close()
os.remove(CTRL_SOCK)
- except e:
+ except:
self.log_error()
def stop_action(self):
def reply_action(self, msg):
return "Reply to: %s" % msg
- def log_error(self, error = None):
- if error == None:
- import traceback
- error = "%s\n" % traceback.format_exc()
- sys.stderr.write(error)
- return error
+ def set_error_log_level(self):
+ self._log_level = ERROR_LEVEL
+
+ def set_debug_log_level(self):
+ self._log_level = DEBUG_LEVEL
+
+ def log_error(self, text = None):
+ if text == None:
+ text = traceback.format_exc()
+ date = strftime("%Y-%m-%d %H:%M:%S")
+ sys.stderr.write("ERROR: %s\n%s\n" % (date, text))
+ return text
+
+ def log_debug(self, text):
+ if self._log_level == DEBUG_LEVEL:
+ date = strftime("%Y-%m-%d %H:%M:%S")
+ sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
class Forwarder(object):
def __init__(self, root_dir = "."):
self._stop = True
def recv_from_server(self):
- return self._ctrl_sock.recv(1024)
+ data = ""
+ while True:
+ try:
+ chunk = self._ctrl_sock.recv(1024)
+ except OSError, e:
+ if e.errno != errno.EINTR:
+ raise
+ if chunk == '':
+ continue
+ data += chunk
+ if chunk[-1] == "\n":
+ break
+ return data
def connect(self):
self.disconnect()
c.forward()" % root_dir
],
stdin = subprocess.PIPE,
- stdout = subprocess.PIPE,
- env = os.environ)
+ stdout = subprocess.PIPE)
def send_msg(self, msg):
encoded = base64.b64encode(msg)
proxy.AccessConfiguration.MODE_DAEMON)
access_config.set_attribute_value("rootDirectory", self._root_dir)
controller = proxy.create_controller(xml, access_config)
+
controller.start()
while not controller.is_finished(app.guid):
time.sleep(0.5)
proxy.AccessConfiguration.MODE_DAEMON)
access_config.set_attribute_value("rootDirectory", self._root_dir)
controller.set_access_configuration(desc.guid, access_config)
+
controller.start()
while not controller.is_finished(app.guid):
time.sleep(0.5)
access_config.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
access_config.set_attribute_value("rootDirectory", self._root_dir)
- controller = proxy.create_controller(xml, access_config = None)
- access_config = proxy.AccessConfiguration()
- access_config.set_attribute_value("mode",
+ controller = proxy.create_controller(xml, access_config)
+
+ access_config2 = proxy.AccessConfiguration()
+ access_config2.set_attribute_value("mode",
proxy.AccessConfiguration.MODE_DAEMON)
inst_root_dir = os.path.join(self._root_dir, "instance")
os.mkdir(inst_root_dir)
- access_config.set_attribute_value("rootDirectory", inst_root_dir)
- controller.set_access_configuration(desc.guid, access_config)
+ access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+ controller.set_access_configuration(desc.guid, access_config2)
+
controller.start()
while not controller.is_finished(app.guid):
time.sleep(0.5)
instance.create(4, "NodeInterface")
instance.create_set(4, "up", True)
instance.connect(2, "devs", 4, "node")
- instance.add_adddress(4, AF_INET, "10.0.0.1", 24, None)
+ instance.add_address(4, AF_INET, "10.0.0.1", 24, None)
instance.create(5, "NodeInterface")
instance.create_set(5, "up", True)
instance.connect(3, "devs", 5, "node")
- instance.add_adddress(5, AF_INET, "10.0.0.2", 24, None)
+ instance.add_address(5, AF_INET, "10.0.0.2", 24, None)
instance.create(6, "Switch")
instance.create_set(6, "up", True)
instance.connect(4, "switch", 6, "devs")
instance.create(4, "P2PNodeInterface")
instance.create_set(4, "up", True)
instance.connect(2, "devs", 4, "node")
- instance.add_adddress(4, AF_INET, "10.0.0.1", 24, None)
+ instance.add_address(4, AF_INET, "10.0.0.1", 24, None)
instance.create(5, "P2PNodeInterface")
instance.create_set(5, "up", True)
instance.connect(3, "devs", 5, "node")
- instance.add_adddress(5, AF_INET, "10.0.0.2", 24, None)
+ instance.add_address(5, AF_INET, "10.0.0.2", 24, None)
instance.connect(4, "p2p", 5, "p2p")
instance.create(6, "Application")
instance.create_set(6, "command", "ping -qc1 10.0.0.2")
instance.create(5, "NodeInterface")
instance.create_set(5, "up", True)
instance.connect(2, "devs", 5, "node")
- instance.add_adddress(5, AF_INET, "10.0.0.1", 24, None)
+ instance.add_address(5, AF_INET, "10.0.0.1", 24, None)
instance.create(6, "NodeInterface")
instance.create_set(6, "up", True)
instance.connect(3, "devs", 6, "node")
- instance.add_adddress(6, AF_INET, "10.0.0.2", 24, None)
+ instance.add_address(6, AF_INET, "10.0.0.2", 24, None)
instance.create(7, "NodeInterface")
instance.create_set(7, "up", True)
instance.connect(3, "devs", 7, "node")
- instance.add_adddress(7, AF_INET, "10.0.1.1", 24, None)
+ instance.add_address(7, AF_INET, "10.0.1.1", 24, None)
instance.create(8, "NodeInterface")
instance.create_set(8, "up", True)
instance.connect(4, "devs", 8, "node")
- instance.add_adddress(8, AF_INET, "10.0.1.2", 24, None)
+ instance.add_address(8, AF_INET, "10.0.1.2", 24, None)
instance.create(9, "Switch")
instance.create_set(9, "up", True)
instance.connect(5, "switch", 9, "devs")
import getpass
from nepi.core.design import ExperimentDescription, FactoriesProvider
from nepi.core.execute import ExperimentController
+from nepi.util import proxy
import os
import shutil
import test_util
class NetnsIntegrationTestCase(unittest.TestCase):
def setUp(self):
- self._home_dir = os.path.join(os.getenv("HOME"), ".nepi",
+ self._root_dir = os.path.join(os.getenv("HOME"), ".nepi",
str(uuid.uuid1()))
- os.makedirs(self._home_dir)
+ os.makedirs(self._root_dir)
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_local_if(self):
user = getpass.getuser()
netns_provider = FactoriesProvider(testbed_id, testbed_version)
netns_desc = exp_desc.add_testbed_description(netns_provider)
- netns_desc.set_attribute_value("homeDirectory", self._home_dir)
+ netns_desc.set_attribute_value("homeDirectory", self._root_dir)
#netns_desc.set_attribute_value("enableDebug", True)
node1 = netns_desc.create("Node")
node2 = netns_desc.create("Node")
self.assertTrue(ping_result.startswith(comp_result))
controller.stop()
controller.shutdown()
-
+
+ @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+ def test_all_daemonized_if(self):
+ exp_desc = ExperimentDescription()
+ testbed_version = "01"
+ testbed_id = "netns"
+ user = getpass.getuser()
+ netns_provider = FactoriesProvider(testbed_id, testbed_version)
+ netns_desc = exp_desc.add_testbed_description(netns_provider)
+ netns_desc.set_attribute_value("homeDirectory", self._root_dir)
+ #netns_desc.set_attribute_value("enableDebug", True)
+ node1 = netns_desc.create("Node")
+ node2 = netns_desc.create("Node")
+ iface1 = netns_desc.create("NodeInterface")
+ iface1.set_attribute_value("up", True)
+ node1.connector("devs").connect(iface1.connector("node"))
+ ip1 = iface1.add_address()
+ ip1.set_attribute_value("Address", "10.0.0.1")
+ iface2 = netns_desc.create("NodeInterface")
+ iface2.set_attribute_value("up", True)
+ node2.connector("devs").connect(iface2.connector("node"))
+ ip2 = iface2.add_address()
+ ip2.set_attribute_value("Address", "10.0.0.2")
+ switch = netns_desc.create("Switch")
+ switch.set_attribute_value("up", True)
+ iface1.connector("switch").connect(switch.connector("devs"))
+ iface2.connector("switch").connect(switch.connector("devs"))
+ app = netns_desc.create("Application")
+ app.set_attribute_value("command", "ping -qc1 10.0.0.2")
+ app.set_attribute_value("user", user)
+ app.connector("node").connect(node1.connector("apps"))
+ app.enable_trace("stdout")
+ xml = exp_desc.to_xml()
+
+ access_config = proxy.AccessConfiguration()
+ access_config.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ access_config.set_attribute_value("rootDirectory", self._root_dir)
+ access_config.set_attribute_value("logLevel",
+ proxy.AccessConfiguration.DEBUG_LEVEL)
+ controller = proxy.create_controller(xml, access_config)
+
+ access_config2 = proxy.AccessConfiguration()
+ access_config2.set_attribute_value("mode",
+ proxy.AccessConfiguration.MODE_DAEMON)
+ inst_root_dir = os.path.join(self._root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ access_config2.set_attribute_value("rootDirectory", inst_root_dir)
+ access_config2.set_attribute_value("logLevel",
+ proxy.AccessConfiguration.DEBUG_LEVEL)
+ controller.set_access_configuration(netns_desc.guid, access_config2)
+
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ ping_result = controller.trace(netns_desc.guid, app.guid, "stdout")
+ comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+
+--- 10.0.0.2 ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time 0ms
+"""
+ self.assertTrue(ping_result.startswith(comp_result))
+ controller.stop()
+ controller.shutdown()
+
def tearDown(self):
- shutil.rmtree(self._home_dir)
+ shutil.rmtree(self._root_dir)
if __name__ == '__main__':
unittest.main()
reply = c.read_reply()
self.assertTrue(reply == "Stopping server")
+ def test_server_long_message(self):
+ s = server.Server(self._root_dir)
+ s.run()
+ c = server.Client(self._root_dir)
+ c.send_msg("1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+ reply = c.read_reply()
+ self.assertTrue(reply == "Reply to: 1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+ c.send_stop()
+ reply = c.read_reply()
+ self.assertTrue(reply == "Stopping server")
+
def tearDown(self):
shutil.rmtree(self._root_dir)