attr = self._attrs[name]
return attr.value
+ def has_changed(self, name):
+ """ Returns the True is the value of the attribute
+ has been modified by the user.
+
+ :param name: Name of the attribute
+ :type name: str
+ :rtype: str
+ """
+ attr = self._attrs[name]
+ return attr.has_changed()
+
def enable_trace(self, name):
""" Explicitly enable trace generation
"""
return self.get("forwardX11") or self._in_foreground
+ def trace_filepath(self, filename):
+ return os.path.join(self.run_home, filename)
+
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
self.info("Retrieving '%s' trace %s " % (name, attr))
- path = os.path.join(self.run_home, name)
+ path = self.trace_filepath(name)
command = "(test -f %s && echo 'success') || echo 'error'" % path
(out, err), proc = self.node.execute(command)
self.simulation.local_socket,
self.simulation.remote_socket)
- def send_msg(self, msg, *args, **kwargs):
- args = list(args)
-
+ def send_msg(self, msg_type, *args, **kwargs):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self.simulation.local_socket)
- timeout = kwargs.get("timeout")
- if timeout:
- sock.settimeout(timeout)
+ msg = [msg_type, args, kwargs]
- args.insert(0, msg)
- def encode(arg):
- arg = cPickle.dumps(arg)
- return base64.b64encode(arg)
+ def encode(item):
+ item = cPickle.dumps(item)
+ return base64.b64encode(item)
- encoded = "|".join(map(encode, args))
+ encoded = "|".join(map(encode, msg))
sock.send("%s\n" % encoded)
reply = sock.recv(1024)
return cPickle.loads(base64.b64decode(reply))
- def create(self, clazzname, *args, **kwargs):
- args = list(args)
- args.insert(0, clazzname)
-
+ def create(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.CREATE, *args, **kwargs)
- def factory(self, type_name, **kwargs):
- args = [type_name]
- args.append(kwargs)
-
+ def factory(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.FACTORY, *args, **kwargs)
- def invoke(self, uuid, operation, *args, **kwargs):
- args = list(args)
- args.insert(0, operation)
- args.insert(0, uuid)
-
+ def invoke(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.INVOKE, *args, **kwargs)
- def set(self, uuid, name, value, **kwargs):
- args = [uuid, name, value]
-
+ def set(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.SET, *args, **kwargs)
- def get(self, uuid, name, **kwargs):
- args = [uuid, name]
-
+ def get(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.GET, *args, **kwargs)
- def enable_trace(self, *args, **kwargs):
- return self.send_msg(NS3WrapperMessage.ENABLE_TRACE, *args, **kwargs)
-
- def flush(self, **kwargs):
- args = []
+ def flush(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.FLUSH, *args, **kwargs)
- def start(self, **kwargs):
- args = []
+ def start(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.START, *args, **kwargs)
- def stop(self, **kwargs):
- args = []
- time = kwargs.get("time")
- if time: args.append(time)
-
+ def stop(self, *args, **kwargs):
return self.send_msg(NS3WrapperMessage.STOP, *args, **kwargs)
- def shutdown(self, **kwargs):
- args = []
+ def shutdown(self, *args, **kwargs):
ret = None
try:
self._run_in_background()
def configure(self):
- if self._attrs.get("simulatorImplementationType").has_changed():
+ if self.has_changed("simulatorImplementationType"):
simu_type = self.get("simulatorImplementationType")
stype = self.create("StringValue", simu_type)
self.invoke(GLOBAL_VALUE_UUID, "Bind", "SimulatorImplementationType", stype)
- if self._attrs.get("checksumEnabled").has_changed():
+ if self.has_changed("checksumEnabled"):
check_sum = self.get("checksumEnabled")
btrue = self.create("BooleanValue", check_sum)
self.invoke(GLOBAL_VALUE_UUID, "Bind", "ChecksumEnabled", btrue)
- if self._attrs.get("schedulerType").has_changed():
+ if self.has_changed("schedulerType"):
sched_type = self.get("schedulerType")
stype = self.create("StringValue", sched_type)
self.invoke(GLOBAL_VALUE_UUID, "Bind", "SchedulerType", btrue)
-
+
def do_deploy(self):
if not self.node or self.node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
ResourceState, reschedule_delay
from nepi.execution.attribute import Flags
+from nepi.execution.trace import TraceAttr
reschedule_delay = "2s"
super(NS3Base, self).__init__(ec, guid)
self._uuid = None
self._connected = set()
+ self._trace_filename = dict()
@property
def connected(self):
if nodes: return nodes[0]
return None
+ def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ filename = self._trace_filename.get(name)
+ if not filename:
+ self.error("Can resolve trace %s. Did you enabled it?" % name)
+ return ""
+
+ return self.simulation.trace(filename, attr, block, offset)
+
@property
def _rms_to_wait(self):
""" Returns the collection of ns-3 RMs that this RM needs to
self.debug("---- RESCHEDULING DEPLOY ----" )
self.ec.schedule(reschedule_delay, self.deploy)
else:
- self.info("Entering deploy")
self.do_discover()
self.do_provision()
def get(self, *args, **kwargs):
pass
- def trace(self, *args, **kwargs):
- pass
-
def flush(self, *args, **kwargs):
pass
from nepi.execution.attribute import Attribute, Flags
from nepi.execution.resource import clsinit_copy
+from nepi.execution.trace import Trace
from nepi.resources.ns3.ns3base import NS3Base
import ipaddr
cls._register_attribute(ip)
cls._register_attribute(prefix)
+ @classmethod
+ def _register_traces(cls):
+ pcap = Trace("pcap", "Dump traffic sniffed on the network device in Pcap format")
+ promisc_pcap = Trace("promiscPcap", "Dump traffic sniffed in promiscuous mode on the network device in Pcap format")
+ ascii = Trace("ascii", "Dump traffic sniffed on the network device in Ascii format")
+
+ cls._register_trace(pcap)
+ cls._register_trace(promisc_pcap)
+ cls._register_trace(ascii)
+
+ def __init__(self, ec, guid):
+ super(NS3BaseNetDevice, self).__init__(ec, guid)
+ self._ascii_helper_uuid = None
+ self._device_helper_uuid = None
+
@property
def node(self):
from nepi.resources.ns3.ns3node import NS3BaseNode
return channels[0]
+ @property
+ def ascii_helper_uuid(self):
+ if not self._ascii_helper_uuid:
+ self._ascii_helper_uuid = self.simulation.create("AsciiTraceHelper")
+ return self._ascii_helper_uuid
+
+ @property
+ def device_helper_uuid(self):
+ if not self._device_helper_uuid:
+ rtype = self.get_rtype()
+ if rtype == "ns3::PointToPointNetDevice":
+ classname = "PointToPointHelper"
+ elif rtype == "ns3::CsmaNetDevice":
+ classname = "CsmaHelper"
+ elif rtype == "ns3::EmuNetDevice":
+ classname = "EmuHelper"
+ elif rtype == "ns3::FdNetDevice":
+ classname = "FdNetDeviceHelper"
+ elif rtype in [ "ns3::BaseStationNetDevice", "SubscriberStationNetDevice" ]:
+ classname = "WimaxHelper"
+ elif rtype == "ns3::WifiNetDevice":
+ classname = "YansWifiPhyHelper"
+
+ self._device_helper_uuid = self.simulation.create(classname)
+
+ return self._device_helper_uuid
+
@property
def _rms_to_wait(self):
others = set()
# IPv6
# TODO!
pass
+
+ # Enable traces
+ self._configure_traces()
+
+ def _configure_traces(self):
+ if self.trace_enabled("pcap"):
+ helper_uuid = self.device_helper_uuid
+
+ filename = "trace-pcap-netdev-%d.pcap" % self.guid
+ self._trace_filename["pcap"] = filename
+
+ filepath = self.simulation.trace_filepath(filename)
+
+ self.simulation.invoke(helper_uuid, "EnablePcap", filepath,
+ self.uuid, promiscuous = False, explicitFilename = True)
+
+ if self.trace_enabled("promiscPcap"):
+ helper_uuid = self.device_helper_uuid
+
+ filename = "trace-promisc-pcap-netdev-%d.pcap" % self.guid
+ self._trace_filename["promiscPcap"] = filename
+
+ filepath = self.simulation.trace_filepath(filename)
+
+ self.simulation.invoke(helper_uuid, "EnablePcap", filepath,
+ self.uuid, promiscuous = True, explicitFilename = True)
+
+ if self.trace_enabled("ascii"):
+ helper_uuid = self.device_helper_uuid
+ ascii_helper_uuid = self.ascii_helper_uuid
+
+ filename = "trace-ascii-netdev-%d.tr" % self.guid
+ self._trace_filename["ascii"] = filename
+
+ filepath = self.simulation.trace_filepath(filename)
+ stream_uuid = self.simulation.invoke(ascii_helper_uuid,
+ "CreateFileStream", filepath)
+ self.simulation.invoke(helper_uuid, "EnableAscii", stream_uuid,
+ self.uuid)
def _connect_object(self):
node = self.node
INVOKE = "INVOKE"
SET = "SET"
GET = "GET"
- ENABLE_TRACE = "ENABLE_TRACE"
FLUSH = "FLUSH"
START = "START"
STOP = "STOP"
SHUTDOWN = "SHUTDOWN"
-def handle_message(ns3_wrapper, msg, args):
- if msg == NS3WrapperMessage.SHUTDOWN:
+def handle_message(ns3_wrapper, msg_type, args, kwargs):
+ if msg_type == NS3WrapperMessage.SHUTDOWN:
ns3_wrapper.shutdown()
ns3_wrapper.logger.debug("SHUTDOWN")
return "BYEBYE"
- if msg == NS3WrapperMessage.STOP:
- time = None
- if args:
- time = args[0]
+ if msg_type == NS3WrapperMessage.STOP:
+ time = kwargs.get("time")
ns3_wrapper.logger.debug("STOP time=%s" % str(time))
ns3_wrapper.stop(time=time)
return "STOPPED"
- if msg == NS3WrapperMessage.START:
+ if msg_type == NS3WrapperMessage.START:
ns3_wrapper.logger.debug("START")
ns3_wrapper.start()
return "STARTED"
- if msg == NS3WrapperMessage.CREATE:
+ if msg_type == NS3WrapperMessage.CREATE:
clazzname = args.pop(0)
ns3_wrapper.logger.debug("CREATE %s %s" % (clazzname, str(args)))
uuid = ns3_wrapper.create(clazzname, *args)
return uuid
- if msg == NS3WrapperMessage.FACTORY:
+ if msg_type == NS3WrapperMessage.FACTORY:
type_name = args.pop(0)
- kwargs = args.pop(0)
ns3_wrapper.logger.debug("FACTORY %s %s" % (type_name, str(kwargs)))
uuid = ns3_wrapper.factory(type_name, **kwargs)
return uuid
- if msg == NS3WrapperMessage.INVOKE:
+ if msg_type == NS3WrapperMessage.INVOKE:
uuid = args.pop(0)
operation = args.pop(0)
- ns3_wrapper.logger.debug("INVOKE %s %s %s " % (uuid, operation,
- str(args)))
+ ns3_wrapper.logger.debug("INVOKE %s %s %s %s " % (uuid, operation,
+ str(args), str(kwargs)))
- uuid = ns3_wrapper.invoke(uuid, operation, *args)
+ uuid = ns3_wrapper.invoke(uuid, operation, *args, **kwargs)
return uuid
- if msg == NS3WrapperMessage.GET:
+ if msg_type == NS3WrapperMessage.GET:
uuid = args.pop(0)
name = args.pop(0)
value = ns3_wrapper.get(uuid, name)
return value
- if msg == NS3WrapperMessage.SET:
+ if msg_type == NS3WrapperMessage.SET:
uuid = args.pop(0)
name = args.pop(0)
value = args.pop(0)
value = ns3_wrapper.set(uuid, name, value)
return value
- if msg == NS3WrapperMessage.ENABLE_TRACE:
- ns3_wrapper.logger.debug("ENABLE_TRACE")
-
- return "NOT YET IMPLEMENTED"
-
- if msg == NS3WrapperMessage.FLUSH:
+ if msg_type == NS3WrapperMessage.FLUSH:
# Forces flushing output and error streams.
# NS-3 output will stay unflushed until the program exits or
# explicit invocation flush is done
# empty chunk = EOF
break
- msg = ''.join(msg).split('\n')[0]
+ msg = ''.join(msg).strip()
+
+ # The message is formatted as follows:
+ # MESSAGE_TYPE|args|kwargs
+ #
+ # where MESSAGE_TYPE, args and kwargs are pickld and enoded in base64
- # The message might have arguments that will be appended
- # as a '|' separated list after the message identifier
- def decode(arg):
- arg = base64.b64decode(arg).rstrip()
- return cPickle.loads(arg)
+ def decode(item):
+ item = base64.b64decode(item).rstrip()
+ return cPickle.loads(item)
- dargs = map(decode, msg.split("|"))
+ decoded = map(decode, msg.split("|"))
# decoded message
- dmsg = dargs.pop(0)
+ dmsg_type = decoded.pop(0)
+ dargs = list(decoded.pop(0)) # transforming touple into list
+ dkwargs = decoded.pop(0)
- return (dmsg, dargs)
+ return (dmsg_type, dargs, dkwargs)
def send_reply(conn, reply):
encoded = base64.b64encode(cPickle.dumps(reply))
conn.settimeout(5)
try:
- (msg, args) = recv_msg(conn)
+ (msg_type, args, kwargs) = recv_msg(conn)
except socket.timeout, e:
# Ingore time-out
continue
- if not msg:
+ if not msg_type:
# Ignore - connection lost
break
- if msg == NS3WrapperMessage.SHUTDOWN:
+ if msg_type == NS3WrapperMessage.SHUTDOWN:
stop = True
try:
- reply = handle_message(ns3_wrapper, msg, args)
+ reply = handle_message(ns3_wrapper, msg_type, args, kwargs)
except:
import traceback
err = traceback.format_exc()
def get(self, *args, **kwargs):
return self.client.get(*args, **kwargs)
- def enable_trace(self, *args, **kwargs):
- return self.client.enable_trace(*args, **kwargs)
-
def flush(self, *args, **kwargs):
return self.client.flush(*args, **kwargs)
return uuid
- def invoke(self, uuid, operation, *args):
+ def invoke(self, uuid, operation, *args, **kwargs):
if operation == "isAppRunning":
return self._is_app_running(uuid)
# arguments starting with 'uuid' identify ns-3 C++
# objects and must be replaced by the actual object
realargs = self.replace_args(args)
+ realkwargs = self.replace_kwargs(kwargs)
- result = method(*realargs)
+ result = method(*realargs, **realkwargs)
if result is None or \
isinstance(result, bool):
if self._simulator_thread:
self._simulator_thread.join()
-
+
self.ns3.Simulator.Destroy()
# Remove all references to ns-3 objects
return realargs
+ # replace uuids and singleton references for the real objects
+ def replace_kwargs(self, kwargs):
+ realkwargs = dict([(k, self.get_object(v) \
+ if str(v).startswith("uuid") else v) \
+ for k,v in kwargs.iteritems()])
+
+ realkwargs = dict([(k, self._singleton(v) \
+ if str(v).startswith(SINGLETON) else v )\
+ for k, v in realkwargs.iteritems()])
+
+ return realkwargs
+
def _is_app_running(self, uuid):
now = self.ns3.Simulator.Now()
if now.IsZero():
from nepi.execution.ec import ExperimentController
+from nepi.execution.trace import TraceAttr
import os
import time
import unittest
+def add_ns3_node(ec, simu):
+ ns3_node = ec.register_resource("ns3::Node")
+ ec.register_connection(ns3_node, simu)
+
+ ipv4 = ec.register_resource("ns3::Ipv4L3Protocol")
+ ec.register_connection(ns3_node, ipv4)
+
+ arp = ec.register_resource("ns3::ArpL3Protocol")
+ ec.register_connection(ns3_node, arp)
+
+ icmp = ec.register_resource("ns3::Icmpv4L4Protocol")
+ ec.register_connection(ns3_node, icmp)
+
+ return ns3_node
+
+def add_point2point_device(ec, ns3_node, address, prefix):
+ dev = ec.register_resource("ns3::PointToPointNetDevice")
+ ec.set(dev, "ip", address)
+ ec.set(dev, "prefix", prefix)
+ ec.register_connection(ns3_node, dev)
+
+ queue = ec.register_resource("ns3::DropTailQueue")
+ ec.register_connection(dev, queue)
+
+ return dev
+
class LinuxNS3ClientTest(unittest.TestCase):
def setUp(self):
self.fedora_host = "nepi2.pl.sophia.inria.fr"
- #self.fedora_host = "peeramide.irisa.fr"
self.fedora_user = "inria_test"
def test_simple_ping(self):
ec.set(simu, "nsLog", "V4Ping:Node")
ec.register_connection(simu, node)
- nsnode1 = ec.register_resource("ns3::Node")
- ec.register_connection(nsnode1, simu)
-
- ipv41 = ec.register_resource("ns3::Ipv4L3Protocol")
- ec.register_connection(nsnode1, ipv41)
+ nsnode1 = add_ns3_node(ec, simu)
+ p1 = add_point2point_device(ec, nsnode1, "10.0.0.1", "30")
- arp1 = ec.register_resource("ns3::ArpL3Protocol")
- ec.register_connection(nsnode1, arp1)
-
- icmp1 = ec.register_resource("ns3::Icmpv4L4Protocol")
- ec.register_connection(nsnode1, icmp1)
-
- p1 = ec.register_resource("ns3::PointToPointNetDevice")
- ec.set(p1, "ip", "10.0.0.1")
- ec.set(p1, "prefix", "30")
- ec.register_connection(nsnode1, p1)
- q1 = ec.register_resource("ns3::DropTailQueue")
- ec.register_connection(p1, q1)
-
- nsnode2 = ec.register_resource("ns3::Node")
- ec.register_connection(nsnode2, simu)
-
- ipv42 = ec.register_resource("ns3::Ipv4L3Protocol")
- ec.register_connection(nsnode2, ipv42)
-
- arp2 = ec.register_resource("ns3::ArpL3Protocol")
- ec.register_connection(nsnode2, arp2)
-
- icmp2 = ec.register_resource("ns3::Icmpv4L4Protocol")
- ec.register_connection(nsnode2, icmp2)
-
- p2 = ec.register_resource("ns3::PointToPointNetDevice")
- ec.set(p2, "ip", "10.0.0.2")
- ec.set(p2, "prefix", "30")
- ec.register_connection(nsnode2, p2)
- q2 = ec.register_resource("ns3::DropTailQueue")
- ec.register_connection(p2, q2)
+ nsnode2 = add_ns3_node(ec, simu)
+ p2 = add_point2point_device(ec, nsnode2, "10.0.0.2", "30")
# Create channel
chan = ec.register_resource("ns3::PointToPointChannel")
ec.set(simu, "checksumEnabled", True)
ec.register_connection(simu, node)
- nsnode1 = ec.register_resource("ns3::Node")
- ec.register_connection(nsnode1, simu)
+ nsnode1 = add_ns3_node(ec, simu)
+ p1 = add_point2point_device(ec, nsnode1, "10.0.0.1", "30")
- ipv41 = ec.register_resource("ns3::Ipv4L3Protocol")
- ec.register_connection(nsnode1, ipv41)
-
- arp1 = ec.register_resource("ns3::ArpL3Protocol")
- ec.register_connection(nsnode1, arp1)
-
- icmp1 = ec.register_resource("ns3::Icmpv4L4Protocol")
- ec.register_connection(nsnode1, icmp1)
-
- p1 = ec.register_resource("ns3::PointToPointNetDevice")
- ec.set(p1, "ip", "10.0.0.1")
- ec.set(p1, "prefix", "30")
- ec.register_connection(nsnode1, p1)
- q1 = ec.register_resource("ns3::DropTailQueue")
- ec.register_connection(p1, q1)
-
- nsnode2 = ec.register_resource("ns3::Node")
- ec.register_connection(nsnode2, simu)
-
- ipv42 = ec.register_resource("ns3::Ipv4L3Protocol")
- ec.register_connection(nsnode2, ipv42)
-
- arp2 = ec.register_resource("ns3::ArpL3Protocol")
- ec.register_connection(nsnode2, arp2)
-
- icmp2 = ec.register_resource("ns3::Icmpv4L4Protocol")
- ec.register_connection(nsnode2, icmp2)
-
- p2 = ec.register_resource("ns3::PointToPointNetDevice")
- ec.set(p2, "ip", "10.0.0.2")
- ec.set(p2, "prefix", "30")
- ec.register_connection(nsnode2, p2)
- q2 = ec.register_resource("ns3::DropTailQueue")
- ec.register_connection(p2, q2)
+ nsnode2 = add_ns3_node(ec, simu)
+ p2 = add_point2point_device(ec, nsnode2, "10.0.0.2", "30")
# Create channel
chan = ec.register_resource("ns3::PointToPointChannel")
ec.shutdown()
+ def test_p2p_traces(self):
+ ec = ExperimentController(exp_id = "test-ns3-p2p-traces")
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", self.fedora_host)
+ ec.set(node, "username", self.fedora_user)
+ ec.set(node, "cleanProcesses", True)
+ #ec.set(node, "cleanHome", True)
+
+ simu = ec.register_resource("LinuxNS3Simulation")
+ ec.set(simu, "verbose", True)
+ ec.set(simu, "nsLog", "V4Ping:Node")
+ ec.register_connection(simu, node)
+
+ nsnode1 = add_ns3_node(ec, simu)
+ p1 = add_point2point_device(ec, nsnode1, "10.0.0.1", "30")
+
+ nsnode2 = add_ns3_node(ec, simu)
+ p2 = add_point2point_device(ec, nsnode2, "10.0.0.2", "30")
+
+ # Create channel
+ chan = ec.register_resource("ns3::PointToPointChannel")
+ ec.set(chan, "Delay", "0s")
+ ec.register_connection(chan, p1)
+ ec.register_connection(chan, p2)
+
+ ### create pinger
+ ping = ec.register_resource("ns3::V4Ping")
+ ec.set (ping, "Remote", "10.0.0.2")
+ ec.set (ping, "Interval", "1s")
+ ec.set (ping, "Verbose", True)
+ ec.set (ping, "StartTime", "0s")
+ ec.set (ping, "StopTime", "20s")
+ ec.register_connection(ping, nsnode1)
+
+ # enable traces
+ ec.enable_trace(p1, "pcap")
+ ec.enable_trace(p1, "promiscPcap")
+ ec.enable_trace(p1, "ascii")
+
+ ec.enable_trace(p2, "pcap")
+ ec.enable_trace(p2, "promiscPcap")
+ ec.enable_trace(p2, "ascii")
+
+ ec.deploy()
+
+ ec.wait_finished([ping])
+
+ # Trace verification
+ rm_simu = ec.get_resource(simu)
+
+ # TODO: Fix this in ns-3: pcap traces do not flush until the Simulator
+ # process is ended, so we can't get the traces of the 'pcap' and
+ # 'promiscPcap' traces.
+ #
+ #for trace in ["pcap", "promiscPcap", "ascii"]:
+ for trace in ["ascii"]:
+ for guid in [p1, p2]:
+ output = ec.trace(guid, trace)
+
+ size = ec.trace(guid, trace, attr = TraceAttr.SIZE)
+ self.assertEquals(size, len(output))
+ self.assertTrue(size > 100)
+
+ block = ec.trace(guid, trace, attr = TraceAttr.STREAM, block = 5, offset = 1)
+ self.assertEquals(block, output[5:10])
+
+ trace_path = ec.trace(guid, trace, attr = TraceAttr.PATH)
+ rm = ec.get_resource(guid)
+ path = os.path.join(rm_simu.run_home, rm._trace_filename.get(trace))
+ self.assertEquals(trace_path, path)
+
+ ec.shutdown()
+
if __name__ == '__main__':
unittest.main()