2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from nepi.resources.linux.node import LinuxNode
22 from nepi.util.sshfuncs import RUNNING, FINISHED
24 from test_utils import skipIfNotAlive, skipInteractive, create_node
31 class LinuxNodeTestCase(unittest.TestCase):
33 self.fedora_host = 'nepi2.pl.sophia.inria.fr'
34 self.fedora_user = 'inria_nepi'
36 self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
37 self.ubuntu_user = 'alina'
39 self.target = 'nepi5.pl.sophia.inria.fr'
42 def t_xterm(self, host, user):
43 node, ec = create_node(host, user)
45 node.install_packages('xterm')
47 (out, err), proc = node.execute('xterm', forward_x11 = True)
49 self.assertEquals(out, "")
51 (out, err), proc = node.remove_packages('xterm')
53 self.assertEquals(out, "")
56 def t_execute(self, host, user):
57 node, ec = create_node(host, user)
59 command = "ping -qc3 %s" % self.target
61 (out, err), proc = node.execute(command)
63 expected = """3 packets transmitted, 3 received, 0% packet loss"""
65 self.assertTrue(out.find(expected) > 0)
68 def t_run(self, host, user):
69 node, ec = create_node(host, user)
71 app_home = os.path.join(node.exp_home, "my-app")
72 node.mkdir(app_home, clean = True)
74 command = "ping %s" % self.target
75 node.run(command, app_home)
76 pid, ppid = node.checkpid(app_home)
78 status = node.status(pid, ppid)
79 self.assertTrue(status, RUNNING)
82 status = node.status(pid, ppid)
83 self.assertTrue(status, FINISHED)
85 (out, err), proc = node.check_output(app_home, "stdout")
87 expected = """64 bytes from"""
89 self.assertTrue(out.find(expected) > 0)
94 def t_install(self, host, user):
95 node, ec = create_node(host, user)
97 (out, err), proc = node.install_packages('gcc')
98 self.assertEquals(out, "")
100 (out, err), proc = node.remove_packages('gcc')
102 self.assertEquals(out, "")
106 def t_compile(self, host, user):
107 node, ec = create_node(host, user)
109 app_home = os.path.join(node.exp_home, "my-app")
110 node.mkdir(app_home, clean = True)
112 prog = """#include <stdio.h>
117 printf ("Hello, world!\\n");
121 # upload the test program
122 dst = os.path.join(app_home, "hello.c")
123 node.upload(prog, dst, text = True)
126 node.install_packages('gcc')
128 # compile the program using gcc
129 command = "cd %s; gcc -Wall hello.c -o hello" % app_home
130 (out, err), proc = node.execute(command)
132 # execute the program and get the output from stdout
133 command = "%s/hello" % app_home
134 (out, err), proc = node.execute(command)
136 self.assertEquals(out, "Hello, world!\n")
138 # execute the program and get the output from a file
139 command = "%(home)s/hello > %(home)s/hello.out" % {
141 (out, err), proc = node.execute(command)
143 # retrieve the output file
144 src = os.path.join(app_home, "hello.out")
145 f = tempfile.NamedTemporaryFile(delete=False)
147 node.download(src, dst)
150 node.remove_packages('gcc')
157 self.assertEquals(out, "Hello, world!\n")
159 def test_execute_fedora(self):
160 self.t_execute(self.fedora_host, self.fedora_user)
162 def test_execute_ubuntu(self):
163 self.t_execute(self.ubuntu_host, self.ubuntu_user)
165 def test_run_fedora(self):
166 self.t_run(self.fedora_host, self.fedora_user)
168 def test_run_ubuntu(self):
169 self.t_run(self.ubuntu_host, self.ubuntu_user)
171 def test_intall_fedora(self):
172 self.t_install(self.fedora_host, self.fedora_user)
174 def test_install_ubuntu(self):
175 self.t_install(self.ubuntu_host, self.ubuntu_user)
177 def test_compile_fedora(self):
178 self.t_compile(self.fedora_host, self.fedora_user)
180 def test_compile_ubuntu(self):
181 self.t_compile(self.ubuntu_host, self.ubuntu_user)
184 def test_xterm_ubuntu(self):
185 """ Interactive test. Should not run automatically """
186 self.t_xterm(self.ubuntu_host, self.ubuntu_user)
189 if __name__ == '__main__':