#!/usr/bin/env python # # NEPI, a framework to manage network experiments # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Author: Alina Quereilhac from __future__ import print_function from nepi.execution.ec import ExperimentController from nepi.execution.trace import TraceAttr from test_utils import skipIfNotAlive, skipIf import os import time import unittest def add_node(ec, emu, addr, prefix): node = ec.register_resource("netns::Node") ec.register_connection(node, emu) iface = ec.register_resource("netns::NodeInterface") ec.register_connection(iface, node) ip = ec.register_resource("netns::IPv4Address") ec.set(ip, "ip", addr) ec.set(ip, "prefix", prefix) ec.register_connection(ip, iface) print(ec.get(ip, "ip"), addr) print(ec.get(ip, "prefix"), prefix) return node, iface class LinuxNetNSEmulationTest(unittest.TestCase): def setUp(self): self.fedora_host = "mimas.inria.fr" self.fedora_user = "aquereil" self.fedora_identity = "%s/.ssh/id_rsa" % (os.environ['HOME']) @skipIfNotAlive @skipIf(os.getuid() != 0, "Test requires root privileges") def t_ping(self, host, user = None, identity = None): ec = ExperimentController(exp_id = "test-netns-p2p-ping") node = ec.register_resource("linux::Node") if host == "localhost": ec.set(node, "hostname", "localhost") else: ec.set(node, "hostname", host) ec.set(node, "username", user) ec.set(node, "identity", identity) ec.set(node, "cleanProcesses", True) #ec.set(node, "cleanHome", True) emu = ec.register_resource("linux::netns::Emulation") ec.set(emu, "verbose", True) ec.register_connection(emu, node) netnode1, iface1 = add_node(ec, emu, "10.0.0.1", "24") netnode2, iface2 = add_node(ec, emu, "10.0.0.2", "24") switch = ec.register_resource("netns::Switch") ec.register_connection(iface1, switch) ec.register_connection(iface2, switch) ping = ec.register_resource("netns::Application") ec.set(ping, "command", "ping -c20 10.0.0.2") ec.register_connection(ping, netnode1) ec.deploy() ec.wait_finished([ping]) stdout = ec.trace(ping, "stdout") expected = "20 packets transmitted, 20 received, 0% packet loss" self.assertTrue(stdout.find(expected) > -1) ec.shutdown() def ztest_ping_fedora(self): self.t_ping(self.fedora_host, self.fedora_user, self.fedora_identity) def test_ping_local(self): self.t_ping("localhost") if __name__ == '__main__': unittest.main()