NS3Wrapper server and client
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 13 Jan 2014 12:02:56 +0000 (13:02 +0100)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Mon, 13 Jan 2014 12:02:56 +0000 (13:02 +0100)
src/nepi/resources/ns3/linuxns3client.py [new file with mode: 0644]
src/nepi/resources/ns3/ns3client.py [new file with mode: 0644]
src/nepi/resources/ns3/ns3server.py [new file with mode: 0644]
src/nepi/resources/ns3/ns3wrapper.py
src/nepi/resources/ns3/ns3wrapper_server.py [deleted file]
test/resources/ns3/linuxns3client.py [new file with mode: 0644]
test/resources/ns3/ns3wrapper.py

diff --git a/src/nepi/resources/ns3/linuxns3client.py b/src/nepi/resources/ns3/linuxns3client.py
new file mode 100644 (file)
index 0000000..972e800
--- /dev/null
@@ -0,0 +1,93 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import base64
+import cPickle
+import errno
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+
+from nepi.resources.ns3.ns3client import NS3Client
+from nepi.resources.ns3.ns3server import NS3WrapperMessage
+
+class LinuxNS3Client(NS3Client):
+    def __init__(self, socket_name):
+        super(LinuxNS3Client, self).__init__()
+        self._socket_name = socket_name
+
+    @property
+    def socket_name(self):
+        return self._socket_name
+
+    def send_msg(self, msg, *args):
+        args = list(args)
+
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        sock.connect(self.socket_name)
+
+        args.insert(0, msg)
+        def encode(arg):
+            arg = cPickle.dumps(arg)
+            return base64.b64encode(arg) 
+
+        encoded = "|".join(map(encode, args))
+        sock.send("%s\n" % encoded)
+        
+        reply = sock.recv(1024)
+        return cPickle.loads(base64.b64decode(reply))
+
+    def create(self, clazzname, *args):
+        args = list(args)
+        args.insert(0, clazzname)
+        
+        return self.send_msg(NS3WrapperMessage.CREATE, *args)
+
+    def invoke(self, uuid, operation, *args):
+        args = list(args)
+        args.insert(0, operation)
+        args.insert(0, uuid)
+
+        return self.send_msg(NS3WrapperMessage.INVOKE, *args)
+
+    def set(self, uuid, name, value):
+        args = [uuid, name, value]
+
+        return self.send_msg(NS3WrapperMessage.SET, *args)
+
+    def get(self, uuid, name):
+        args = [uuid, name]
+
+        return self.send_msg(NS3WrapperMessage.GET, *args)
+
+    def trace(self, *args):
+        return self.send_msg(NS3WrapperMessage.TRACE, *args)
+
+    def start(self):
+        return self.send_msg(NS3WrapperMessage.START, [])
+
+    def stop(self, time = None):
+        args = None
+        if time:
+            args = [time]
+
+        return self.send_msg(NS3WrapperMessage.STOP, *args)
+
+    def shutdown(self):
+        return self.send_msg(NS3WrapperMessage.SHUTDOWN, [])
+
diff --git a/src/nepi/resources/ns3/ns3client.py b/src/nepi/resources/ns3/ns3client.py
new file mode 100644 (file)
index 0000000..b225780
--- /dev/null
@@ -0,0 +1,56 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import base64
+import cPickle
+import errno
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+
+from ns3server import NS3WrapperMessage
+
+class NS3Client(object):
+    """ Common Interface for NS3 client classes """
+    def __init__(self):
+        super(NS3Client, self).__init__()
+
+    def create(self, clazzname, *args):
+        pass
+
+    def invoke(self, uuid, operation, *args):
+        pass
+
+    def set(self, uuid, name, value):
+        pass
+
+    def get(self, uuid, name):
+        pass
+
+    def trace(self, *args):
+        pass
+
+    def start(self):
+        pass
+
+    def stop(self, time = None):
+        pass
+
+    def shutdown(self):
+        pass
+
diff --git a/src/nepi/resources/ns3/ns3server.py b/src/nepi/resources/ns3/ns3server.py
new file mode 100644 (file)
index 0000000..f538259
--- /dev/null
@@ -0,0 +1,183 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2014 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import base64
+import cPickle
+import errno
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+
+from ns3wrapper import NS3Wrapper
+
+class NS3WrapperMessage:
+    CREATE = "CREATE"
+    INVOKE = "INVOKE"
+    SET = "SET"
+    GET = "GET"
+    TRACE = "TRACE"
+    START = "START"
+    STOP = "STOP"
+    SHUTDOWN = "SHUTDOWN"
+
+def handle_message(ns3_wrapper, msg, args):
+    if msg == NS3WrapperMessage.SHUTDOWN:
+        ns3_wrapper.shutdown()
+        return "BYEBYE"
+    
+    if msg == NS3WrapperMessage.STOP:
+        time = None
+        if args:
+            time = args[0]
+
+        ns3_wrapper.stop(time=time)
+        return "STOPPED"
+
+    if msg == NS3WrapperMessage.START:
+        ns3_wrapper.start()
+        return "STARTED"
+
+    if msg == NS3WrapperMessage.CREATE:
+        clazzname = args.pop(0)
+        
+        uuid = ns3_wrapper.create(clazzname, *args)
+        return uuid
+
+    if msg == NS3WrapperMessage.INVOKE:
+        uuid = args.pop(0)
+        operation = args.pop(0)
+        
+        uuid = ns3_wrapper.invoke(uuid, operation, *args)
+        return uuid
+
+    if msg == NS3WrapperMessage.GET:
+        uuid = args.pop(0)
+        name = args.pop(0)
+
+        value = ns3_wrapper.get(uuid, name)
+        return value
+
+    if msg == NS3WrapperMessage.SET:
+        uuid = args.pop(0)
+        name = args.pop(0)
+        value = args.pop(0)
+
+        value = ns3_wrapper.set(uuid, name, value)
+        return value
+    if msg == NS3WrapperMessage.TRACE:
+        return "NOT IMPLEMENTED"
+
+def create_socket(socket_name):
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    sock.bind(socket_name)
+    return sock
+
+def recv_msg(conn):
+    msg = []
+    chunk = ''
+
+    while '\n' not in chunk:
+        try:
+            chunk = conn.recv(1024)
+        except (OSError, socket.error), e:
+            if e[0] != errno.EINTR:
+                raise
+            # Ignore eintr errors
+            continue
+
+        if chunk:
+            msg.append(chunk)
+        else:
+            # empty chunk = EOF
+            break
+    msg = ''.join(msg).split('\n')[0]
+
+    # The message might have arguments that will be appended
+    # as a '|' separated list after the message identifier
+    def decode(arg):
+        arg = base64.b64decode(arg).rstrip()
+        return cPickle.loads(arg)
+
+    dargs = map(decode, msg.split("|"))
+
+    # decoded message
+    dmsg = dargs.pop(0)
+
+    return (dmsg, dargs)
+
+def send_reply(conn, reply):
+    encoded = base64.b64encode(cPickle.dumps(reply))
+    conn.send("%s\n" % encoded)
+
+def get_options():
+    usage = ("usage: %prog -S <socket-name>")
+    
+    parser = OptionParser(usage = usage)
+
+    parser.add_option("-S", "--socket-name", dest="socket_name",
+        help = "Name for the unix socket used to interact with this process", 
+        default = "tap.sock", type="str")
+
+    (options, args) = parser.parse_args()
+    
+    return options.socket_name
+
+def run_server(socket_name): 
+    ns3_wrapper = NS3Wrapper()
+
+    # create unix socket to receive instructions
+    sock = create_socket(socket_name)
+    sock.listen(0)
+
+    # wait for messages to arrive and process them
+    stop = False
+
+    while not stop:
+        conn, addr = sock.accept()
+        conn.settimeout(5)
+
+        try:
+            (msg, args) = recv_msg(conn)
+        except socket.timeout, e:
+            # Ingore time-out
+            continue
+
+        if not msg:
+            # Ignore - connection lost
+            break
+        ns3_wrapper.logger.debug("Message received %s args %s" % ( msg, str(args)))
+
+        if msg == NS3WrapperMessage.SHUTDOWN:
+           stop = True
+   
+        reply = handle_message(ns3_wrapper, msg, args)
+
+        try:
+            send_reply(conn, reply)
+        except socket.error:
+            break
+
+if __name__ == '__main__':
+            
+    socket_name = get_options()
+
+    run_server(socket_name)
+
index 00175a8..b8fa5bc 100644 (file)
@@ -21,8 +21,15 @@ import logging
 import os
 import sys
 import threading
+import time
 import uuid
 
+# TODO: 
+#       1. ns-3 classes should be identified as ns3::clazzname?
+# 
+
+SINGLETON = "singleton::"
+
 class NS3Wrapper(object):
     def __init__(self, homedir = None):
         super(NS3Wrapper, self).__init__()
@@ -31,7 +38,6 @@ class NS3Wrapper(object):
         self._condition = None
 
         self._started = False
-        self._stopped = False
 
         # holds reference to all C++ objects and variables in the simulation
         self._objects = dict()
@@ -42,11 +48,8 @@ class NS3Wrapper(object):
         # exposed through the Python bindings
         self._tids = dict()
 
-        # Generate unique identifier for the simulation wrapper 
-        self._uuid = self.make_uuid()
-
         # create home dir (where all simulation related files will end up)
-        self._homedir = homedir or os.path.join("/tmp", self.uuid)
+        self._homedir = homedir or os.path.join("/", "tmp", "ns3_wrapper" )
         
         home = os.path.normpath(self.homedir)
         if not os.path.exists(home):
@@ -54,7 +57,7 @@ class NS3Wrapper(object):
 
         # Logging
         loglevel = os.environ.get("NS3LOGLEVEL", "debug")
-        self._logger = logging.getLogger("ns3wrapper.%s" % self.uuid)
+        self._logger = logging.getLogger("ns3wrapper")
         self._logger.setLevel(getattr(logging, loglevel.upper()))
         
         hdlr = logging.FileHandler(os.path.join(self.homedir, "ns3wrapper.log"))
@@ -69,9 +72,6 @@ class NS3Wrapper(object):
         # Load ns-3 shared libraries and import modules
         self._load_ns3_module()
         
-        # Add module as anoter object, so we can reference it later
-        self._objects[self.uuid] = self.ns3
-        
     @property
     def ns3(self):
         return self._ns3
@@ -80,51 +80,34 @@ class NS3Wrapper(object):
     def homedir(self):
         return self._homedir
 
-    @property
-    def uuid(self):
-        return self._uuid
-
     @property
     def logger(self):
         return self._logger
 
+    @property
+    def is_running(self):
+        return self._started and self._ns3 and not self.ns3.Simulator.IsFinished()
+
     def make_uuid(self):
         return "uuid%s" % uuid.uuid4()
 
-    def is_running(self):
-        return self._started and not self._stopped
-
     def get_object(self, uuid):
         return self._objects.get(uuid)
 
     def get_typeid(self, uuid):
         return self._tids.get(uuid)
 
-    def singleton(self, clazzname):
-        uuid = "uuid%s"%clazzname
-
-        if not uuid in self._objects:
-            if not hasattr(self.ns3, clazzname):
-                msg = "Type %s not supported" % (typeid)
-                self.logger.error(msg)
-
-            clazz = getattr(self.ns3, clazzname)
-            self._objects[uuid] = clazz
-
-            typeid = "ns3::%s" % clazzname
-            self._tids[uuid] = typeid
-
-        return uuid
-
     def create(self, clazzname, *args):
         if not hasattr(self.ns3, clazzname):
             msg = "Type %s not supported" % (clazzname) 
             self.logger.error(msg)
-
-        realargs = [self.get_object(arg) if \
-                str(arg).startswith("uuid") else arg for arg in args]
-      
+     
         clazz = getattr(self.ns3, clazzname)
+        # arguments starting with 'uuid' identify ns-3 C++
+        # objects and must be replaced by the actual object
+        realargs = self.replace_args(args)
+       
         obj = clazz(*realargs)
         
         uuid = self.make_uuid()
@@ -137,14 +120,16 @@ class NS3Wrapper(object):
         return uuid
 
     def invoke(self, uuid, operation, *args):
-        obj = self.get_object(uuid)
-        
+        if uuid.startswith(SINGLETON):
+            obj = self._singleton(uuid)
+        else:
+            obj = self.get_object(uuid)
+    
         method = getattr(obj, operation)
 
-        # arguments starting with 'uuid' identifie stored
+        # arguments starting with 'uuid' identify ns-3 C++
         # objects and must be replaced by the actual object
-        realargs = [self.get_object(arg) if \
-                str(arg).startswith("uuid") else arg for arg in args]
+        realargs = self.replace_args(args)
 
         result = method(*realargs)
 
@@ -170,13 +155,15 @@ class NS3Wrapper(object):
         # to set the value by scheduling an event, else
         # we risk to corrupt the state of the
         # simulation.
-        if self._is_running:
+        if self.is_running:
             # schedule the event in the Simulator
             self._schedule_event(self._condition, set_attr, obj,
                     name, ns3_value)
         else:
             set_attr(obj, name, ns3_value)
 
+        return value
+
     def get(self, uuid, name):
         obj = self.get_object(uuid)
         ns3_value = self._create_ns3_value(uuid, name)
@@ -184,7 +171,7 @@ class NS3Wrapper(object):
         def get_attr(obj, name, ns3_value):
             obj.GetAttribute(name, ns3_value)
 
-        if self._is_running:
+        if self.is_running:
             # schedule the event in the Simulator
             self._schedule_event(self._condition, get_attr, obj,
                     name, ns3_value)
@@ -212,12 +199,12 @@ class NS3Wrapper(object):
             self.ns3.Simulator.Stop()
         else:
             self.ns3.Simulator.Stop(self.ns3.Time(time))
-        self._stopped = True
 
     def shutdown(self):
         if self.ns3:
-            if not self.ns3.Simulator.IsFinished():
-                self.stop()
+            while not self.ns3.Simulator.IsFinished():
+                #self.logger.debug("Waiting for simulation to finish")
+                time.sleep(0.5)
             
             # TODO!!!! SHOULD WAIT UNTIL THE THREAD FINISHES
             if self._simulator_thread:
@@ -329,6 +316,28 @@ class NS3Wrapper(object):
         ns3_value.DeserializeFromString(str_value, checker)
         return ns3_value
 
+    # singletons are identified as "ns3::ClassName"
+    def _singleton(self, ident):
+        if not ident.startswith(SINGLETON):
+            return None
+
+        clazzname = ident[ident.find("::")+2:]
+        if not hasattr(self.ns3, clazzname):
+            msg = "Type %s not supported" % (clazzname)
+            self.logger.error(msg)
+
+        return getattr(self.ns3, clazzname)
+
+    # replace uuids and singleton references for the real objects
+    def replace_args(self, args):
+        realargs = [self.get_object(arg) if \
+                str(arg).startswith("uuid") else arg for arg in args]
+        realargs = [self._singleton(arg) if \
+                str(arg).startswith(SINGLETON) else arg for arg in realargs]
+
+        return realargs
     def _load_ns3_module(self):
         if self.ns3:
             return 
diff --git a/src/nepi/resources/ns3/ns3wrapper_server.py b/src/nepi/resources/ns3/ns3wrapper_server.py
deleted file mode 100644 (file)
index 2773948..0000000
+++ /dev/null
@@ -1,364 +0,0 @@
-#
-#    NEPI, a framework to manage network experiments
-#    Copyright (C) 2013 INRIA
-#
-#    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
-#
-#    This program is distributed in the hope that it will be useful,
-#    but WITHOUT ANY WARRANTY; without even the implied warranty of
-#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-#    GNU General Public License for more details.
-#
-#    You should have received a copy of the GNU General Public License
-#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-#
-
-
-
-class Server(object):
-    def __init__(self, root_dir = ".", log_level = "ERROR", 
-            environment_setup = "", clean_root = False):
-        self._root_dir = root_dir
-        self._clean_root = clean_root
-        self._stop = False
-        self._ctrl_sock = None
-        self._log_level = log_level
-        self._rdbuf = ""
-        self._environment_setup = environment_setup
-
-    def run(self):
-        try:
-            if self.daemonize():
-                self.post_daemonize()
-                self.loop()
-                self.cleanup()
-                # ref: "os._exit(0)"
-                # can not return normally after fork beacuse no exec was done.
-                # This means that if we don't do a os._exit(0) here the code that 
-                # follows the call to "Server.run()" in the "caller code" will be 
-                # executed... but by now it has already been executed after the 
-                # first process (the one that did the first fork) returned.
-                os._exit(0)
-        except:
-            print >>sys.stderr, "SERVER_ERROR."
-            self.log_error()
-            self.cleanup()
-            os._exit(0)
-        print >>sys.stderr, "SERVER_READY."
-
-    def daemonize(self):
-        # pipes for process synchronization
-        (r, w) = os.pipe()
-        
-        # build root folder
-        root = os.path.normpath(self._root_dir)
-        if self._root_dir not in [".", ""] and os.path.exists(root) \
-                and self._clean_root:
-            shutil.rmtree(root)
-        if not os.path.exists(root):
-            os.makedirs(root, 0755)
-
-        pid1 = os.fork()
-        if pid1 > 0:
-            os.close(w)
-            while True:
-                try:
-                    os.read(r, 1)
-                except OSError, e: # pragma: no cover
-                    if e.errno == errno.EINTR:
-                        continue
-                    else:
-                        raise
-                break
-            os.close(r)
-            # os.waitpid avoids leaving a <defunc> (zombie) process
-            st = os.waitpid(pid1, 0)[1]
-            if st:
-                raise RuntimeError("Daemonization failed")
-            # return 0 to inform the caller method that this is not the 
-            # daemonized process
-            return 0
-        os.close(r)
-
-        # Decouple from parent environment.
-        os.chdir(self._root_dir)
-        os.umask(0)
-        os.setsid()
-
-        # fork 2
-        pid2 = os.fork()
-        if pid2 > 0:
-            # see ref: "os._exit(0)"
-            os._exit(0)
-
-        # close all open file descriptors.
-        max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
-        if (max_fd == resource.RLIM_INFINITY):
-            max_fd = MAX_FD
-        for fd in range(3, max_fd):
-            if fd != w:
-                try:
-                    os.close(fd)
-                except OSError:
-                    pass
-
-        # Redirect standard file descriptors.
-        stdin = open(DEV_NULL, "r")
-        stderr = stdout = open(STD_ERR, "a", 0)
-        os.dup2(stdin.fileno(), sys.stdin.fileno())
-        # NOTE: sys.stdout.write will still be buffered, even if the file
-        # was opened with 0 buffer
-        os.dup2(stdout.fileno(), sys.stdout.fileno())
-        os.dup2(stderr.fileno(), sys.stderr.fileno())
-        
-        # setup environment
-        if self._environment_setup:
-            # parse environment variables and pass to child process
-            # do it by executing shell commands, in case there's some heavy setup involved
-            envproc = subprocess.Popen(
-                [ "bash", "-c", 
-                    "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
-                        ( self._environment_setup, ) ],
-                stdin = subprocess.PIPE, 
-                stdout = subprocess.PIPE,
-                stderr = subprocess.PIPE
-            )
-            out,err = envproc.communicate()
-
-            # parse new environment
-            if out:
-                environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
-            
-                # apply to current environment
-                for name, value in environment.iteritems():
-                    os.environ[name] = value
-                
-                # apply pythonpath
-                if 'PYTHONPATH' in environment:
-                    sys.path = environment['PYTHONPATH'].split(':') + sys.path
-
-        # create control socket
-        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        try:
-            self._ctrl_sock.bind(CTRL_SOCK)
-        except socket.error:
-            # Address in use, check pidfile
-            pid = None
-            try:
-                pidfile = open(CTRL_PID, "r")
-                pid = pidfile.read()
-                pidfile.close()
-                pid = int(pid)
-            except:
-                # no pidfile
-                pass
-            
-            if pid is not None:
-                # Check process liveliness
-                if not os.path.exists("/proc/%d" % (pid,)):
-                    # Ok, it's dead, clean the socket
-                    os.remove(CTRL_SOCK)
-            
-            # try again
-            self._ctrl_sock.bind(CTRL_SOCK)
-            
-        self._ctrl_sock.listen(0)
-        
-        # Save pidfile
-        pidfile = open(CTRL_PID, "w")
-        pidfile.write(str(os.getpid()))
-        pidfile.close()
-
-        # let the parent process know that the daemonization is finished
-        os.write(w, "\n")
-        os.close(w)
-        return 1
-
-    def post_daemonize(self):
-        os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
-        # QT, for some strange reason, redefines the SIGCHILD handler to write
-        # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
-        # Server dameonization closes all file descriptors from fileno '3',
-        # but the overloaded handler (inherited by the forked process) will
-        # keep trying to write the \0 to fileno 'x', which might have been reused 
-        # after closing, for other operations. This is bad bad bad when fileno 'x'
-        # is in use for communication pouroses, because unexpected \0 start
-        # appearing in the communication messages... this is exactly what happens 
-        # when using netns in daemonized form. Thus, be have no other alternative than
-        # restoring the SIGCHLD handler to the default here.
-        import signal
-        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-
-    def loop(self):
-        while not self._stop:
-            conn, addr = self._ctrl_sock.accept()
-            self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
-            conn.settimeout(5)
-            while not self._stop:
-                try:
-                    msg = self.recv_msg(conn)
-                except socket.timeout, e:
-                    #self.log_error("SERVER recv_msg: connection timedout ")
-                    continue
-                
-                if not msg:
-                    self.log_error("CONNECTION LOST")
-                    break
-                    
-                if msg == STOP_MSG:
-                    self._stop = True
-                    reply = self.stop_action()
-                else:
-                    reply = self.reply_action(msg)
-                
-                try:
-                    self.send_reply(conn, reply)
-                except socket.error:
-                    self.log_error()
-                    self.log_error("NOTICE: Awaiting for reconnection")
-                    break
-            try:
-                conn.close()
-            except:
-                # Doesn't matter
-                self.log_error()
-
-    def recv_msg(self, conn):
-        data = [self._rdbuf]
-        chunk = data[0]
-        while '\n' not in chunk:
-            try:
-                chunk = conn.recv(1024)
-            except (OSError, socket.error), e:
-                if e[0] != errno.EINTR:
-                    raise
-                else:
-                    continue
-            if chunk:
-                data.append(chunk)
-            else:
-                # empty chunk = EOF
-                break
-        data = ''.join(data).split('\n',1)
-        while len(data) < 2:
-            data.append('')
-        data, self._rdbuf = data
-        
-        decoded = base64.b64decode(data)
-        return decoded.rstrip()
-
-    def send_reply(self, conn, reply):
-        encoded = base64.b64encode(reply)
-        conn.send("%s\n" % encoded)
-       
-    def cleanup(self):
-        try:
-            self._ctrl_sock.close()
-            os.remove(CTRL_SOCK)
-        except:
-            self.log_error()
-
-    def stop_action(self):
-        return "Stopping server"
-
-    def reply_action(self, msg):
-        return "Reply to: %s" % msg
-
-    def log_error(self, text = None, context = ''):
-        if text == None:
-            text = traceback.format_exc()
-        date = time.strftime("%Y-%m-%d %H:%M:%S")
-        if context:
-            context = " (%s)" % (context,)
-        sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
-        return text
-
-    def log_debug(self, text):
-        if self._log_level == DC.DEBUG_LEVEL:
-            date = time.strftime("%Y-%m-%d %H:%M:%S")
-            sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
-
-class Forwarder(object):
-    def __init__(self, root_dir = "."):
-        self._ctrl_sock = None
-        self._root_dir = root_dir
-        self._stop = False
-        self._rdbuf = ""
-
-    def forward(self):
-        self.connect()
-        print >>sys.stderr, "FORWARDER_READY."
-        while not self._stop:
-            data = self.read_data()
-            if not data:
-                # Connection to client lost
-                break
-            self.send_to_server(data)
-            
-            data = self.recv_from_server()
-            if not data:
-                # Connection to server lost
-                raise IOError, "Connection to server lost while "\
-                    "expecting response"
-            self.write_data(data)
-        self.disconnect()
-
-    def read_data(self):
-        return sys.stdin.readline()
-
-    def write_data(self, data):
-        sys.stdout.write(data)
-        # sys.stdout.write is buffered, this is why we need to do a flush()
-        sys.stdout.flush()
-
-    def send_to_server(self, data):
-        try:
-            self._ctrl_sock.send(data)
-        except (IOError, socket.error), e:
-            if e[0] == errno.EPIPE:
-                self.connect()
-                self._ctrl_sock.send(data)
-            else:
-                raise e
-        encoded = data.rstrip() 
-        msg = base64.b64decode(encoded)
-        if msg == STOP_MSG:
-            self._stop = True
-
-    def recv_from_server(self):
-        data = [self._rdbuf]
-        chunk = data[0]
-        while '\n' not in chunk:
-            try:
-                chunk = self._ctrl_sock.recv(1024)
-            except (OSError, socket.error), e:
-                if e[0] != errno.EINTR:
-                    raise
-                continue
-            if chunk:
-                data.append(chunk)
-            else:
-                # empty chunk = EOF
-                break
-        data = ''.join(data).split('\n',1)
-        while len(data) < 2:
-            data.append('')
-        data, self._rdbuf = data
-        
-        return data+'\n'
-    def connect(self):
-        self.disconnect()
-        self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
-        self._ctrl_sock.connect(sock_addr)
-
-    def disconnect(self):
-        try:
-            self._ctrl_sock.close()
-        except:
-            pass
-
diff --git a/test/resources/ns3/linuxns3client.py b/test/resources/ns3/linuxns3client.py
new file mode 100644 (file)
index 0000000..207e437
--- /dev/null
@@ -0,0 +1,185 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+
+# Test based on ns-3 csma/examples/csma-ping.cc file
+#
+# Network topology
+#
+#       n0    n1   n2   n3
+#       |     |    |    |
+#       -----------------
+#
+#  node n0 sends IGMP traffic to node n3
+
+
+from nepi.resources.ns3.ns3server import run_server
+from nepi.resources.ns3.linuxns3client import LinuxNS3Client
+
+import os
+import threading
+import time
+import unittest
+
+class LinuxNS3ClientTest(unittest.TestCase):
+    def setUp(self):
+        self.socket_name = os.path.join("/", "tmp", "NS3WrapperServer.sock")
+
+    def tearDown(self):
+        os.remove(self.socket_name) 
+
+    def test_runtime_attr_modify(self):
+        thread = threading.Thread(target = run_server,
+                args = [self.socket_name])
+
+        thread.setDaemon(True)
+        thread.start()
+
+        time.sleep(3)
+
+        # Verify that the communication socket was created
+        self.assertTrue(os.path.exists(self.socket_name))
+
+        # Instantiate the NS3 client
+        client = LinuxNS3Client(self.socket_name)
+        # Define a real time simulation 
+        stype = client.create("StringValue", "ns3::RealtimeSimulatorImpl")
+        client.invoke("singleton::GlobalValue", "Bind", "SimulatorImplementationType", stype)
+        btrue = client.create("BooleanValue", True)
+        client.invoke("singleton::GlobalValue", "Bind", "ChecksumEnabled", btrue)
+        
+        # Create Node
+        n1 = client.create("Node")
+        self.assertTrue(n1.startswith("uuid"))
+
+        ## Install internet stack
+        ipv41 = client.create("Ipv4L3Protocol")
+        client.invoke(n1, "AggregateObject", ipv41)
+
+        arp1 = client.create("ArpL3Protocol")
+        client.invoke(n1, "AggregateObject", arp1)
+        
+        icmp1 = client.create("Icmpv4L4Protocol")
+        client.invoke(n1, "AggregateObject", icmp1)
+
+        ## Add IPv4 routing
+        lr1 = client.create("Ipv4ListRouting")
+        client.invoke(ipv41, "SetRoutingProtocol", lr1)
+        sr1 = client.create("Ipv4StaticRouting")
+        client.invoke(lr1, "AddRoutingProtocol", sr1, 1)
+
+        ## NODE 2
+        n2 = client.create("Node")
+
+        ## Install internet stack
+        ipv42 = client.create("Ipv4L3Protocol")
+        client.invoke(n2, "AggregateObject", ipv42)
+
+        arp2 = client.create("ArpL3Protocol")
+        client.invoke(n2, "AggregateObject", arp2)
+        
+        icmp2 = client.create("Icmpv4L4Protocol")
+        client.invoke(n2, "AggregateObject", icmp2)
+
+        ## Add IPv4 routing
+        lr2 = client.create("Ipv4ListRouting")
+        client.invoke(ipv42, "SetRoutingProtocol", lr2)
+        sr2 = client.create("Ipv4StaticRouting")
+        client.invoke(lr2, "AddRoutingProtocol", sr2, 1)
+
+        ##### Create p2p device and enable ascii tracing
+        p2pHelper = client.create("PointToPointHelper")
+        asciiHelper = client.create("AsciiTraceHelper")
+
+        # Iface for node1
+        p1 = client.create("PointToPointNetDevice")
+        client.invoke(n1, "AddDevice", p1)
+        q1 = client.create("DropTailQueue")
+        client.invoke(p1, "SetQueue", q1)
+      
+        # Add IPv4 address
+        ifindex1 = client.invoke(ipv41, "AddInterface", p1)
+        mask1 = client.create("Ipv4Mask", "/30")
+        addr1 = client.create("Ipv4Address", "10.0.0.1")
+        inaddr1 = client.create("Ipv4InterfaceAddress", addr1, mask1)
+        client.invoke(ipv41, "AddAddress", ifindex1, inaddr1)
+        client.invoke(ipv41, "SetMetric", ifindex1, 1)
+        client.invoke(ipv41, "SetUp", ifindex1)
+
+        # Enable collection of Ascii format to a specific file
+        filepath1 = "trace-p2p-1.tr"
+        stream1 = client.invoke(asciiHelper, "CreateFileStream", filepath1)
+        client.invoke(p2pHelper, "EnableAscii", stream1, p1)
+       
+        # Iface for node2
+        p2 = client.create("PointToPointNetDevice")
+        client.invoke(n2, "AddDevice", p2)
+        q2 = client.create("DropTailQueue")
+        client.invoke(p2, "SetQueue", q2)
+
+        # Add IPv4 address
+        ifindex2 = client.invoke(ipv42, "AddInterface", p2)
+        mask2 = client.create("Ipv4Mask", "/30")
+        addr2 = client.create("Ipv4Address", "10.0.0.2")
+        inaddr2 = client.create("Ipv4InterfaceAddress", addr2, mask2)
+        client.invoke(ipv42, "AddAddress", ifindex2, inaddr2)
+        client.invoke(ipv42, "SetMetric", ifindex2, 1)
+        client.invoke(ipv42, "SetUp", ifindex2)
+
+        # Enable collection of Ascii format to a specific file
+        filepath2 = "trace-p2p-2.tr"
+        stream2 = client.invoke(asciiHelper, "CreateFileStream", filepath2)
+        client.invoke(p2pHelper, "EnableAscii", stream2, p2)
+
+        # Create channel
+        chan = client.create("PointToPointChannel")
+        client.set(chan, "Delay", "0s")
+        client.invoke(p1, "Attach", chan)
+        client.invoke(p2, "Attach", chan)
+
+        ### create pinger
+        ping = client.create("V4Ping")
+        client.invoke(n1, "AddApplication", ping)
+        client.set (ping, "Remote", "10.0.0.2")
+        client.set (ping, "Interval", "1s")
+        client.set (ping, "Verbose", True)
+        client.set (ping, "StartTime", "0s")
+        client.set (ping, "StopTime", "20s")
+
+        ### run Simulation
+        client.stop(time = "21s")
+        client.start()
+
+        time.sleep(1)
+
+        client.set(chan, "Delay", "5s")
+
+        time.sleep(5)
+
+        client.set(chan, "Delay", "0s")
+
+        # wait until simulation is over
+        client.shutdown()
+
+
+if __name__ == '__main__':
+    unittest.main()
+
index 569decb..70bf970 100755 (executable)
@@ -32,9 +32,8 @@
 
 from nepi.resources.ns3.ns3wrapper import NS3Wrapper
 
-import os.path
+import subprocess
 import time
-import tempfile
 import unittest
 
 class NS3WrapperTest(unittest.TestCase):
@@ -92,11 +91,10 @@ class NS3WrapperTest(unittest.TestCase):
         addresses = wrapper.invoke(ip, "Assign", devs)
 
         ### Create source
-        config = wrapper.singleton("Config")
-        
         # Config::SetDefault ("ns3::Ipv4RawSocketImpl::Protocol", StringValue ("2"));
         proto = wrapper.create("StringValue", "2")
-        wrapper.invoke(config, "SetDefault", "ns3::Ipv4RawSocketImpl::Protocol", proto)
+        wrapper.invoke("singleton::Config", "SetDefault", 
+                "ns3::Ipv4RawSocketImpl::Protocol", proto)
 
         # InetSocketAddress dst = InetSocketAddress (addresses.GetAddress (3));
         addr3 = wrapper.invoke(addresses, "GetAddress", 3)
@@ -175,10 +173,10 @@ class NS3WrapperTest(unittest.TestCase):
 
         ### configure tracing
         #csma.EnablePcapAll ("csma-ping", false);
-        wrapper.invoke(csma, "EnablePcapAll", "csma-ping-pcap", False)
+        wrapper.invoke(csma, "EnablePcapAll", "/tmp/csma-ping-pcap", False)
  
         #csma.EnableAsciiAll ("csma-ping", false);
-        wrapper.invoke(csma, "EnableAsciiAll", "csma-ping-ascii")
+        wrapper.invoke(csma, "EnableAsciiAll", "/tmp/csma-ping-ascii")
  
         def SinkRx(packet, address):
             print packet
@@ -190,27 +188,231 @@ class NS3WrapperTest(unittest.TestCase):
         #Config::ConnectWithoutContext ("/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", 
         # MakeCallback (&SinkRx));
         #cb = wrapper.create("MakeCallback", SinkRx)
-        #wrapper.invoke(config, "ConnectWithoutContext", 
+        #wrapper.invoke("singleton::Config", "ConnectWithoutContext", 
         #        "/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", cb)
 
         # Config::Connect ("/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt", 
         # MakeCallback (&PingRtt));
         #cb2 = wrapper.create("MakeCallback", PingRtt)
-        #wrapper.invoke(config, "ConnectWithoutContext", 
+        #wrapper.invoke("singleton::Config", "ConnectWithoutContext", 
         #        "/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt", 
         #        cb2)
 
         # Packet::EnablePrinting ();
-        packet = wrapper.singleton("Packet")
-        wrapper.invoke(packet, "EnablePrinting")
+        wrapper.invoke("singleton::Packet", "EnablePrinting")
 
         ### run Simulation
         # Simulator::Run ();
-        simulator = wrapper.singleton("Simulator")
-        wrapper.invoke(simulator, "Run")
+        wrapper.invoke("singleton::Simulator", "Run")
 
         # Simulator::Destroy ();
-        wrapper.invoke(simulator, "Destroy")
+        wrapper.invoke("singleton::Simulator", "Destroy")
+
+        p = subprocess.Popen("ls /tmp/csma-ping-* | wc -w", stdout = subprocess.PIPE, 
+                stderr = subprocess.PIPE, shell = True)
+        (out, err) = p.communicate()
+
+        self.assertEquals(int(out), 8)
+
+        p = subprocess.Popen("rm /tmp/csma-ping-*",  shell = True)
+        p.communicate()
+
+    def test_start(self):
+        wrapper = NS3Wrapper()
+
+        ### create 2  nodes
+        c = wrapper.create("NodeContainer")
+
+        # c.Create (2);
+        wrapper.invoke(c, "Create", 2)
+
+        ### connect the nodes to a shared channel
+        # CsmaHelper csma;
+        csma = wrapper.create("CsmaHelper")
+
+        # csma.SetChannelAttribute ("DataRate", DataRateValue (DataRate (5000000)));
+        dr = wrapper.create("DataRate", 5000000)
+        drv = wrapper.create("DataRateValue", dr)
+        wrapper.invoke(csma, "SetChannelAttribute", "DataRate", drv)
+
+        # csma.SetChannelAttribute ("Delay", TimeValue (MilliSeconds (2)));
+        ms = wrapper.create("MilliSeconds", 2)
+        delay = wrapper.create("TimeValue", ms)
+        wrapper.invoke(csma, "SetChannelAttribute", "Delay", delay)
+
+        # NetDeviceContainer devs = csma.Install (c);
+        devs = wrapper.invoke(csma, "Install", c)
+
+        ### add IP stack to all nodes
+        # InternetStackHelper ipStack;
+        ipStack = wrapper.create("InternetStackHelper")
+        
+        # ipStack.Install (c);
+        wrapper.invoke(ipStack, "Install", c)
+
+        ### assign ip addresses
+        #Ipv4AddressHelper ip;
+        ip = wrapper.create("Ipv4AddressHelper")
+
+        # ip.SetBase ("192.168.1.0", "255.255.255.0");
+        ip4 = wrapper.create("Ipv4Address", "192.168.1.0")
+        mask4 = wrapper.create("Ipv4Mask", "255.255.255.0")
+        wrapper.invoke(ip, "SetBase", ip4, mask4)
+
+        # Ipv4InterfaceContainer addresses = ip.Assign (devs);
+        addresses = wrapper.invoke(ip, "Assign", devs)
+
+        ### create pinger
+        #V4PingHelper ping = V4PingHelper (addresses.GetAddress (1));
+        addr1 = wrapper.invoke(addresses, "GetAddress", 1)
+        ping = wrapper.create("V4PingHelper", addr1)
+        btrue = wrapper.create("BooleanValue", True)
+        wrapper.invoke(ping, "SetAttribute", "Verbose", btrue)
+        
+        #apps = ping.Install (pingers);
+        n0 = wrapper.invoke(c, "Get", 0)
+        apps = wrapper.invoke(ping, "Install", n0)
+        
+        #apps.Start (Seconds (0.0));
+        s = wrapper.create ("Seconds", 0.0)
+        wrapper.invoke (apps, "Start", s)
+        
+        #apps.Stop (Seconds (5.0));
+        s = wrapper.create ("Seconds", 5.0)
+        wrapper.invoke (apps, "Stop", s)
+
+        ### run Simulation
+        # Simulator::Stop (6.0);
+        wrapper.stop(time = "6s")
+
+        # Simulator::Run ();
+        wrapper.start()
+
+        # wait until simulation is over
+        wrapper.shutdown()
+
+    def test_runtime_attr_modify(self):
+        wrapper = NS3Wrapper()
+       
+        # Define a real time simulation 
+        stype = wrapper.create("StringValue", "ns3::RealtimeSimulatorImpl")
+        wrapper.invoke("singleton::GlobalValue", "Bind", "SimulatorImplementationType", stype)
+        btrue = wrapper.create("BooleanValue", True)
+        wrapper.invoke("singleton::GlobalValue", "Bind", "ChecksumEnabled", btrue)
+
+        ### create 2  nodes
+        ## NODE 1
+        n1 = wrapper.create("Node")
+
+        ## Install internet stack
+        ipv41 = wrapper.create("Ipv4L3Protocol")
+        wrapper.invoke(n1, "AggregateObject", ipv41)
+
+        arp1 = wrapper.create("ArpL3Protocol")
+        wrapper.invoke(n1, "AggregateObject", arp1)
+        
+        icmp1 = wrapper.create("Icmpv4L4Protocol")
+        wrapper.invoke(n1, "AggregateObject", icmp1)
+
+        ## Add IPv4 routing
+        lr1 = wrapper.create("Ipv4ListRouting")
+        wrapper.invoke(ipv41, "SetRoutingProtocol", lr1)
+        sr1 = wrapper.create("Ipv4StaticRouting")
+        wrapper.invoke(lr1, "AddRoutingProtocol", sr1, 1)
+
+        ## NODE 2
+        n2 = wrapper.create("Node")
+
+        ## Install internet stack
+        ipv42 = wrapper.create("Ipv4L3Protocol")
+        wrapper.invoke(n2, "AggregateObject", ipv42)
+
+        arp2 = wrapper.create("ArpL3Protocol")
+        wrapper.invoke(n2, "AggregateObject", arp2)
+        
+        icmp2 = wrapper.create("Icmpv4L4Protocol")
+        wrapper.invoke(n2, "AggregateObject", icmp2)
+
+        ## Add IPv4 routing
+        lr2 = wrapper.create("Ipv4ListRouting")
+        wrapper.invoke(ipv42, "SetRoutingProtocol", lr2)
+        sr2 = wrapper.create("Ipv4StaticRouting")
+        wrapper.invoke(lr2, "AddRoutingProtocol", sr2, 1)
+
+        ##### Create p2p device and enable ascii tracing
+
+        p2pHelper = wrapper.create("PointToPointHelper")
+        asciiHelper = wrapper.create("AsciiTraceHelper")
+
+        # Iface for node1
+        p1 = wrapper.create("PointToPointNetDevice")
+        wrapper.invoke(n1, "AddDevice", p1)
+        q1 = wrapper.create("DropTailQueue")
+        wrapper.invoke(p1, "SetQueue", q1)
+      
+        # Add IPv4 address
+        ifindex1 = wrapper.invoke(ipv41, "AddInterface", p1)
+        mask1 = wrapper.create("Ipv4Mask", "/30")
+        addr1 = wrapper.create("Ipv4Address", "10.0.0.1")
+        inaddr1 = wrapper.create("Ipv4InterfaceAddress", addr1, mask1)
+        wrapper.invoke(ipv41, "AddAddress", ifindex1, inaddr1)
+        wrapper.invoke(ipv41, "SetMetric", ifindex1, 1)
+        wrapper.invoke(ipv41, "SetUp", ifindex1)
+
+        # Enable collection of Ascii format to a specific file
+        filepath1 = "trace-p2p-1.tr"
+        stream1 = wrapper.invoke(asciiHelper, "CreateFileStream", filepath1)
+        wrapper.invoke(p2pHelper, "EnableAscii", stream1, p1)
+       
+        # Iface for node2
+        p2 = wrapper.create("PointToPointNetDevice")
+        wrapper.invoke(n2, "AddDevice", p2)
+        q2 = wrapper.create("DropTailQueue")
+        wrapper.invoke(p2, "SetQueue", q2)
+
+        # Add IPv4 address
+        ifindex2 = wrapper.invoke(ipv42, "AddInterface", p2)
+        mask2 = wrapper.create("Ipv4Mask", "/30")
+        addr2 = wrapper.create("Ipv4Address", "10.0.0.2")
+        inaddr2 = wrapper.create("Ipv4InterfaceAddress", addr2, mask2)
+        wrapper.invoke(ipv42, "AddAddress", ifindex2, inaddr2)
+        wrapper.invoke(ipv42, "SetMetric", ifindex2, 1)
+        wrapper.invoke(ipv42, "SetUp", ifindex2)
+
+        # Enable collection of Ascii format to a specific file
+        filepath2 = "trace-p2p-2.tr"
+        stream2 = wrapper.invoke(asciiHelper, "CreateFileStream", filepath2)
+        wrapper.invoke(p2pHelper, "EnableAscii", stream2, p2)
+
+        # Create channel
+        chan = wrapper.create("PointToPointChannel")
+        wrapper.set(chan, "Delay", "0s")
+        wrapper.invoke(p1, "Attach", chan)
+        wrapper.invoke(p2, "Attach", chan)
+
+        ### create pinger
+        ping = wrapper.create("V4Ping")
+        wrapper.invoke(n1, "AddApplication", ping)
+        wrapper.set (ping, "Remote", "10.0.0.2")
+        wrapper.set (ping, "Interval", "1s")
+        wrapper.set (ping, "Verbose", True)
+        wrapper.set (ping, "StartTime", "0s")
+        wrapper.set (ping, "StopTime", "20s")
+
+        ### run Simulation
+        wrapper.stop(time = "21s")
+        wrapper.start()
+
+        time.sleep(1)
+
+        wrapper.set(chan, "Delay", "5s")
+
+        time.sleep(5)
+
+        wrapper.set(chan, "Delay", "0s")
+
+        # wait until simulation is over
+        wrapper.shutdown()
 
 if __name__ == '__main__':
     unittest.main()