--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2014 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import base64
+import cPickle
+import errno
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+
+from nepi.resources.ns3.ns3client import NS3Client
+from nepi.resources.ns3.ns3server import NS3WrapperMessage
+
+class LinuxNS3Client(NS3Client):
+ def __init__(self, socket_name):
+ super(LinuxNS3Client, self).__init__()
+ self._socket_name = socket_name
+
+ @property
+ def socket_name(self):
+ return self._socket_name
+
+ def send_msg(self, msg, *args):
+ args = list(args)
+
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.connect(self.socket_name)
+
+ args.insert(0, msg)
+ def encode(arg):
+ arg = cPickle.dumps(arg)
+ return base64.b64encode(arg)
+
+ encoded = "|".join(map(encode, args))
+ sock.send("%s\n" % encoded)
+
+ reply = sock.recv(1024)
+ return cPickle.loads(base64.b64decode(reply))
+
+ def create(self, clazzname, *args):
+ args = list(args)
+ args.insert(0, clazzname)
+
+ return self.send_msg(NS3WrapperMessage.CREATE, *args)
+
+ def invoke(self, uuid, operation, *args):
+ args = list(args)
+ args.insert(0, operation)
+ args.insert(0, uuid)
+
+ return self.send_msg(NS3WrapperMessage.INVOKE, *args)
+
+ def set(self, uuid, name, value):
+ args = [uuid, name, value]
+
+ return self.send_msg(NS3WrapperMessage.SET, *args)
+
+ def get(self, uuid, name):
+ args = [uuid, name]
+
+ return self.send_msg(NS3WrapperMessage.GET, *args)
+
+ def trace(self, *args):
+ return self.send_msg(NS3WrapperMessage.TRACE, *args)
+
+ def start(self):
+ return self.send_msg(NS3WrapperMessage.START, [])
+
+ def stop(self, time = None):
+ args = None
+ if time:
+ args = [time]
+
+ return self.send_msg(NS3WrapperMessage.STOP, *args)
+
+ def shutdown(self):
+ return self.send_msg(NS3WrapperMessage.SHUTDOWN, [])
+
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2014 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import base64
+import cPickle
+import errno
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+
+from ns3server import NS3WrapperMessage
+
+class NS3Client(object):
+ """ Common Interface for NS3 client classes """
+ def __init__(self):
+ super(NS3Client, self).__init__()
+
+ def create(self, clazzname, *args):
+ pass
+
+ def invoke(self, uuid, operation, *args):
+ pass
+
+ def set(self, uuid, name, value):
+ pass
+
+ def get(self, uuid, name):
+ pass
+
+ def trace(self, *args):
+ pass
+
+ def start(self):
+ pass
+
+ def stop(self, time = None):
+ pass
+
+ def shutdown(self):
+ pass
+
--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2014 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+import base64
+import cPickle
+import errno
+import socket
+from optparse import OptionParser, SUPPRESS_HELP
+
+from ns3wrapper import NS3Wrapper
+
+class NS3WrapperMessage:
+ CREATE = "CREATE"
+ INVOKE = "INVOKE"
+ SET = "SET"
+ GET = "GET"
+ TRACE = "TRACE"
+ START = "START"
+ STOP = "STOP"
+ SHUTDOWN = "SHUTDOWN"
+
+def handle_message(ns3_wrapper, msg, args):
+ if msg == NS3WrapperMessage.SHUTDOWN:
+ ns3_wrapper.shutdown()
+ return "BYEBYE"
+
+ if msg == NS3WrapperMessage.STOP:
+ time = None
+ if args:
+ time = args[0]
+
+ ns3_wrapper.stop(time=time)
+ return "STOPPED"
+
+ if msg == NS3WrapperMessage.START:
+ ns3_wrapper.start()
+ return "STARTED"
+
+ if msg == NS3WrapperMessage.CREATE:
+ clazzname = args.pop(0)
+
+ uuid = ns3_wrapper.create(clazzname, *args)
+ return uuid
+
+ if msg == NS3WrapperMessage.INVOKE:
+ uuid = args.pop(0)
+ operation = args.pop(0)
+
+ uuid = ns3_wrapper.invoke(uuid, operation, *args)
+ return uuid
+
+ if msg == NS3WrapperMessage.GET:
+ uuid = args.pop(0)
+ name = args.pop(0)
+
+ value = ns3_wrapper.get(uuid, name)
+ return value
+
+ if msg == NS3WrapperMessage.SET:
+ uuid = args.pop(0)
+ name = args.pop(0)
+ value = args.pop(0)
+
+ value = ns3_wrapper.set(uuid, name, value)
+ return value
+
+ if msg == NS3WrapperMessage.TRACE:
+ return "NOT IMPLEMENTED"
+
+def create_socket(socket_name):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.bind(socket_name)
+ return sock
+
+def recv_msg(conn):
+ msg = []
+ chunk = ''
+
+ while '\n' not in chunk:
+ try:
+ chunk = conn.recv(1024)
+ except (OSError, socket.error), e:
+ if e[0] != errno.EINTR:
+ raise
+ # Ignore eintr errors
+ continue
+
+ if chunk:
+ msg.append(chunk)
+ else:
+ # empty chunk = EOF
+ break
+
+ msg = ''.join(msg).split('\n')[0]
+
+ # The message might have arguments that will be appended
+ # as a '|' separated list after the message identifier
+ def decode(arg):
+ arg = base64.b64decode(arg).rstrip()
+ return cPickle.loads(arg)
+
+ dargs = map(decode, msg.split("|"))
+
+ # decoded message
+ dmsg = dargs.pop(0)
+
+ return (dmsg, dargs)
+
+def send_reply(conn, reply):
+ encoded = base64.b64encode(cPickle.dumps(reply))
+ conn.send("%s\n" % encoded)
+
+def get_options():
+ usage = ("usage: %prog -S <socket-name>")
+
+ parser = OptionParser(usage = usage)
+
+ parser.add_option("-S", "--socket-name", dest="socket_name",
+ help = "Name for the unix socket used to interact with this process",
+ default = "tap.sock", type="str")
+
+ (options, args) = parser.parse_args()
+
+ return options.socket_name
+
+def run_server(socket_name):
+ ns3_wrapper = NS3Wrapper()
+
+ # create unix socket to receive instructions
+ sock = create_socket(socket_name)
+ sock.listen(0)
+
+ # wait for messages to arrive and process them
+ stop = False
+
+ while not stop:
+ conn, addr = sock.accept()
+ conn.settimeout(5)
+
+ try:
+ (msg, args) = recv_msg(conn)
+ except socket.timeout, e:
+ # Ingore time-out
+ continue
+
+ if not msg:
+ # Ignore - connection lost
+ break
+
+ ns3_wrapper.logger.debug("Message received %s args %s" % ( msg, str(args)))
+
+ if msg == NS3WrapperMessage.SHUTDOWN:
+ stop = True
+
+ reply = handle_message(ns3_wrapper, msg, args)
+
+ try:
+ send_reply(conn, reply)
+ except socket.error:
+ break
+
+if __name__ == '__main__':
+
+ socket_name = get_options()
+
+ run_server(socket_name)
+
import os
import sys
import threading
+import time
import uuid
+# TODO:
+# 1. ns-3 classes should be identified as ns3::clazzname?
+#
+
+SINGLETON = "singleton::"
+
class NS3Wrapper(object):
def __init__(self, homedir = None):
super(NS3Wrapper, self).__init__()
self._condition = None
self._started = False
- self._stopped = False
# holds reference to all C++ objects and variables in the simulation
self._objects = dict()
# exposed through the Python bindings
self._tids = dict()
- # Generate unique identifier for the simulation wrapper
- self._uuid = self.make_uuid()
-
# create home dir (where all simulation related files will end up)
- self._homedir = homedir or os.path.join("/tmp", self.uuid)
+ self._homedir = homedir or os.path.join("/", "tmp", "ns3_wrapper" )
home = os.path.normpath(self.homedir)
if not os.path.exists(home):
# Logging
loglevel = os.environ.get("NS3LOGLEVEL", "debug")
- self._logger = logging.getLogger("ns3wrapper.%s" % self.uuid)
+ self._logger = logging.getLogger("ns3wrapper")
self._logger.setLevel(getattr(logging, loglevel.upper()))
hdlr = logging.FileHandler(os.path.join(self.homedir, "ns3wrapper.log"))
# Load ns-3 shared libraries and import modules
self._load_ns3_module()
- # Add module as anoter object, so we can reference it later
- self._objects[self.uuid] = self.ns3
-
@property
def ns3(self):
return self._ns3
def homedir(self):
return self._homedir
- @property
- def uuid(self):
- return self._uuid
-
@property
def logger(self):
return self._logger
+ @property
+ def is_running(self):
+ return self._started and self._ns3 and not self.ns3.Simulator.IsFinished()
+
def make_uuid(self):
return "uuid%s" % uuid.uuid4()
- def is_running(self):
- return self._started and not self._stopped
-
def get_object(self, uuid):
return self._objects.get(uuid)
def get_typeid(self, uuid):
return self._tids.get(uuid)
- def singleton(self, clazzname):
- uuid = "uuid%s"%clazzname
-
- if not uuid in self._objects:
- if not hasattr(self.ns3, clazzname):
- msg = "Type %s not supported" % (typeid)
- self.logger.error(msg)
-
- clazz = getattr(self.ns3, clazzname)
- self._objects[uuid] = clazz
-
- typeid = "ns3::%s" % clazzname
- self._tids[uuid] = typeid
-
- return uuid
-
def create(self, clazzname, *args):
if not hasattr(self.ns3, clazzname):
msg = "Type %s not supported" % (clazzname)
self.logger.error(msg)
-
- realargs = [self.get_object(arg) if \
- str(arg).startswith("uuid") else arg for arg in args]
-
+
clazz = getattr(self.ns3, clazzname)
+
+ # arguments starting with 'uuid' identify ns-3 C++
+ # objects and must be replaced by the actual object
+ realargs = self.replace_args(args)
+
obj = clazz(*realargs)
uuid = self.make_uuid()
return uuid
def invoke(self, uuid, operation, *args):
- obj = self.get_object(uuid)
-
+ if uuid.startswith(SINGLETON):
+ obj = self._singleton(uuid)
+ else:
+ obj = self.get_object(uuid)
+
method = getattr(obj, operation)
- # arguments starting with 'uuid' identifie stored
+ # arguments starting with 'uuid' identify ns-3 C++
# objects and must be replaced by the actual object
- realargs = [self.get_object(arg) if \
- str(arg).startswith("uuid") else arg for arg in args]
+ realargs = self.replace_args(args)
result = method(*realargs)
# to set the value by scheduling an event, else
# we risk to corrupt the state of the
# simulation.
- if self._is_running:
+ if self.is_running:
# schedule the event in the Simulator
self._schedule_event(self._condition, set_attr, obj,
name, ns3_value)
else:
set_attr(obj, name, ns3_value)
+ return value
+
def get(self, uuid, name):
obj = self.get_object(uuid)
ns3_value = self._create_ns3_value(uuid, name)
def get_attr(obj, name, ns3_value):
obj.GetAttribute(name, ns3_value)
- if self._is_running:
+ if self.is_running:
# schedule the event in the Simulator
self._schedule_event(self._condition, get_attr, obj,
name, ns3_value)
self.ns3.Simulator.Stop()
else:
self.ns3.Simulator.Stop(self.ns3.Time(time))
- self._stopped = True
def shutdown(self):
if self.ns3:
- if not self.ns3.Simulator.IsFinished():
- self.stop()
+ while not self.ns3.Simulator.IsFinished():
+ #self.logger.debug("Waiting for simulation to finish")
+ time.sleep(0.5)
# TODO!!!! SHOULD WAIT UNTIL THE THREAD FINISHES
if self._simulator_thread:
ns3_value.DeserializeFromString(str_value, checker)
return ns3_value
+ # singletons are identified as "ns3::ClassName"
+ def _singleton(self, ident):
+ if not ident.startswith(SINGLETON):
+ return None
+
+ clazzname = ident[ident.find("::")+2:]
+ if not hasattr(self.ns3, clazzname):
+ msg = "Type %s not supported" % (clazzname)
+ self.logger.error(msg)
+
+ return getattr(self.ns3, clazzname)
+
+ # replace uuids and singleton references for the real objects
+ def replace_args(self, args):
+ realargs = [self.get_object(arg) if \
+ str(arg).startswith("uuid") else arg for arg in args]
+
+ realargs = [self._singleton(arg) if \
+ str(arg).startswith(SINGLETON) else arg for arg in realargs]
+
+ return realargs
+
def _load_ns3_module(self):
if self.ns3:
return
+++ /dev/null
-#
-# NEPI, a framework to manage network experiments
-# Copyright (C) 2013 INRIA
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
-
-
-
-class Server(object):
- def __init__(self, root_dir = ".", log_level = "ERROR",
- environment_setup = "", clean_root = False):
- self._root_dir = root_dir
- self._clean_root = clean_root
- self._stop = False
- self._ctrl_sock = None
- self._log_level = log_level
- self._rdbuf = ""
- self._environment_setup = environment_setup
-
- def run(self):
- try:
- if self.daemonize():
- self.post_daemonize()
- self.loop()
- self.cleanup()
- # ref: "os._exit(0)"
- # can not return normally after fork beacuse no exec was done.
- # This means that if we don't do a os._exit(0) here the code that
- # follows the call to "Server.run()" in the "caller code" will be
- # executed... but by now it has already been executed after the
- # first process (the one that did the first fork) returned.
- os._exit(0)
- except:
- print >>sys.stderr, "SERVER_ERROR."
- self.log_error()
- self.cleanup()
- os._exit(0)
- print >>sys.stderr, "SERVER_READY."
-
- def daemonize(self):
- # pipes for process synchronization
- (r, w) = os.pipe()
-
- # build root folder
- root = os.path.normpath(self._root_dir)
- if self._root_dir not in [".", ""] and os.path.exists(root) \
- and self._clean_root:
- shutil.rmtree(root)
- if not os.path.exists(root):
- os.makedirs(root, 0755)
-
- pid1 = os.fork()
- if pid1 > 0:
- os.close(w)
- while True:
- try:
- os.read(r, 1)
- except OSError, e: # pragma: no cover
- if e.errno == errno.EINTR:
- continue
- else:
- raise
- break
- os.close(r)
- # os.waitpid avoids leaving a <defunc> (zombie) process
- st = os.waitpid(pid1, 0)[1]
- if st:
- raise RuntimeError("Daemonization failed")
- # return 0 to inform the caller method that this is not the
- # daemonized process
- return 0
- os.close(r)
-
- # Decouple from parent environment.
- os.chdir(self._root_dir)
- os.umask(0)
- os.setsid()
-
- # fork 2
- pid2 = os.fork()
- if pid2 > 0:
- # see ref: "os._exit(0)"
- os._exit(0)
-
- # close all open file descriptors.
- max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
- if (max_fd == resource.RLIM_INFINITY):
- max_fd = MAX_FD
- for fd in range(3, max_fd):
- if fd != w:
- try:
- os.close(fd)
- except OSError:
- pass
-
- # Redirect standard file descriptors.
- stdin = open(DEV_NULL, "r")
- stderr = stdout = open(STD_ERR, "a", 0)
- os.dup2(stdin.fileno(), sys.stdin.fileno())
- # NOTE: sys.stdout.write will still be buffered, even if the file
- # was opened with 0 buffer
- os.dup2(stdout.fileno(), sys.stdout.fileno())
- os.dup2(stderr.fileno(), sys.stderr.fileno())
-
- # setup environment
- if self._environment_setup:
- # parse environment variables and pass to child process
- # do it by executing shell commands, in case there's some heavy setup involved
- envproc = subprocess.Popen(
- [ "bash", "-c",
- "( %s python -c 'import os,sys ; print \"\\x01\".join(\"\\x02\".join(map(str,x)) for x in os.environ.iteritems())' ) | tail -1" %
- ( self._environment_setup, ) ],
- stdin = subprocess.PIPE,
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE
- )
- out,err = envproc.communicate()
-
- # parse new environment
- if out:
- environment = dict(map(lambda x:x.split("\x02"), out.split("\x01")))
-
- # apply to current environment
- for name, value in environment.iteritems():
- os.environ[name] = value
-
- # apply pythonpath
- if 'PYTHONPATH' in environment:
- sys.path = environment['PYTHONPATH'].split(':') + sys.path
-
- # create control socket
- self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- try:
- self._ctrl_sock.bind(CTRL_SOCK)
- except socket.error:
- # Address in use, check pidfile
- pid = None
- try:
- pidfile = open(CTRL_PID, "r")
- pid = pidfile.read()
- pidfile.close()
- pid = int(pid)
- except:
- # no pidfile
- pass
-
- if pid is not None:
- # Check process liveliness
- if not os.path.exists("/proc/%d" % (pid,)):
- # Ok, it's dead, clean the socket
- os.remove(CTRL_SOCK)
-
- # try again
- self._ctrl_sock.bind(CTRL_SOCK)
-
- self._ctrl_sock.listen(0)
-
- # Save pidfile
- pidfile = open(CTRL_PID, "w")
- pidfile.write(str(os.getpid()))
- pidfile.close()
-
- # let the parent process know that the daemonization is finished
- os.write(w, "\n")
- os.close(w)
- return 1
-
- def post_daemonize(self):
- os.environ["NEPI_CONTROLLER_LOGLEVEL"] = self._log_level
- # QT, for some strange reason, redefines the SIGCHILD handler to write
- # a \0 to a fd (lets say fileno 'x'), when ever a SIGCHILD is received.
- # Server dameonization closes all file descriptors from fileno '3',
- # but the overloaded handler (inherited by the forked process) will
- # keep trying to write the \0 to fileno 'x', which might have been reused
- # after closing, for other operations. This is bad bad bad when fileno 'x'
- # is in use for communication pouroses, because unexpected \0 start
- # appearing in the communication messages... this is exactly what happens
- # when using netns in daemonized form. Thus, be have no other alternative than
- # restoring the SIGCHLD handler to the default here.
- import signal
- signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-
- def loop(self):
- while not self._stop:
- conn, addr = self._ctrl_sock.accept()
- self.log_error("ACCEPTED CONNECTION: %s" % (addr,))
- conn.settimeout(5)
- while not self._stop:
- try:
- msg = self.recv_msg(conn)
- except socket.timeout, e:
- #self.log_error("SERVER recv_msg: connection timedout ")
- continue
-
- if not msg:
- self.log_error("CONNECTION LOST")
- break
-
- if msg == STOP_MSG:
- self._stop = True
- reply = self.stop_action()
- else:
- reply = self.reply_action(msg)
-
- try:
- self.send_reply(conn, reply)
- except socket.error:
- self.log_error()
- self.log_error("NOTICE: Awaiting for reconnection")
- break
- try:
- conn.close()
- except:
- # Doesn't matter
- self.log_error()
-
- def recv_msg(self, conn):
- data = [self._rdbuf]
- chunk = data[0]
- while '\n' not in chunk:
- try:
- chunk = conn.recv(1024)
- except (OSError, socket.error), e:
- if e[0] != errno.EINTR:
- raise
- else:
- continue
- if chunk:
- data.append(chunk)
- else:
- # empty chunk = EOF
- break
- data = ''.join(data).split('\n',1)
- while len(data) < 2:
- data.append('')
- data, self._rdbuf = data
-
- decoded = base64.b64decode(data)
- return decoded.rstrip()
-
- def send_reply(self, conn, reply):
- encoded = base64.b64encode(reply)
- conn.send("%s\n" % encoded)
-
- def cleanup(self):
- try:
- self._ctrl_sock.close()
- os.remove(CTRL_SOCK)
- except:
- self.log_error()
-
- def stop_action(self):
- return "Stopping server"
-
- def reply_action(self, msg):
- return "Reply to: %s" % msg
-
- def log_error(self, text = None, context = ''):
- if text == None:
- text = traceback.format_exc()
- date = time.strftime("%Y-%m-%d %H:%M:%S")
- if context:
- context = " (%s)" % (context,)
- sys.stderr.write("ERROR%s: %s\n%s\n" % (context, date, text))
- return text
-
- def log_debug(self, text):
- if self._log_level == DC.DEBUG_LEVEL:
- date = time.strftime("%Y-%m-%d %H:%M:%S")
- sys.stderr.write("DEBUG: %s\n%s\n" % (date, text))
-
-class Forwarder(object):
- def __init__(self, root_dir = "."):
- self._ctrl_sock = None
- self._root_dir = root_dir
- self._stop = False
- self._rdbuf = ""
-
- def forward(self):
- self.connect()
- print >>sys.stderr, "FORWARDER_READY."
- while not self._stop:
- data = self.read_data()
- if not data:
- # Connection to client lost
- break
- self.send_to_server(data)
-
- data = self.recv_from_server()
- if not data:
- # Connection to server lost
- raise IOError, "Connection to server lost while "\
- "expecting response"
- self.write_data(data)
- self.disconnect()
-
- def read_data(self):
- return sys.stdin.readline()
-
- def write_data(self, data):
- sys.stdout.write(data)
- # sys.stdout.write is buffered, this is why we need to do a flush()
- sys.stdout.flush()
-
- def send_to_server(self, data):
- try:
- self._ctrl_sock.send(data)
- except (IOError, socket.error), e:
- if e[0] == errno.EPIPE:
- self.connect()
- self._ctrl_sock.send(data)
- else:
- raise e
- encoded = data.rstrip()
- msg = base64.b64decode(encoded)
- if msg == STOP_MSG:
- self._stop = True
-
- def recv_from_server(self):
- data = [self._rdbuf]
- chunk = data[0]
- while '\n' not in chunk:
- try:
- chunk = self._ctrl_sock.recv(1024)
- except (OSError, socket.error), e:
- if e[0] != errno.EINTR:
- raise
- continue
- if chunk:
- data.append(chunk)
- else:
- # empty chunk = EOF
- break
- data = ''.join(data).split('\n',1)
- while len(data) < 2:
- data.append('')
- data, self._rdbuf = data
-
- return data+'\n'
-
- def connect(self):
- self.disconnect()
- self._ctrl_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock_addr = os.path.join(self._root_dir, CTRL_SOCK)
- self._ctrl_sock.connect(sock_addr)
-
- def disconnect(self):
- try:
- self._ctrl_sock.close()
- except:
- pass
-
--- /dev/null
+#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+
+# Test based on ns-3 csma/examples/csma-ping.cc file
+#
+# Network topology
+#
+# n0 n1 n2 n3
+# | | | |
+# -----------------
+#
+# node n0 sends IGMP traffic to node n3
+
+
+from nepi.resources.ns3.ns3server import run_server
+from nepi.resources.ns3.linuxns3client import LinuxNS3Client
+
+import os
+import threading
+import time
+import unittest
+
+class LinuxNS3ClientTest(unittest.TestCase):
+ def setUp(self):
+ self.socket_name = os.path.join("/", "tmp", "NS3WrapperServer.sock")
+
+ def tearDown(self):
+ os.remove(self.socket_name)
+
+ def test_runtime_attr_modify(self):
+ thread = threading.Thread(target = run_server,
+ args = [self.socket_name])
+
+ thread.setDaemon(True)
+ thread.start()
+
+ time.sleep(3)
+
+ # Verify that the communication socket was created
+ self.assertTrue(os.path.exists(self.socket_name))
+
+ # Instantiate the NS3 client
+ client = LinuxNS3Client(self.socket_name)
+
+ # Define a real time simulation
+ stype = client.create("StringValue", "ns3::RealtimeSimulatorImpl")
+ client.invoke("singleton::GlobalValue", "Bind", "SimulatorImplementationType", stype)
+ btrue = client.create("BooleanValue", True)
+ client.invoke("singleton::GlobalValue", "Bind", "ChecksumEnabled", btrue)
+
+ # Create Node
+ n1 = client.create("Node")
+ self.assertTrue(n1.startswith("uuid"))
+
+ ## Install internet stack
+ ipv41 = client.create("Ipv4L3Protocol")
+ client.invoke(n1, "AggregateObject", ipv41)
+
+ arp1 = client.create("ArpL3Protocol")
+ client.invoke(n1, "AggregateObject", arp1)
+
+ icmp1 = client.create("Icmpv4L4Protocol")
+ client.invoke(n1, "AggregateObject", icmp1)
+
+ ## Add IPv4 routing
+ lr1 = client.create("Ipv4ListRouting")
+ client.invoke(ipv41, "SetRoutingProtocol", lr1)
+ sr1 = client.create("Ipv4StaticRouting")
+ client.invoke(lr1, "AddRoutingProtocol", sr1, 1)
+
+ ## NODE 2
+ n2 = client.create("Node")
+
+ ## Install internet stack
+ ipv42 = client.create("Ipv4L3Protocol")
+ client.invoke(n2, "AggregateObject", ipv42)
+
+ arp2 = client.create("ArpL3Protocol")
+ client.invoke(n2, "AggregateObject", arp2)
+
+ icmp2 = client.create("Icmpv4L4Protocol")
+ client.invoke(n2, "AggregateObject", icmp2)
+
+ ## Add IPv4 routing
+ lr2 = client.create("Ipv4ListRouting")
+ client.invoke(ipv42, "SetRoutingProtocol", lr2)
+ sr2 = client.create("Ipv4StaticRouting")
+ client.invoke(lr2, "AddRoutingProtocol", sr2, 1)
+
+ ##### Create p2p device and enable ascii tracing
+ p2pHelper = client.create("PointToPointHelper")
+ asciiHelper = client.create("AsciiTraceHelper")
+
+ # Iface for node1
+ p1 = client.create("PointToPointNetDevice")
+ client.invoke(n1, "AddDevice", p1)
+ q1 = client.create("DropTailQueue")
+ client.invoke(p1, "SetQueue", q1)
+
+ # Add IPv4 address
+ ifindex1 = client.invoke(ipv41, "AddInterface", p1)
+ mask1 = client.create("Ipv4Mask", "/30")
+ addr1 = client.create("Ipv4Address", "10.0.0.1")
+ inaddr1 = client.create("Ipv4InterfaceAddress", addr1, mask1)
+ client.invoke(ipv41, "AddAddress", ifindex1, inaddr1)
+ client.invoke(ipv41, "SetMetric", ifindex1, 1)
+ client.invoke(ipv41, "SetUp", ifindex1)
+
+ # Enable collection of Ascii format to a specific file
+ filepath1 = "trace-p2p-1.tr"
+ stream1 = client.invoke(asciiHelper, "CreateFileStream", filepath1)
+ client.invoke(p2pHelper, "EnableAscii", stream1, p1)
+
+ # Iface for node2
+ p2 = client.create("PointToPointNetDevice")
+ client.invoke(n2, "AddDevice", p2)
+ q2 = client.create("DropTailQueue")
+ client.invoke(p2, "SetQueue", q2)
+
+ # Add IPv4 address
+ ifindex2 = client.invoke(ipv42, "AddInterface", p2)
+ mask2 = client.create("Ipv4Mask", "/30")
+ addr2 = client.create("Ipv4Address", "10.0.0.2")
+ inaddr2 = client.create("Ipv4InterfaceAddress", addr2, mask2)
+ client.invoke(ipv42, "AddAddress", ifindex2, inaddr2)
+ client.invoke(ipv42, "SetMetric", ifindex2, 1)
+ client.invoke(ipv42, "SetUp", ifindex2)
+
+ # Enable collection of Ascii format to a specific file
+ filepath2 = "trace-p2p-2.tr"
+ stream2 = client.invoke(asciiHelper, "CreateFileStream", filepath2)
+ client.invoke(p2pHelper, "EnableAscii", stream2, p2)
+
+ # Create channel
+ chan = client.create("PointToPointChannel")
+ client.set(chan, "Delay", "0s")
+ client.invoke(p1, "Attach", chan)
+ client.invoke(p2, "Attach", chan)
+
+ ### create pinger
+ ping = client.create("V4Ping")
+ client.invoke(n1, "AddApplication", ping)
+ client.set (ping, "Remote", "10.0.0.2")
+ client.set (ping, "Interval", "1s")
+ client.set (ping, "Verbose", True)
+ client.set (ping, "StartTime", "0s")
+ client.set (ping, "StopTime", "20s")
+
+ ### run Simulation
+ client.stop(time = "21s")
+ client.start()
+
+ time.sleep(1)
+
+ client.set(chan, "Delay", "5s")
+
+ time.sleep(5)
+
+ client.set(chan, "Delay", "0s")
+
+ # wait until simulation is over
+ client.shutdown()
+
+
+if __name__ == '__main__':
+ unittest.main()
+
from nepi.resources.ns3.ns3wrapper import NS3Wrapper
-import os.path
+import subprocess
import time
-import tempfile
import unittest
class NS3WrapperTest(unittest.TestCase):
addresses = wrapper.invoke(ip, "Assign", devs)
### Create source
- config = wrapper.singleton("Config")
-
# Config::SetDefault ("ns3::Ipv4RawSocketImpl::Protocol", StringValue ("2"));
proto = wrapper.create("StringValue", "2")
- wrapper.invoke(config, "SetDefault", "ns3::Ipv4RawSocketImpl::Protocol", proto)
+ wrapper.invoke("singleton::Config", "SetDefault",
+ "ns3::Ipv4RawSocketImpl::Protocol", proto)
# InetSocketAddress dst = InetSocketAddress (addresses.GetAddress (3));
addr3 = wrapper.invoke(addresses, "GetAddress", 3)
### configure tracing
#csma.EnablePcapAll ("csma-ping", false);
- wrapper.invoke(csma, "EnablePcapAll", "csma-ping-pcap", False)
+ wrapper.invoke(csma, "EnablePcapAll", "/tmp/csma-ping-pcap", False)
#csma.EnableAsciiAll ("csma-ping", false);
- wrapper.invoke(csma, "EnableAsciiAll", "csma-ping-ascii")
+ wrapper.invoke(csma, "EnableAsciiAll", "/tmp/csma-ping-ascii")
def SinkRx(packet, address):
print packet
#Config::ConnectWithoutContext ("/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx",
# MakeCallback (&SinkRx));
#cb = wrapper.create("MakeCallback", SinkRx)
- #wrapper.invoke(config, "ConnectWithoutContext",
+ #wrapper.invoke("singleton::Config", "ConnectWithoutContext",
# "/NodeList/3/ApplicationList/0/$ns3::PacketSink/Rx", cb)
# Config::Connect ("/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt",
# MakeCallback (&PingRtt));
#cb2 = wrapper.create("MakeCallback", PingRtt)
- #wrapper.invoke(config, "ConnectWithoutContext",
+ #wrapper.invoke("singleton::Config", "ConnectWithoutContext",
# "/NodeList/*/ApplicationList/*/$ns3::V4Ping/Rtt",
# cb2)
# Packet::EnablePrinting ();
- packet = wrapper.singleton("Packet")
- wrapper.invoke(packet, "EnablePrinting")
+ wrapper.invoke("singleton::Packet", "EnablePrinting")
### run Simulation
# Simulator::Run ();
- simulator = wrapper.singleton("Simulator")
- wrapper.invoke(simulator, "Run")
+ wrapper.invoke("singleton::Simulator", "Run")
# Simulator::Destroy ();
- wrapper.invoke(simulator, "Destroy")
+ wrapper.invoke("singleton::Simulator", "Destroy")
+
+ p = subprocess.Popen("ls /tmp/csma-ping-* | wc -w", stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE, shell = True)
+ (out, err) = p.communicate()
+
+ self.assertEquals(int(out), 8)
+
+ p = subprocess.Popen("rm /tmp/csma-ping-*", shell = True)
+ p.communicate()
+
+ def test_start(self):
+ wrapper = NS3Wrapper()
+
+ ### create 2 nodes
+ c = wrapper.create("NodeContainer")
+
+ # c.Create (2);
+ wrapper.invoke(c, "Create", 2)
+
+ ### connect the nodes to a shared channel
+ # CsmaHelper csma;
+ csma = wrapper.create("CsmaHelper")
+
+ # csma.SetChannelAttribute ("DataRate", DataRateValue (DataRate (5000000)));
+ dr = wrapper.create("DataRate", 5000000)
+ drv = wrapper.create("DataRateValue", dr)
+ wrapper.invoke(csma, "SetChannelAttribute", "DataRate", drv)
+
+ # csma.SetChannelAttribute ("Delay", TimeValue (MilliSeconds (2)));
+ ms = wrapper.create("MilliSeconds", 2)
+ delay = wrapper.create("TimeValue", ms)
+ wrapper.invoke(csma, "SetChannelAttribute", "Delay", delay)
+
+ # NetDeviceContainer devs = csma.Install (c);
+ devs = wrapper.invoke(csma, "Install", c)
+
+ ### add IP stack to all nodes
+ # InternetStackHelper ipStack;
+ ipStack = wrapper.create("InternetStackHelper")
+
+ # ipStack.Install (c);
+ wrapper.invoke(ipStack, "Install", c)
+
+ ### assign ip addresses
+ #Ipv4AddressHelper ip;
+ ip = wrapper.create("Ipv4AddressHelper")
+
+ # ip.SetBase ("192.168.1.0", "255.255.255.0");
+ ip4 = wrapper.create("Ipv4Address", "192.168.1.0")
+ mask4 = wrapper.create("Ipv4Mask", "255.255.255.0")
+ wrapper.invoke(ip, "SetBase", ip4, mask4)
+
+ # Ipv4InterfaceContainer addresses = ip.Assign (devs);
+ addresses = wrapper.invoke(ip, "Assign", devs)
+
+ ### create pinger
+ #V4PingHelper ping = V4PingHelper (addresses.GetAddress (1));
+ addr1 = wrapper.invoke(addresses, "GetAddress", 1)
+ ping = wrapper.create("V4PingHelper", addr1)
+ btrue = wrapper.create("BooleanValue", True)
+ wrapper.invoke(ping, "SetAttribute", "Verbose", btrue)
+
+ #apps = ping.Install (pingers);
+ n0 = wrapper.invoke(c, "Get", 0)
+ apps = wrapper.invoke(ping, "Install", n0)
+
+ #apps.Start (Seconds (0.0));
+ s = wrapper.create ("Seconds", 0.0)
+ wrapper.invoke (apps, "Start", s)
+
+ #apps.Stop (Seconds (5.0));
+ s = wrapper.create ("Seconds", 5.0)
+ wrapper.invoke (apps, "Stop", s)
+
+ ### run Simulation
+ # Simulator::Stop (6.0);
+ wrapper.stop(time = "6s")
+
+ # Simulator::Run ();
+ wrapper.start()
+
+ # wait until simulation is over
+ wrapper.shutdown()
+
+ def test_runtime_attr_modify(self):
+ wrapper = NS3Wrapper()
+
+ # Define a real time simulation
+ stype = wrapper.create("StringValue", "ns3::RealtimeSimulatorImpl")
+ wrapper.invoke("singleton::GlobalValue", "Bind", "SimulatorImplementationType", stype)
+ btrue = wrapper.create("BooleanValue", True)
+ wrapper.invoke("singleton::GlobalValue", "Bind", "ChecksumEnabled", btrue)
+
+ ### create 2 nodes
+ ## NODE 1
+ n1 = wrapper.create("Node")
+
+ ## Install internet stack
+ ipv41 = wrapper.create("Ipv4L3Protocol")
+ wrapper.invoke(n1, "AggregateObject", ipv41)
+
+ arp1 = wrapper.create("ArpL3Protocol")
+ wrapper.invoke(n1, "AggregateObject", arp1)
+
+ icmp1 = wrapper.create("Icmpv4L4Protocol")
+ wrapper.invoke(n1, "AggregateObject", icmp1)
+
+ ## Add IPv4 routing
+ lr1 = wrapper.create("Ipv4ListRouting")
+ wrapper.invoke(ipv41, "SetRoutingProtocol", lr1)
+ sr1 = wrapper.create("Ipv4StaticRouting")
+ wrapper.invoke(lr1, "AddRoutingProtocol", sr1, 1)
+
+ ## NODE 2
+ n2 = wrapper.create("Node")
+
+ ## Install internet stack
+ ipv42 = wrapper.create("Ipv4L3Protocol")
+ wrapper.invoke(n2, "AggregateObject", ipv42)
+
+ arp2 = wrapper.create("ArpL3Protocol")
+ wrapper.invoke(n2, "AggregateObject", arp2)
+
+ icmp2 = wrapper.create("Icmpv4L4Protocol")
+ wrapper.invoke(n2, "AggregateObject", icmp2)
+
+ ## Add IPv4 routing
+ lr2 = wrapper.create("Ipv4ListRouting")
+ wrapper.invoke(ipv42, "SetRoutingProtocol", lr2)
+ sr2 = wrapper.create("Ipv4StaticRouting")
+ wrapper.invoke(lr2, "AddRoutingProtocol", sr2, 1)
+
+ ##### Create p2p device and enable ascii tracing
+
+ p2pHelper = wrapper.create("PointToPointHelper")
+ asciiHelper = wrapper.create("AsciiTraceHelper")
+
+ # Iface for node1
+ p1 = wrapper.create("PointToPointNetDevice")
+ wrapper.invoke(n1, "AddDevice", p1)
+ q1 = wrapper.create("DropTailQueue")
+ wrapper.invoke(p1, "SetQueue", q1)
+
+ # Add IPv4 address
+ ifindex1 = wrapper.invoke(ipv41, "AddInterface", p1)
+ mask1 = wrapper.create("Ipv4Mask", "/30")
+ addr1 = wrapper.create("Ipv4Address", "10.0.0.1")
+ inaddr1 = wrapper.create("Ipv4InterfaceAddress", addr1, mask1)
+ wrapper.invoke(ipv41, "AddAddress", ifindex1, inaddr1)
+ wrapper.invoke(ipv41, "SetMetric", ifindex1, 1)
+ wrapper.invoke(ipv41, "SetUp", ifindex1)
+
+ # Enable collection of Ascii format to a specific file
+ filepath1 = "trace-p2p-1.tr"
+ stream1 = wrapper.invoke(asciiHelper, "CreateFileStream", filepath1)
+ wrapper.invoke(p2pHelper, "EnableAscii", stream1, p1)
+
+ # Iface for node2
+ p2 = wrapper.create("PointToPointNetDevice")
+ wrapper.invoke(n2, "AddDevice", p2)
+ q2 = wrapper.create("DropTailQueue")
+ wrapper.invoke(p2, "SetQueue", q2)
+
+ # Add IPv4 address
+ ifindex2 = wrapper.invoke(ipv42, "AddInterface", p2)
+ mask2 = wrapper.create("Ipv4Mask", "/30")
+ addr2 = wrapper.create("Ipv4Address", "10.0.0.2")
+ inaddr2 = wrapper.create("Ipv4InterfaceAddress", addr2, mask2)
+ wrapper.invoke(ipv42, "AddAddress", ifindex2, inaddr2)
+ wrapper.invoke(ipv42, "SetMetric", ifindex2, 1)
+ wrapper.invoke(ipv42, "SetUp", ifindex2)
+
+ # Enable collection of Ascii format to a specific file
+ filepath2 = "trace-p2p-2.tr"
+ stream2 = wrapper.invoke(asciiHelper, "CreateFileStream", filepath2)
+ wrapper.invoke(p2pHelper, "EnableAscii", stream2, p2)
+
+ # Create channel
+ chan = wrapper.create("PointToPointChannel")
+ wrapper.set(chan, "Delay", "0s")
+ wrapper.invoke(p1, "Attach", chan)
+ wrapper.invoke(p2, "Attach", chan)
+
+ ### create pinger
+ ping = wrapper.create("V4Ping")
+ wrapper.invoke(n1, "AddApplication", ping)
+ wrapper.set (ping, "Remote", "10.0.0.2")
+ wrapper.set (ping, "Interval", "1s")
+ wrapper.set (ping, "Verbose", True)
+ wrapper.set (ping, "StartTime", "0s")
+ wrapper.set (ping, "StopTime", "20s")
+
+ ### run Simulation
+ wrapper.stop(time = "21s")
+ wrapper.start()
+
+ time.sleep(1)
+
+ wrapper.set(chan, "Delay", "5s")
+
+ time.sleep(5)
+
+ wrapper.set(chan, "Delay", "0s")
+
+ # wait until simulation is over
+ wrapper.shutdown()
if __name__ == '__main__':
unittest.main()