self.fedora_user = "inria_nepi"
self.fedora_identity = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])
+ def test_local_p2p_ping(self):
+ ec = ExperimentController(exp_id = "test-ns3-local-p2p")
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", "localhost")
+
+ simu = ec.register_resource("LinuxNS3Simulation")
+ ec.set(simu, "verbose", True)
+ ec.register_connection(simu, node)
+
+ nsnode1 = add_ns3_node(ec, simu)
+ dev1 = add_point2point_device(ec, nsnode1, "10.0.0.1", "30")
+
+ nsnode2 = add_ns3_node(ec, simu)
+ dev2 = add_point2point_device(ec, nsnode2, "10.0.0.2", "30")
+
+ # Create channel
+ chan = ec.register_resource("ns3::PointToPointChannel")
+ ec.set(chan, "Delay", "0s")
+ ec.register_connection(chan, dev1)
+ ec.register_connection(chan, dev2)
+
+ ### create pinger
+ ping = ec.register_resource("ns3::V4Ping")
+ ec.set (ping, "Remote", "10.0.0.2")
+ ec.set (ping, "Interval", "1s")
+ ec.set (ping, "Verbose", True)
+ ec.set (ping, "StartTime", "0s")
+ ec.set (ping, "StopTime", "20s")
+ ec.register_connection(ping, nsnode1)
+
+ ec.deploy()
+
+ ec.wait_finished([ping])
+
+ stdout = ec.trace(simu, "stdout")
+
+ expected = "20 packets transmitted, 20 received, 0% packet loss"
+ self.assertTrue(stdout.find(expected) > -1)
+
+ ec.shutdown()
+
def test_simple_p2p_ping(self):
ec = ExperimentController(exp_id = "test-ns3-p2p-ping")
#ec.set(node, "cleanHome", True)
simu = ec.register_resource("LinuxNS3Simulation")
+ ec.set(simu, "verbose", True)
ec.register_connection(simu, node)
nsnode1 = add_ns3_node(ec, simu)
#ec.set(node, "cleanHome", True)
simu = ec.register_resource("LinuxNS3Simulation")
+ ec.set(simu, "verbose", True)
ec.register_connection(simu, node)
nsnode1 = add_ns3_node(ec, simu)
#ec.set(node, "cleanHome", True)
simu = ec.register_resource("LinuxNS3Simulation")
+ ec.set(simu, "verbose", True)
sources = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"ns-3.18-user.tar.gz")
ec.set(simu, "sources", sources)
rm = ec.get_resource(ping)
start_time = rm.start_time
stop_time = rm.stop_time
- delta = stop_time - start_time
+ delta = stop_time - start_time
- self.assertTrue(delta.seconds >= 20)
- self.assertTrue(delta.seconds < 25)
+ self.assertTrue(delta.seconds >= 20, "Time elapsed %d" % delta.seconds)
+ self.assertTrue(delta.seconds < 25, "Time elapsed %d" % delta.seconds)
ec.shutdown()
- def test_dev2p_traces(self):
- ec = ExperimentController(exp_id = "test-ns3-dev2p-traces")
+ def test_traces(self):
+ ec = ExperimentController(exp_id = "test-ns3-traces")
node = ec.register_resource("LinuxNode")
ec.set(node, "hostname", self.fedora_host)
#ec.set(node, "cleanHome", True)
simu = ec.register_resource("LinuxNS3Simulation")
+ ec.set(simu, "verbose", True)
ec.register_connection(simu, node)
nsnode1 = add_ns3_node(ec, simu)
"""
- ec = ExperimentController(exp_id = "test-ns3-auto-routes")
+ ec = ExperimentController(exp_id = "test-ns3-dce")
node = ec.register_resource("LinuxNode")
ec.set(node, "hostname", self.fedora_host)
ec.shutdown()
+ def test_dce(self):
+ """
+ network topology:
+ n4
+ |
+ n1 -- p2p -- n2 -- csma -- n5 -- p2p -- n6
+ | |
+ ping n6 n3
+
+
+ """
+ ec = ExperimentController(exp_id = "test-ns3-dce")
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", self.fedora_host)
+ ec.set(node, "username", self.fedora_user)
+ ec.set(node, "identity", self.fedora_identity)
+ ec.set(node, "cleanProcesses", True)
+ #ec.set(node, "cleanHome", True)
+
+ simu = ec.register_resource("LinuxNS3Simulation")
+ ec.set(simu, "verbose", True)
+ ec.set(simu, "enableDCE", True)
+ ec.register_connection(simu, node)
+
+ nsnode1 = add_ns3_node(ec, simu)
+ p2p1 = add_point2point_device(ec, nsnode1, "10.0.0.1", "30")
+
+ nsnode2 = add_ns3_node(ec, simu)
+ p2p2 = add_point2point_device(ec, nsnode2, "10.0.0.2", "30")
+
+ # Create channel
+ chan = ec.register_resource("ns3::PointToPointChannel")
+ ec.set(chan, "Delay", "0s")
+ ec.register_connection(chan, p2p1)
+ ec.register_connection(chan, p2p2)
+
+ ### create pinger
+ ping = ec.register_resource("ns3::V4Ping")
+ ec.set (ping, "Remote", "10.0.0.2")
+ ec.set (ping, "Interval", "1s")
+ ec.set (ping, "Verbose", True)
+ ec.set (ping, "StartTime", "1s")
+ ec.set (ping, "StopTime", "21s")
+ ec.register_connection(ping, nsnode1)
+
+ ec.deploy()
+
+ ec.wait_finished([ping])
+
+ stdout = ec.trace(simu, "stdout")
+
+ print stdout
+
+ expected = "20 packets transmitted, 20 received, 0% packet loss"
+ self.assertTrue(stdout.find(expected) > -1)
+
+ ec.shutdown()
+
if __name__ == '__main__':
unittest.main()