use print() - import print_function - should be fine for both py2 and py3
[nepi.git] / test / resources / linux / udptunnel.py
1 #!/usr/bin/env python
2 #
3 #    NEPI, a framework to manage network experiments
4 #    Copyright (C) 2013 INRIA
5 #
6 #    This program is free software: you can redistribute it and/or modify
7 #    it under the terms of the GNU General Public License version 2 as
8 #    published by the Free Software Foundation;
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from __future__ import print_function
21
22 from nepi.execution.ec import ExperimentController 
23
24 from test_utils import skipIfAnyNotAliveWithIdentity
25
26 import os
27 import time
28 import unittest
29
class LinuxUdpTunnelTestCase(unittest.TestCase):
    """Integration tests for linux::UdpTunnel between localhost and a remote host.

    Each scenario registers one virtual device on a local node and one on a
    remote node, connects both devices to a linux::UdpTunnel resource, and
    pings the remote device's address from the local node.
    """

    def setUp(self):
        # Remote endpoint and SSH credentials shared by every scenario.
        self.host = "roseval.pl.sophia.inria.fr"
        self.user = "inria_nepi"
        self.identity = "%s/.ssh/id_rsa" % (os.environ['HOME'])

    def _run_udp_tunnel(self, user, host, identity, exp_id, device_rtype,
                        device_prefix):
        """Deploy a two-node UDP tunnel experiment and verify connectivity.

        :param user: SSH username for the remote node.
        :param host: hostname of the remote node.
        :param identity: path to the SSH private key for the remote node.
        :param exp_id: experiment id given to the ExperimentController.
        :param device_rtype: resource type of the virtual devices,
            e.g. "linux::Tap" or "linux::Tun".
        :param device_prefix: prefix the "deviceName" attribute of each
            device is expected to start with, e.g. "tap" or "tun".
        """
        ec = ExperimentController(exp_id=exp_id)

        # Always shut the controller down, even when deployment or an
        # assertion fails, so a failing run does not skip the cleanup.
        try:
            node1 = ec.register_resource("linux::Node")
            ec.set(node1, "hostname", "localhost")
            ec.set(node1, "cleanExperiment", True)
            ec.set(node1, "cleanProcesses", True)

            dev1 = ec.register_resource(device_rtype)
            ec.set(dev1, "ip", "192.168.3.1")
            ec.set(dev1, "prefix", "30")
            ec.register_connection(dev1, node1)

            node2 = ec.register_resource("linux::Node")
            ec.set(node2, "hostname", host)
            ec.set(node2, "username", user)
            ec.set(node2, "identity", identity)
            ec.set(node2, "cleanExperiment", True)
            ec.set(node2, "cleanProcesses", True)

            dev2 = ec.register_resource(device_rtype)
            ec.set(dev2, "ip", "192.168.3.2")
            ec.set(dev2, "prefix", "30")
            ec.register_connection(dev2, node2)

            udptun = ec.register_resource("linux::UdpTunnel")
            ec.register_connection(dev1, udptun)
            ec.register_connection(dev2, udptun)

            # Ping the remote device's address from the local node.
            app = ec.register_resource("linux::Application")
            cmd = "ping -c3 192.168.3.2"
            ec.set(app, "command", cmd)
            ec.register_connection(app, node1)

            ec.deploy()

            ec.wait_finished(app)

            ping = ec.trace(app, "stdout")
            print(ping)
            expected = """3 packets transmitted, 3 received, 0% packet loss"""
            self.assertIn(expected, ping)

            for dev in (dev1, dev2):
                vif_name = ec.get(dev, "deviceName")
                self.assertTrue(vif_name.startswith(device_prefix))
        finally:
            ec.shutdown()

    @skipIfAnyNotAliveWithIdentity
    def t_tap_udp_tunnel(self, user, host, identity):
        """Run the UDP tunnel scenario with linux::Tap devices."""
        self._run_udp_tunnel(user, host, identity,
                             exp_id="test-tap-udp-tunnel",
                             device_rtype="linux::Tap",
                             device_prefix="tap")

    @skipIfAnyNotAliveWithIdentity
    def t_tun_udp_tunnel(self, user, host, identity):
        """Run the UDP tunnel scenario with linux::Tun devices."""
        self._run_udp_tunnel(user, host, identity,
                             exp_id="test-tun-udp-tunnel",
                             device_rtype="linux::Tun",
                             device_prefix="tun")

    def test_tap_udp_tunnel(self):
        self.t_tap_udp_tunnel(self.user, self.host, self.identity)

    # The "z" prefix keeps unittest from collecting this method (it only
    # picks up names starting with "test").
    # NOTE(review): presumably disabled on purpose -- confirm before renaming.
    def ztest_tun_udp_tunnel(self):
        self.t_tun_udp_tunnel(self.user, self.host, self.identity)
148
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
151