#!/usr/bin/env python
#
#    NEPI, a framework to manage network experiments
#    Copyright (C) 2013 INRIA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac

from nepi.execution.ec import ExperimentController
from nepi.execution.resource import ResourceState, ResourceAction
from nepi.execution.trace import TraceAttr

from test_utils import skipIfNotAlive

import os
import time
import unittest


def add_ns3_node(ec, simu):
    """Register an ``ns3::Node`` with ``enableStack`` set and connect it to *simu*.

    Returns the identifier handed back by ``ec.register_resource``.
    """
    node = ec.register_resource("ns3::Node")
    ec.set(node, "enableStack", True)
    ec.register_connection(node, simu)

    return node


def add_fd_device(ec, node, ip, prefix):
    """Register an ``ns3::FdNetDevice`` with *ip*/*prefix* and connect it to *node*.

    Returns the identifier of the new device resource.
    """
    dev = ec.register_resource("ns3::FdNetDevice")
    ec.set(dev, "ip", ip)
    ec.set(dev, "prefix", prefix)
    ec.register_connection(node, dev)

    return dev


def add_tap_device(ec, node, ip, prefix):
    """Register a ``linux::Tap`` with *ip*/*prefix* and connect it to *node*.

    Returns the identifier of the new device resource.
    """
    dev = ec.register_resource("linux::Tap")
    ec.set(dev, "ip", ip)
    ec.set(dev, "prefix", prefix)
    ec.register_connection(node, dev)

    return dev


def add_point2point_device(ec, node, ip, prefix):
    """Register an ``ns3::PointToPointNetDevice`` and connect it to *node*.

    An ``ns3::DropTailQueue`` is also registered and connected to the device.
    Returns the identifier of the device resource (not of the queue).
    """
    dev = ec.register_resource("ns3::PointToPointNetDevice")
    ec.set(dev, "ip", ip)
    ec.set(dev, "prefix", prefix)
    ec.register_connection(node, dev)

    queue = ec.register_resource("ns3::DropTailQueue")
    ec.register_connection(dev, queue)

    return dev


def add_linux_ns3_simulation(ec, node):
    """Register a ``linux::ns3::Simulation`` and connect it to *node*.

    The simulation is configured with ``ns3::RealtimeSimulatorImpl``,
    ``checksumEnabled`` and ``verbose`` all turned on, which is the setup
    both ends of the tunnel test use.  Returns the simulation identifier.
    """
    simu = ec.register_resource("linux::ns3::Simulation")
    ec.set(simu, "simulatorImplementationType", "ns3::RealtimeSimulatorImpl")
    ec.set(simu, "checksumEnabled", True)
    ec.set(simu, "verbose", True)
    # Debugging aids, left disabled:
    #   ec.set(simu, "buildMode", "debug")
    #   ec.set(simu, "nsLog", "FdNetDevice")
    ec.register_connection(simu, node)

    return simu


class LinuxNS3FdNetDeviceTest(unittest.TestCase):
    """Ping between two ns-3 simulations joined by an FdNetDevice UDP tunnel."""

    def setUp(self):
        """Record host name, user and SSH identity for the remote test hosts."""
        self.fedora_host = "nepi2.pl.sophia.inria.fr"
        self.fedora_user = "inria_nepi"
        self.fedora_identity = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'])

        self.ubuntu_host = "roseval.pl.sophia.inria.fr"
        self.ubuntu_user = "inria_nepi"
        self.ubuntu_identity = "%s/.ssh/id_rsa" % (os.environ['HOME'])

    @skipIfNotAlive
    def t_cross_tunnel_ping(self, host, user = None, identity = None):
        """Ping from a local ns-3 node to one hosted on *host* through a tunnel.

        One ``linux::Node`` is registered for ``localhost`` and one for *host*
        (using *user* and *identity*).  Each gets a realtime ns-3 simulation,
        an ns-3 node and an ``ns3::FdNetDevice`` (10.0.0.1/30 and 10.0.0.2/30).
        The two devices are joined with a ``linux::ns3::FdUdpTunnel`` and an
        ``ns3::V4Ping`` on the local ns-3 node targets 10.0.0.2 from 0s to 20s
        at a 1s interval.  The test passes when the local simulation's stdout
        reports 20 packets sent and received.
        """
        ec = ExperimentController(exp_id = "test-linux-ns3-tap-fd")

        # Everything after the controller exists runs under try/finally so
        # that a failed deploy or assertion cannot skip ec.shutdown().
        try:
            # Local end of the tunnel.
            node1 = ec.register_resource("linux::Node")
            ec.set(node1, "hostname", "localhost")
            ec.set(node1, "cleanProcesses", True)
            ec.set(node1, "cleanExperiment", True)

            simu1 = add_linux_ns3_simulation(ec, node1)
            nsnode1 = add_ns3_node(ec, simu1)
            fddev1 = add_fd_device(ec, nsnode1, "10.0.0.1", "30")

            # Remote end of the tunnel.
            node2 = ec.register_resource("linux::Node")
            ec.set(node2, "hostname", host)
            ec.set(node2, "username", user)
            ec.set(node2, "identity", identity)
            ec.set(node2, "cleanProcesses", True)
            ec.set(node2, "cleanExperiment", True)

            simu2 = add_linux_ns3_simulation(ec, node2)
            nsnode2 = add_ns3_node(ec, simu2)
            fddev2 = add_fd_device(ec, nsnode2, "10.0.0.2", "30")

            tunnel = ec.register_resource("linux::ns3::FdUdpTunnel")
            ec.register_connection(tunnel, fddev1)
            ec.register_connection(tunnel, fddev2)

            ping = ec.register_resource("ns3::V4Ping")
            ec.set(ping, "Remote", "10.0.0.2")
            ec.set(ping, "Interval", "1s")
            ec.set(ping, "Verbose", True)
            ec.set(ping, "StartTime", "0s")
            ec.set(ping, "StopTime", "20s")
            ec.register_connection(ping, nsnode1)

            ec.deploy()

            ec.wait_finished([ping])

            stdout = ec.trace(simu1, "stdout")

            expected = "20 packets transmitted, 20 received, 0% packet loss"
            self.assertIn(expected, stdout)
        finally:
            ec.shutdown()

    def ztest_cross_ping_fedora(self):
        """Tunnel ping against the Fedora host.

        The ``ztest`` prefix keeps the default unittest loader from
        collecting this method; rename it to ``test_...`` to enable it.
        """
        # Arguments stay positional, as in the enabled Ubuntu test.
        # NOTE(review): skipIfNotAlive presumably inspects positional args -- confirm.
        self.t_cross_tunnel_ping(self.fedora_host, self.fedora_user,
                self.fedora_identity)

    def test_cross_tunnel_ping_ubuntu(self):
        """Tunnel ping against the Ubuntu host."""
        self.t_cross_tunnel_ping(self.ubuntu_host, self.ubuntu_user,
                self.ubuntu_identity)


if __name__ == '__main__':
    unittest.main()