2 # -*- coding: utf-8 -*-
5 from nepi.core.design import ExperimentDescription, FactoriesProvider
6 from nepi.core.execute import ExperimentController
14 class NetnsIntegrationTestCase(unittest.TestCase):
16 self._home_dir = os.path.join(os.getenv("HOME"), ".nepi",
18 os.makedirs(self._home_dir)
# NOTE(review): this listing carries the original file's line numbers as a
# leading token on every line, has lost its indentation, and skips some
# numbers inside this method (24, 52, 54, 56, 59 and 62). The code lines
# below are left exactly as found; the notes only describe what is visible.
#
# Integration test: describe two nodes joined through a switch, run a single
# ping from node1 towards 10.0.0.2, and check that the captured stdout trace
# starts with the expected ping report.
# The decorator is handed the condition os.getuid() == 0 together with the
# reason "Test requires root privileges".
20 @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
21 def test_local_if(self):
22 exp_desc = ExperimentDescription()
23 testbed_version = "01"
# NOTE(review): testbed_id, passed to FactoriesProvider below, is not assigned
# on any visible line -- presumably it is set on the skipped line 24; confirm
# against the full file.
25 user = getpass.getuser()
26 netns_provider = FactoriesProvider(testbed_id, testbed_version)
27 netns_desc = exp_desc.add_testbed_description(netns_provider)
# The testbed description is pointed at self._home_dir as its home directory.
28 netns_desc.set_attribute_value("homeDirectory", self._home_dir)
29 #netns_desc.set_attribute_value("enableDebug", True)
# Two nodes; each gets one NodeInterface with "up" set to True, connected to
# the node's "devs" connector and given one address (10.0.0.1 / 10.0.0.2).
30 node1 = netns_desc.create("Node")
31 node2 = netns_desc.create("Node")
32 iface1 = netns_desc.create("NodeInterface")
33 iface1.set_attribute_value("up", True)
34 node1.connector("devs").connect(iface1.connector("node"))
35 ip1 = iface1.add_address()
36 ip1.set_attribute_value("Address", "10.0.0.1")
37 iface2 = netns_desc.create("NodeInterface")
38 iface2.set_attribute_value("up", True)
39 node2.connector("devs").connect(iface2.connector("node"))
40 ip2 = iface2.add_address()
41 ip2.set_attribute_value("Address", "10.0.0.2")
# One switch; the "switch" connector of both interfaces is connected to the
# switch's "devs" connector.
42 switch = netns_desc.create("Switch")
43 switch.set_attribute_value("up", True)
44 iface1.connector("switch").connect(switch.connector("devs"))
45 iface2.connector("switch").connect(switch.connector("devs"))
# The application is configured with a one-packet quiet ping of 10.0.0.2, the
# current user name, a connection to node1's "apps" connector, and its
# "stdout" trace enabled so the output can be fetched from the controller.
46 app = netns_desc.create("Application")
47 app.set_attribute_value("command", "ping -qc1 10.0.0.2")
48 app.set_attribute_value("user", user)
49 app.connector("node").connect(node1.connector("apps"))
50 app.enable_trace("stdout")
# The experiment description is serialised to XML and given to the controller.
51 xml = exp_desc.to_xml()
53 controller = ExperimentController(xml)
# Loop until the controller reports the application as finished.
# NOTE(review): the loop body (skipped line 56) and whatever sits on skipped
# line 54 are not visible here -- confirm how the controller is started and
# what the loop does between polls.
55 while not controller.is_finished(app.guid):
# The stdout trace of the application is read back via the controller.
57 ping_result = controller.trace(netns_desc.guid, app.guid, "stdout")
# Expected beginning of the ping report, compared with startswith() below;
# "0% packet loss, time 0ms" is matched literally.
# NOTE(review): lines 59 and 62 are skipped in this listing, so the closing
# quotes of this string literal are not visible.
58 comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
60 --- 10.0.0.2 ping statistics ---
61 1 packets transmitted, 1 received, 0% packet loss, time 0ms
63 self.assertTrue(ping_result.startswith(comp_result))
68 shutil.rmtree(self._home_dir)
70 if __name__ == '__main__':