From fdbe8428159db491e9498449443be4d093cd9b8a Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Mon, 13 Jan 2014 13:02:56 +0100 Subject: [PATCH] NS3Wrapper server and client --- src/nepi/resources/ns3/linuxns3client.py | 93 +++++ src/nepi/resources/ns3/ns3client.py | 56 +++ src/nepi/resources/ns3/ns3server.py | 183 ++++++++++ src/nepi/resources/ns3/ns3wrapper.py | 101 +++--- src/nepi/resources/ns3/ns3wrapper_server.py | 364 -------------------- test/resources/ns3/linuxns3client.py | 185 ++++++++++ test/resources/ns3/ns3wrapper.py | 230 ++++++++++++- 7 files changed, 788 insertions(+), 424 deletions(-) create mode 100644 src/nepi/resources/ns3/linuxns3client.py create mode 100644 src/nepi/resources/ns3/ns3client.py create mode 100644 src/nepi/resources/ns3/ns3server.py delete mode 100644 src/nepi/resources/ns3/ns3wrapper_server.py create mode 100644 test/resources/ns3/linuxns3client.py diff --git a/src/nepi/resources/ns3/linuxns3client.py b/src/nepi/resources/ns3/linuxns3client.py new file mode 100644 index 00000000..972e8005 --- /dev/null +++ b/src/nepi/resources/ns3/linuxns3client.py @@ -0,0 +1,93 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2014 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +import base64 +import cPickle +import errno +import socket +from optparse import OptionParser, SUPPRESS_HELP + +from nepi.resources.ns3.ns3client import NS3Client +from nepi.resources.ns3.ns3server import NS3WrapperMessage + +class LinuxNS3Client(NS3Client): + def __init__(self, socket_name): + super(LinuxNS3Client, self).__init__() + self._socket_name = socket_name + + @property + def socket_name(self): + return self._socket_name + + def send_msg(self, msg, *args): + args = list(args) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(self.socket_name) + + args.insert(0, msg) + def encode(arg): + arg = cPickle.dumps(arg) + return base64.b64encode(arg) + + encoded = "|".join(map(encode, args)) + sock.send("%s\n" % encoded) + + reply = sock.recv(1024) + return cPickle.loads(base64.b64decode(reply)) + + def create(self, clazzname, *args): + args = list(args) + args.insert(0, clazzname) + + return self.send_msg(NS3WrapperMessage.CREATE, *args) + + def invoke(self, uuid, operation, *args): + args = list(args) + args.insert(0, operation) + args.insert(0, uuid) + + return self.send_msg(NS3WrapperMessage.INVOKE, *args) + + def set(self, uuid, name, value): + args = [uuid, name, value] + + return self.send_msg(NS3WrapperMessage.SET, *args) + + def get(self, uuid, name): + args = [uuid, name] + + return self.send_msg(NS3WrapperMessage.GET, *args) + + def trace(self, *args): + return self.send_msg(NS3WrapperMessage.TRACE, *args) + + def start(self): + return self.send_msg(NS3WrapperMessage.START, []) + + def stop(self, time = None): + args = None + if time: + args = [time] + + return self.send_msg(NS3WrapperMessage.STOP, *args) + + def shutdown(self): + return self.send_msg(NS3WrapperMessage.SHUTDOWN, []) + diff --git a/src/nepi/resources/ns3/ns3client.py b/src/nepi/resources/ns3/ns3client.py new file mode 100644 index 00000000..b2257803 --- /dev/null +++ b/src/nepi/resources/ns3/ns3client.py @@ -0,0 +1,56 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2014 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +import base64 +import cPickle +import errno +import socket +from optparse import OptionParser, SUPPRESS_HELP + +from ns3server import NS3WrapperMessage + +class NS3Client(object): + """ Common Interface for NS3 client classes """ + def __init__(self): + super(NS3Client, self).__init__() + + def create(self, clazzname, *args): + pass + + def invoke(self, uuid, operation, *args): + pass + + def set(self, uuid, name, value): + pass + + def get(self, uuid, name): + pass + + def trace(self, *args): + pass + + def start(self): + pass + + def stop(self, time = None): + pass + + def shutdown(self): + pass + diff --git a/src/nepi/resources/ns3/ns3server.py b/src/nepi/resources/ns3/ns3server.py new file mode 100644 index 00000000..f538259a --- /dev/null +++ b/src/nepi/resources/ns3/ns3server.py @@ -0,0 +1,183 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2014 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +import base64 +import cPickle +import errno +import socket +from optparse import OptionParser, SUPPRESS_HELP + +from ns3wrapper import NS3Wrapper + +class NS3WrapperMessage: + CREATE = "CREATE" + INVOKE = "INVOKE" + SET = "SET" + GET = "GET" + TRACE = "TRACE" + START = "START" + STOP = "STOP" + SHUTDOWN = "SHUTDOWN" + +def handle_message(ns3_wrapper, msg, args): + if msg == NS3WrapperMessage.SHUTDOWN: + ns3_wrapper.shutdown() + return "BYEBYE" + + if msg == NS3WrapperMessage.STOP: + time = None + if args: + time = args[0] + + ns3_wrapper.stop(time=time) + return "STOPPED" + + if msg == NS3WrapperMessage.START: + ns3_wrapper.start() + return "STARTED" + + if msg == NS3WrapperMessage.CREATE: + clazzname = args.pop(0) + + uuid = ns3_wrapper.create(clazzname, *args) + return uuid + + if msg == NS3WrapperMessage.INVOKE: + uuid = args.pop(0) + operation = args.pop(0) + + uuid = ns3_wrapper.invoke(uuid, operation, *args) + return uuid + + if msg == NS3WrapperMessage.GET: + uuid = args.pop(0) + name = args.pop(0) + + value = ns3_wrapper.get(uuid, name) + return value + + if msg == NS3WrapperMessage.SET: + uuid = args.pop(0) + name = args.pop(0) + value = args.pop(0) + + value = ns3_wrapper.set(uuid, name, value) + return value + + if msg == NS3WrapperMessage.TRACE: + return "NOT IMPLEMENTED" + +def create_socket(socket_name): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(socket_name) + return sock + +def recv_msg(conn): + msg = [] + chunk = '' + + while '\n' not in chunk: + try: + chunk = conn.recv(1024) + except (OSError, socket.error), e: + if e[0] != errno.EINTR: + raise + # Ignore eintr errors + continue + + if chunk: + msg.append(chunk) + else: + # empty chunk = EOF + break + + msg = ''.join(msg).split('\n')[0] + + # The message might have arguments that will be appended + # as a '|' separated list after the message identifier + def decode(arg): + arg = base64.b64decode(arg).rstrip() + return cPickle.loads(arg) + + dargs = map(decode, msg.split("|")) + + # decoded message + dmsg = dargs.pop(0) + + return (dmsg, dargs) + +def send_reply(conn, reply): + encoded = base64.b64encode(cPickle.dumps(reply)) + conn.send("%s\n" % encoded) + +def get_options(): + usage = ("usage: %prog -S ") + + parser = OptionParser(usage = usage) + + parser.add_option("-S", "--socket-name", dest="socket_name", + help = "Name for the unix socket used to interact with this process", + default = "tap.sock", type="str") + + (options, args) = parser.parse_args() + + return options.socket_name + +def run_server(socket_name): + ns3_wrapper = NS3Wrapper() + + # create unix socket to receive instructions + sock = create_socket(socket_name) + sock.listen(0) + + # wait for messages to arrive and process them + stop = False + + while not stop: + conn, addr = sock.accept() + conn.settimeout(5) + + try: + (msg, args) = recv_msg(conn) + except socket.timeout, e: + # Ingore time-out + continue + + if not msg: + # Ignore - connection lost + break + + ns3_wrapper.logger.debug("Message received %s args %s" % ( msg, str(args))) + + if msg == NS3WrapperMessage.SHUTDOWN: + stop = True + + reply = handle_message(ns3_wrapper, msg, args) + + try: + send_reply(conn, reply) + except socket.error: + break + +if __name__ == '__main__': + + socket_name = get_options() + + run_server(socket_name) + diff --git a/src/nepi/resources/ns3/ns3wrapper.py b/src/nepi/resources/ns3/ns3wrapper.py index 00175a80..b8fa5bc2 100644 --- a/src/nepi/resources/ns3/ns3wrapper.py +++ b/src/nepi/resources/ns3/ns3wrapper.py @@ -21,8 +21,15 @@ import logging import os import sys import threading +import time import uuid +# TODO: +# 1. ns-3 classes should be identified as ns3::clazzname? +# + +SINGLETON = "singleton::" + class NS3Wrapper(object): def __init__(self, homedir = None): super(NS3Wrapper, self).__init__() @@ -31,7 +38,6 @@ class NS3Wrapper(object): self._condition = None self._started = False - self._stopped = False # holds reference to all C++ objects and variables in the simulation self._objects = dict() @@ -42,11 +48,8 @@ class NS3Wrapper(object): # exposed through the Python bindings self._tids = dict() - # Generate unique identifier for the simulation wrapper - self._uuid = self.make_uuid() - # create home dir (where all simulation related files will end up) - self._homedir = homedir or os.path.join("/tmp", self.uuid) + self._homedir = homedir or os.path.join("/", "tmp", "ns3_wrapper" ) home = os.path.normpath(self.homedir) if not os.path.exists(home): @@ -54,7 +57,7 @@ class NS3Wrapper(object): # Logging loglevel = os.environ.get("NS3LOGLEVEL", "debug") - self._logger = logging.getLogger("ns3wrapper.%s" % self.uuid) + self._logger = logging.getLogger("ns3wrapper") self._logger.setLevel(getattr(logging, loglevel.upper())) hdlr = logging.FileHandler(os.path.join(self.homedir, "ns3wrapper.log")) @@ -69,9 +72,6 @@ class NS3Wrapper(object): # Load ns-3 shared libraries and import modules self._load_ns3_module() - # Add module as anoter object, so we can reference it later - self._objects[self.uuid] = self.ns3 - @property def ns3(self): return self._ns3 @@ -80,51 +80,34 @@ class NS3Wrapper(object): def homedir(self): return self._homedir - @property - def uuid(self): - return self._uuid - @property def logger(self): return self._logger + @property + def is_running(self): + return self._started and self._ns3 and not self.ns3.Simulator.IsFinished() + def make_uuid(self): return "uuid%s" % uuid.uuid4() - def is_running(self): - return self._started and not self._stopped - def get_object(self, uuid): return self._objects.get(uuid) def get_typeid(self, uuid): return self._tids.get(uuid) - def singleton(self, clazzname): - uuid = "uuid%s"%clazzname - - if not uuid in self._objects: - if not hasattr(self.ns3, clazzname): - msg = "Type %s not supported" % (typeid) - self.logger.error(msg) - - clazz = getattr(self.ns3, clazzname) - self._objects[uuid] = clazz - - typeid = "ns3::%s" % clazzname - self._tids[uuid] = typeid - - return uuid - def create(self, clazzname, *args): if not hasattr(self.ns3, clazzname): msg = "Type %s not supported" % (clazzname) self.logger.error(msg) - - realargs = [self.get_object(arg) if \ - str(arg).startswith("uuid") else arg for arg in args] - + clazz = getattr(self.ns3, clazzname) + + # arguments starting with 'uuid' identify ns-3 C++ + # objects and must be replaced by the actual object + realargs = self.replace_args(args) + obj = clazz(*realargs) uuid = self.make_uuid() @@ -137,14 +120,16 @@ class NS3Wrapper(object): return uuid def invoke(self, uuid, operation, *args): - obj = self.get_object(uuid) - + if uuid.startswith(SINGLETON): + obj = self._singleton(uuid) + else: + obj = self.get_object(uuid) + method = getattr(obj, operation) - # arguments starting with 'uuid' identifie stored + # arguments starting with 'uuid' identify ns-3 C++ # objects and must be replaced by the actual object - realargs = [self.get_object(arg) if \ - str(arg).startswith("uuid") else arg for arg in args] + realargs = self.replace_args(args) result = method(*realargs) @@ -170,13 +155,15 @@ class NS3Wrapper(object): # to set the value by scheduling an event, else # we risk to corrupt the state of the # simulation. - if self._is_running: + if self.is_running: # schedule the event in the Simulator self._schedule_event(self._condition, set_attr, obj, name, ns3_value) else: set_attr(obj, name, ns3_value) + return value + def get(self, uuid, name): obj = self.get_object(uuid) ns3_value = self._create_ns3_value(uuid, name) @@ -184,7 +171,7 @@ class NS3Wrapper(object): def get_attr(obj, name, ns3_value): obj.GetAttribute(name, ns3_value) - if self._is_running: + if self.is_running: # schedule the event in the Simulator self._schedule_event(self._condition, get_attr, obj, name, ns3_value) @@ -212,12 +199,12 @@ class NS3Wrapper(object): self.ns3.Simulator.Stop() else: self.ns3.Simulator.Stop(self.ns3.Time(time)) - self._stopped = True def shutdown(self): if self.ns3: - if not self.ns3.Simulator.IsFinished(): - self.stop() + while not self.ns3.Simulator.IsFinished(): + #self.logger.debug("Waiting for simulation to finish") + time.sleep(0.5) # TODO!!!! SHOULD WAIT UNTIL THE THREAD FINISHES if self._simulator_thread: @@ -329,6 +316,28 @@ class NS3Wrapper(object): ns3_value.DeserializeFromString(str_value, checker) return ns3_value + # singletons are identified as "ns3::ClassName" + def _singleton(self, ident): + if not ident.startswith(SINGLETON): + return None + + clazzname = ident[ident.find("::")+2:] + if not hasattr(self.ns3, clazzname): + msg = "Type %s not supported" % (clazzname) + self.logger.error(msg) + + return getattr(self.ns3, clazzname) + + # replace uuids and singleton references for the real objects + def replace_args(self, args): + realargs = [self.get_object(arg) if \ + str(arg).startswith("uuid") else arg for arg in args] + + realargs = [self._singleton(arg) if \ + str(arg).startswith(SINGLETON) else arg for arg in realargs] + + return realargs + def _load_ns3_module(self): if self.ns3: return diff --git a/src/nepi/resources/ns3/ns3wrapper_server.py b/src/nepi/resources/ns3/ns3wrapper_server.py deleted file mode 100644 index 27739481..00000000 --- a/src/nepi/resources/ns3/ns3wrapper_server.py +++ /dev/null @@ -1,364 +0,0 @@ -# -# NEPI, a framework to manage network experiments -# Copyright (C) 2013 INRIA -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# - - - -class Server(object): - def __init__(self, root_dir = ".", log_level = "ERROR", - environment_setup = "", clean_root = False): - self._root_dir = root_dir - self._clean_root = clean_root - self._stop = False - self._ctrl_sock = None - self._log_level = log_level - self._rdbuf = "" - self._environment_setup = environment_setup - - def run(self): - try: - if self.daemonize(): - self.post_daemonize() - self.loop() - self.cleanup() - # ref: "os._exit(0)" - # can not return normally after fork beacuse no exec was done. - # This means that if we don't do a os._exit(0) here the code that - # follows the call to "Server.run()" in the "caller code" will be - # executed... but by now it has already been executed after the - # first process (the one that did the first fork) returned. - os._exit(0) - except: - print >>sys.stderr, "SERVER_ERROR." - self.log_error() - self.cleanup() - os._exit(0) - print >>sys.stderr, "SERVER_READY." - - def daemonize(self): - # pipes for process synchronization - (r, w) = os.pipe() - - # build root folder - root = os.path.normpath(self._root_dir) - if self._root_dir not in [".", ""] and os.path.exists(root) \ - and self._clean_root: - shutil.rmtree(root) - if not os.path.exists(root): - os.makedirs(root, 0755) - - pid1 = os.fork() - if pid1 > 0: - os.close(w) - while True: - try: - os.read(r, 1) - except OSError, e: # pragma: no cover - if e.errno == errno.EINTR: - continue - else: - raise - break - os.close(r) - # os.waitpid avoids leaving a (zombie) process - st = os.waitpid(pid1, 0)[1] - if st: - raise RuntimeError("Daemonization failed") - # return 0 to inform the caller method that this is not the - # daemonized process - return 0 - os.close(r) - - # Decouple from parent environment. - os.chdir(self._root_dir) - os.umask(0) - os.setsid() - - # fork 2 - pid2 = os.fork() - if pid2 > 0: - # see ref: "os._exit(0)" - os._exit(0) - - # close all open file descriptors. - max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] - if (max_fd == resource.RLIM_INFINITY): - max_fd = MAX_FD - for fd in range(3, max_fd): - if fd != w: - try: - os.close(fd) - except OSError: - pass - - # Redirect standard file descriptors. - stdin = open(DEV_NULL, "r") - stderr = stdout = open(STD_ERR, "a", 0) - os.dup2(stdin.fileno(), sys.stdin.fileno()) - # NOTE: sys.stdout.write will still be buffered, even if the file - # was opened with 0 buffer - os.dup2(stdout.fileno(), sys.stdout.fileno()) - os.dup2(stderr.fileno(), sys.stderr.fileno()) - - # setup environment - if self._environment_setup: - # parse environment variables and pass to child process - # do it by executing shell commands, in case there's some heavy setup involved - envproc = subprocess.Popen( - [ "bash", "-c", - "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" % - ( self._environment_setup, ) ], - stdin = subprocess.PIPE, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE - ) - out,err = envproc.communicate() - - # parse new environment - if out: - environment = dict(map(lambda x:x.split("\x02"), out.split("\x01"))) - - # apply to current environment - for name, value in environment.iteritems(): - os.environ[name] = value - - # apply pythonpath - if 'PYTHONPATH' in environment: - sys.path = environment['PYTHONPATH'].split(':') + sys.path - - # create control socket - self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - self._ctrl_sock.bind(CTRL_SOCK) - except socket.error: - # Address in use, check pidfile - pid = None - try: - pidfile = open(CTRL_PID, "r") - pid = pidfile.read() - pidfile.close() - pid = int(pid) - except: - # no pidfile - pass - - if pid is not None: - # Check process liveliness - if not os.path.exists("/proc/%d" % (pid,)): - # Ok, it's dead, clean the socket - os.remove(CTRL_SOCK) - - # try again - self._ctrl_sock.bind(CTRL_SOCK) - - self._ctrl_sock.listen(0) - - # Save pidfile - pidfile = open(CTRL_PID, "w") - pidfile.write(str(os.getpid())) - pidfile.close() - - # let the parent process know that the daemonization is finished - os.write(w, "\n") - os.close(w) - return 1 - - def post_daemonize(self): - os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level - # QT, for some strange reason, redefines the SIGCHILD handler to write - # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received. - # Server dameonization closes all file descriptors from fileno '3', - # but the overloaded handler (inherited by the forked process) will - # keep trying to write the \0 to fileno 'x', which might have been reused - # after closing, for other operations. This is bad bad bad when fileno 'x' - # is in use for communication pouroses, because unexpected \0 start - # appearing in the communication messages... this is exactly what happens - # when using netns in daemonized form. Thus, be have no other alternative than - # restoring the SIGCHLD handler to the default here. - import signal - signal.signal(signal.SIGCHLD, signal.SIG_DFL) - - def loop(self): - while not self._stop: - conn, addr = self._ctrl_sock.accept() - self.log_error("ACCEPTED CONNECTION: %s" % (addr,)) - conn.settimeout(5) - while not self._stop: - try: - msg = self.recv_msg(conn) - except socket.timeout, e: - #self.log_error("SERVER recv_msg: connection timedout ") - continue - - if not msg: - self.log_error("CONNECTION LOST") - break - - if msg == STOP_MSG: - self._stop = True - reply = self.stop_action() - else: - reply = self.reply_action(msg) - - try: - self.send_reply(conn, reply) - except socket.error: - self.log_error() - self.log_error("NOTICE: Awaiting for reconnection") - break - try: - conn.close() - except: - # Doesn't matter - self.log_error() - - def recv_msg(self, conn): - data = [self._rdbuf] - chunk = data[0] - while '\n' not in chunk: - try: - chunk = conn.recv(1024) - except (OSError, socket.error), e: - if e[0] != errno.EINTR: - raise - else: - continue - if chunk: - data.append(chunk) - else: - # empty chunk = EOF - break - data = ''.join(data).split('\n',1) - while len(data) < 2: - data.append('') - data, self._rdbuf = data - - decoded = base64.b64decode(data) - return decoded.rstrip() - - def send_reply(self, conn, reply): - encoded = base64.b64encode(reply) - conn.send("%s\n" % encoded) - - def cleanup(self): - try: - self._ctrl_sock.close() - os.remove(CTRL_SOCK) - except: - self.log_error() - - def stop_action(self): - return "Stopping server" - - def reply_action(self, msg): - return "Reply to: %s" % msg - - def log_error(self, text = None, context = ''): - if text == None: - text = traceback.format_exc() - date = time.strftime("%Y-%m-%d %H:%M:%S") - if context: - context = " (%s)" % (context,) - sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text)) - return text - - def log_debug(self, text): - if self._log_level == DC.DEBUG_LEVEL: - date = time.strftime("%Y-%m-%d %H:%M:%S") - sys.stderr.write("DEBUG: %s\n%s\n" % (date, text)) - -class Forwarder(object): - def __init__(self, root_dir = "."): - self._ctrl_sock = None - self._root_dir = root_dir - self._stop = False - self._rdbuf = "" - - def forward(self): - self.connect() - print >>sys.stderr, "FORWARDER_READY." - while not self._stop: - data = self.read_data() - if not data: - # Connection to client lost - break - self.send_to_server(data) - - data = self.recv_from_server() - if not data: - # Connection to server lost - raise IOError, "Connection to server lost while "\ - "expecting response" - self.write_data(data) - self.disconnect() - - def read_data(self): - return sys.stdin.readline() - - def write_data(self, data): - sys.stdout.write(data) - # sys.stdout.write is buffered, this is why we need to do a flush() - sys.stdout.flush() - - def send_to_server(self, data): - try: - self._ctrl_sock.send(data) - except (IOError, socket.error), e: - if e[0] == errno.EPIPE: - self.connect() - self._ctrl_sock.send(data) - else: - raise e - encoded = data.rstrip() - msg = base64.b64decode(encoded) - if msg == STOP_MSG: - self._stop = True - - def recv_from_server(self): - data = [self._rdbuf] - chunk = data[0] - while '\n' not in chunk: - try: - chunk = self._ctrl_sock.recv(1024) - except (OSError, socket.error), e: - if e[0] != errno.EINTR: - raise - continue - if chunk: - data.append(chunk) - else: - # empty chunk = EOF - break - data = ''.join(data).split('\n',1) - while len(data) < 2: - data.append('') - data, self._rdbuf = data - - return data+'\n' - - def connect(self): - self.disconnect() - self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock_addr = os.path.join(self._root_dir, CTRL_SOCK) - self._ctrl_sock.connect(sock_addr) - - def disconnect(self): - try: - self._ctrl_sock.close() - except: - pass - diff --git a/test/resources/ns3/linuxns3client.py b/test/resources/ns3/linuxns3client.py new file mode 100644 index 00000000..207e4378 --- /dev/null +++ b/test/resources/ns3/linuxns3client.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + + +# Test based on ns-3 csma/examples/csma-ping.cc file +# +# Network topology +# +# n0 n1 n2 n3 +# | | | | +# ----------------- +# +# node n0 sends IGMP traffic to node n3 + + +from nepi.resources.ns3.ns3server import run_server +from nepi.resources.ns3.linuxns3client import LinuxNS3Client + +import os +import threading +import time +import unittest + +class LinuxNS3ClientTest(unittest.TestCase): + def setUp(self): + self.socket_name = os.path.join("/", "tmp", "NS3WrapperServer.sock") + + def tearDown(self): + os.remove(self.socket_name) + + def test_runtime_attr_modify(self): + thread = threading.Thread(target = run_server, + args = [self.socket_name]) + + thread.setDaemon(True) + thread.start() + + time.sleep(3) + + # Verify that the communication socket was created + self.assertTrue(os.path.exists(self.socket_name)) + + # Instantiate the NS3 client + client = LinuxNS3Client(self.socket_name) + + # Define a real time simulation + stype = client.create("StringValue", "ns3::RealtimeSimulatorImpl") + client.invoke("singleton::GlobalValue", "Bind", "SimulatorImplementationType", stype) + btrue = client.create("BooleanValue", True) + client.invoke("singleton::GlobalValue", "Bind", "ChecksumEnabled", btrue) + + # Create Node + n1 = client.create("Node") + self.assertTrue(n1.startswith("uuid")) + + ## Install internet stack + ipv41 = client.create("Ipv4L3Protocol") + client.invoke(n1, "AggregateObject", ipv41) + + arp1 = client.create("ArpL3Protocol") + client.invoke(n1, "AggregateObject", arp1) + + icmp1 = client.create("Icmpv4L4Protocol") + client.invoke(n1, "AggregateObject", icmp1) + + ## Add IPv4 routing + lr1 = client.create("Ipv4ListRouting") + client.invoke(ipv41, "SetRoutingProtocol", lr1) + sr1 = client.create("Ipv4StaticRouting") + client.invoke(lr1, "AddRoutingProtocol", sr1, 1) + + ## NODE 2 + n2 = client.create("Node") + + ## Install internet stack + ipv42 = client.create("Ipv4L3Protocol") + client.invoke(n2, "AggregateObject", ipv42) + + arp2 = client.create("ArpL3Protocol") + client.invoke(n2, "AggregateObject", arp2) + + icmp2 = client.create("Icmpv4L4Protocol") + client.invoke(n2, "AggregateObject", icmp2) + + ## Add IPv4 routing + lr2 = client.create("Ipv4ListRouting") + client.invoke(ipv42, "SetRoutingProtocol", lr2) + sr2 = client.create("Ipv4StaticRouting") + client.invoke(lr2, "AddRoutingProtocol", sr2, 1) + + ##### Create p2p device and enable ascii tracing + p2pHelper = client.create("PointToPointHelper") + asciiHelper = client.create("AsciiTraceHelper") + + # Iface for node1 + p1 = client.create("PointToPointNetDevice") + client.invoke(n1, "AddDevice", p1) + q1 = client.create("DropTailQueue") + client.invoke(p1, "SetQueue", q1) + + # Add IPv4 address + ifindex1 = client.invoke(ipv41, "AddInterface", p1) + mask1 = client.create("Ipv4Mask", "/30") + addr1 = client.create("Ipv4Address", "10.0.0.1") + inaddr1 = client.create("Ipv4InterfaceAddress", addr1, mask1) + client.invoke(ipv41, "AddAddress", ifindex1, inaddr1) + client.invoke(ipv41, "SetMetric", ifindex1, 1) + client.invoke(ipv41, "SetUp", ifindex1) + + # Enable collection of Ascii format to a specific file + filepath1 = "trace-p2p-1.tr" + stream1 = client.invoke(asciiHelper, "CreateFileStream", filepath1) + client.invoke(p2pHelper, "EnableAscii", stream1, p1) + + # Iface for node2 + p2 = client.create("PointToPointNetDevice") + client.invoke(n2, "AddDevice", p2) + q2 = client.create("DropTailQueue") + client.invoke(p2, "SetQueue", q2) + + # Add IPv4 address + ifindex2 = client.invoke(ipv42, "AddInterface", p2) + mask2 = client.create("Ipv4Mask", "/30") + addr2 = client.create("Ipv4Address", "10.0.0.2") + inaddr2 = client.create("Ipv4InterfaceAddress", addr2, mask2) + client.invoke(ipv42, "AddAddress", ifindex2, inaddr2) + client.invoke(ipv42, "SetMetric", ifindex2, 1) + client.invoke(ipv42, "SetUp", ifindex2) + + # Enable collection of Ascii format to a specific file + filepath2 = "trace-p2p-2.tr" + stream2 = client.invoke(asciiHelper, "CreateFileStream", filepath2) + client.invoke(p2pHelper, "EnableAscii", stream2, p2) + + # Create channel + chan = client.create("PointToPointChannel") + client.set(chan, "Delay", "0s") + client.invoke(p1, "Attach", chan) + client.invoke(p2, "Attach", chan) + + ### create pinger + ping = client.create("V4Ping") + client.invoke(n1, "AddApplication", ping) + client.set (ping, "Remote", "10.0.0.2") + client.set (ping, "Interval", "1s") + client.set (ping, "Verbose", True) + client.set (ping, "StartTime", "0s") + client.set (ping, "StopTime", "20s") + + ### run Simulation + client.stop(time = "21s") + client.start() + + time.sleep(1) + + client.set(chan, "Delay", "5s") + + time.sleep(5) + + client.set(chan, "Delay", "0s") + + # wait until simulation is over + client.shutdown() + + +if __name__ == '__main__': + unittest.main() + diff --git a/test/resources/ns3/ns3wrapper.py b/test/resources/ns3/ns3wrapper.py index 569decb0..70bf970e 100755 --- a/test/resources/ns3/ns3wrapper.py +++ b/test/resources/ns3/ns3wrapper.py @@ -32,9 +32,8 @@ from nepi.resources.ns3.ns3wrapper import NS3Wrapper -import os.path +import subprocess import time -import tempfile import unittest class NS3WrapperTest(unittest.TestCase): @@ -92,11 +91,10 @@ class NS3WrapperTest(unittest.TestCase): addresses = wrapper.invoke(ip, "Assign", devs) ### Create source - config = wrapper.singleton("Config") - # Config::SetDefault ("ns3::Ipv4RawSocketImpl::Protocol", StringValue ("2")); proto = wrapper.create("StringValue", "2") - wrapper.invoke(config, "SetDefault", "ns3::Ipv4RawSocketImpl::Protocol", proto) + wrapper.invoke("singleton::Config", "SetDefault", + "ns3::Ipv4RawSocketImpl::Protocol", proto) # InetSocketAddress dst = InetSocketAddress (addresses.GetAddress (3)); addr3 = wrapper.invoke(addresses, "GetAddress", 3) @@ -175,10 +173,10 @@ class NS3WrapperTest(unittest.TestCase): ### configure tracing #csma.EnablePcapAll ("csma-ping", false); - wrapper.invoke(csma, "EnablePcapAll", "csma-ping-pcap", False) + wrapper.invoke(csma, "EnablePcapAll", "/tmp/csma-ping-pcap", False) #csma.EnableAsciiAll ("csma-ping", false); - wrapper.invoke(csma, "EnableAsciiAll", "csma-ping-ascii") + wrapper.invoke(csma, "EnableAsciiAll", "/tmp/csma-ping-ascii") def SinkRx(packet, address): print packet @@ -190,27 +188,231 @@ class NS3WrapperTest(unittest.TestCase): #Config::ConnectWithoutContext ("/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", # MakeCallback (&SinkRx)); #cb = wrapper.create("MakeCallback", SinkRx) - #wrapper.invoke(config, "ConnectWithoutContext", + #wrapper.invoke("singleton::Config", "ConnectWithoutContext", # "/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", cb) # Config::Connect ("/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt", # MakeCallback (&PingRtt)); #cb2 = wrapper.create("MakeCallback", PingRtt) - #wrapper.invoke(config, "ConnectWithoutContext", + #wrapper.invoke("singleton::Config", "ConnectWithoutContext", # "/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt", # cb2) # Packet::EnablePrinting (); - packet = wrapper.singleton("Packet") - wrapper.invoke(packet, "EnablePrinting") + wrapper.invoke("singleton::Packet", "EnablePrinting") ### run Simulation # Simulator::Run (); - simulator = wrapper.singleton("Simulator") - wrapper.invoke(simulator, "Run") + wrapper.invoke("singleton::Simulator", "Run") # Simulator::Destroy (); - wrapper.invoke(simulator, "Destroy") + wrapper.invoke("singleton::Simulator", "Destroy") + + p = subprocess.Popen("ls /tmp/csma-ping-* | wc -w", stdout = subprocess.PIPE, + stderr = subprocess.PIPE, shell = True) + (out, err) = p.communicate() + + self.assertEquals(int(out), 8) + + p = subprocess.Popen("rm /tmp/csma-ping-*", shell = True) + p.communicate() + + def test_start(self): + wrapper = NS3Wrapper() + + ### create 2 nodes + c = wrapper.create("NodeContainer") + + # c.Create (2); + wrapper.invoke(c, "Create", 2) + + ### connect the nodes to a shared channel + # CsmaHelper csma; + csma = wrapper.create("CsmaHelper") + + # csma.SetChannelAttribute ("DataRate", DataRateValue (DataRate (5000000))); + dr = wrapper.create("DataRate", 5000000) + drv = wrapper.create("DataRateValue", dr) + wrapper.invoke(csma, "SetChannelAttribute", "DataRate", drv) + + # csma.SetChannelAttribute ("Delay", TimeValue (MilliSeconds (2))); + ms = wrapper.create("MilliSeconds", 2) + delay = wrapper.create("TimeValue", ms) + wrapper.invoke(csma, "SetChannelAttribute", "Delay", delay) + + # NetDeviceContainer devs = csma.Install (c); + devs = wrapper.invoke(csma, "Install", c) + + ### add IP stack to all nodes + # InternetStackHelper ipStack; + ipStack = wrapper.create("InternetStackHelper") + + # ipStack.Install (c); + wrapper.invoke(ipStack, "Install", c) + + ### assign ip addresses + #Ipv4AddressHelper ip; + ip = wrapper.create("Ipv4AddressHelper") + + # ip.SetBase ("192.168.1.0", "255.255.255.0"); + ip4 = wrapper.create("Ipv4Address", "192.168.1.0") + mask4 = wrapper.create("Ipv4Mask", "255.255.255.0") + wrapper.invoke(ip, "SetBase", ip4, mask4) + + # Ipv4InterfaceContainer addresses = ip.Assign (devs); + addresses = wrapper.invoke(ip, "Assign", devs) + + ### create pinger + #V4PingHelper ping = V4PingHelper (addresses.GetAddress (1)); + addr1 = wrapper.invoke(addresses, "GetAddress", 1) + ping = wrapper.create("V4PingHelper", addr1) + btrue = wrapper.create("BooleanValue", True) + wrapper.invoke(ping, "SetAttribute", "Verbose", btrue) + + #apps = ping.Install (pingers); + n0 = wrapper.invoke(c, "Get", 0) + apps = wrapper.invoke(ping, "Install", n0) + + #apps.Start (Seconds (0.0)); + s = wrapper.create ("Seconds", 0.0) + wrapper.invoke (apps, "Start", s) + + #apps.Stop (Seconds (5.0)); + s = wrapper.create ("Seconds", 5.0) + wrapper.invoke (apps, "Stop", s) + + ### run Simulation + # Simulator::Stop (6.0); + wrapper.stop(time = "6s") + + # Simulator::Run (); + wrapper.start() + + # wait until simulation is over + wrapper.shutdown() + + def test_runtime_attr_modify(self): + wrapper = NS3Wrapper() + + # Define a real time simulation + stype = wrapper.create("StringValue", "ns3::RealtimeSimulatorImpl") + wrapper.invoke("singleton::GlobalValue", "Bind", "SimulatorImplementationType", stype) + btrue = wrapper.create("BooleanValue", True) + wrapper.invoke("singleton::GlobalValue", "Bind", "ChecksumEnabled", btrue) + + ### create 2 nodes + ## NODE 1 + n1 = wrapper.create("Node") + + ## Install internet stack + ipv41 = wrapper.create("Ipv4L3Protocol") + wrapper.invoke(n1, "AggregateObject", ipv41) + + arp1 = wrapper.create("ArpL3Protocol") + wrapper.invoke(n1, "AggregateObject", arp1) + + icmp1 = wrapper.create("Icmpv4L4Protocol") + wrapper.invoke(n1, "AggregateObject", icmp1) + + ## Add IPv4 routing + lr1 = wrapper.create("Ipv4ListRouting") + wrapper.invoke(ipv41, "SetRoutingProtocol", lr1) + sr1 = wrapper.create("Ipv4StaticRouting") + wrapper.invoke(lr1, "AddRoutingProtocol", sr1, 1) + + ## NODE 2 + n2 = wrapper.create("Node") + + ## Install internet stack + ipv42 = wrapper.create("Ipv4L3Protocol") + wrapper.invoke(n2, "AggregateObject", ipv42) + + arp2 = wrapper.create("ArpL3Protocol") + wrapper.invoke(n2, "AggregateObject", arp2) + + icmp2 = wrapper.create("Icmpv4L4Protocol") + wrapper.invoke(n2, "AggregateObject", icmp2) + + ## Add IPv4 routing + lr2 = wrapper.create("Ipv4ListRouting") + wrapper.invoke(ipv42, "SetRoutingProtocol", lr2) + sr2 = wrapper.create("Ipv4StaticRouting") + wrapper.invoke(lr2, "AddRoutingProtocol", sr2, 1) + + ##### Create p2p device and enable ascii tracing + + p2pHelper = wrapper.create("PointToPointHelper") + asciiHelper = wrapper.create("AsciiTraceHelper") + + # Iface for node1 + p1 = wrapper.create("PointToPointNetDevice") + wrapper.invoke(n1, "AddDevice", p1) + q1 = wrapper.create("DropTailQueue") + wrapper.invoke(p1, "SetQueue", q1) + + # Add IPv4 address + ifindex1 = wrapper.invoke(ipv41, "AddInterface", p1) + mask1 = wrapper.create("Ipv4Mask", "/30") + addr1 = wrapper.create("Ipv4Address", "10.0.0.1") + inaddr1 = wrapper.create("Ipv4InterfaceAddress", addr1, mask1) + wrapper.invoke(ipv41, "AddAddress", ifindex1, inaddr1) + wrapper.invoke(ipv41, "SetMetric", ifindex1, 1) + wrapper.invoke(ipv41, "SetUp", ifindex1) + + # Enable collection of Ascii format to a specific file + filepath1 = "trace-p2p-1.tr" + stream1 = wrapper.invoke(asciiHelper, "CreateFileStream", filepath1) + wrapper.invoke(p2pHelper, "EnableAscii", stream1, p1) + + # Iface for node2 + p2 = wrapper.create("PointToPointNetDevice") + wrapper.invoke(n2, "AddDevice", p2) + q2 = wrapper.create("DropTailQueue") + wrapper.invoke(p2, "SetQueue", q2) + + # Add IPv4 address + ifindex2 = wrapper.invoke(ipv42, "AddInterface", p2) + mask2 = wrapper.create("Ipv4Mask", "/30") + addr2 = wrapper.create("Ipv4Address", "10.0.0.2") + inaddr2 = wrapper.create("Ipv4InterfaceAddress", addr2, mask2) + wrapper.invoke(ipv42, "AddAddress", ifindex2, inaddr2) + wrapper.invoke(ipv42, "SetMetric", ifindex2, 1) + wrapper.invoke(ipv42, "SetUp", ifindex2) + + # Enable collection of Ascii format to a specific file + filepath2 = "trace-p2p-2.tr" + stream2 = wrapper.invoke(asciiHelper, "CreateFileStream", filepath2) + wrapper.invoke(p2pHelper, "EnableAscii", stream2, p2) + + # Create channel + chan = wrapper.create("PointToPointChannel") + wrapper.set(chan, "Delay", "0s") + wrapper.invoke(p1, "Attach", chan) + wrapper.invoke(p2, "Attach", chan) + + ### create pinger + ping = wrapper.create("V4Ping") + wrapper.invoke(n1, "AddApplication", ping) + wrapper.set (ping, "Remote", "10.0.0.2") + wrapper.set (ping, "Interval", "1s") + wrapper.set (ping, "Verbose", True) + wrapper.set (ping, "StartTime", "0s") + wrapper.set (ping, "StopTime", "20s") + + ### run Simulation + wrapper.stop(time = "21s") + wrapper.start() + + time.sleep(1) + + wrapper.set(chan, "Delay", "5s") + + time.sleep(5) + + wrapper.set(chan, "Delay", "0s") + + # wait until simulation is over + wrapper.shutdown() if __name__ == '__main__': unittest.main() -- 2.43.0