3 # NEPI, a framework to manage network experiments
4 # Copyright (C) 2013 INRIA
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
22 from nepi.resources.linux.node import LinuxNode
23 from nepi.util.sshfuncs import RUNNING, FINISHED
25 from test_utils import skipIfNotAlive, skipInteractive, create_node
32 class LinuxNodeTestCase(unittest.TestCase):
34 self.fedora_host = 'nepi2.pl.sophia.inria.fr'
35 self.fedora_user = 'inria_nepi'
37 self.ubuntu_host = 'roseval.pl.sophia.inria.fr'
38 self.ubuntu_user = 'alina'
40 self.target = 'nepi5.pl.sophia.inria.fr'
43 def t_xterm(self, host, user):
44 node, ec = create_node(host, user)
46 node.install_packages('xterm')
48 (out, err), proc = node.execute('xterm', forward_x11 = True)
50 self.assertEquals(out, "")
52 (out, err), proc = node.remove_packages('xterm')
54 self.assertEquals(out, "")
57 def t_execute(self, host, user):
58 node, ec = create_node(host, user)
60 command = "ping -qc3 %s" % self.target
62 (out, err), proc = node.execute(command)
64 expected = """3 packets transmitted, 3 received, 0% packet loss"""
66 self.assertTrue(out.find(expected) > 0)
69 def t_run(self, host, user):
70 node, ec = create_node(host, user)
72 app_home = os.path.join(node.exp_home, "my-app")
73 node.mkdir(app_home, clean = True)
75 command = "ping %s" % self.target
76 node.run(command, app_home)
77 pid, ppid = node.checkpid(app_home)
79 status = node.status(pid, ppid)
80 self.assertTrue(status, RUNNING)
83 status = node.status(pid, ppid)
84 self.assertTrue(status, FINISHED)
86 (out, err), proc = node.check_output(app_home, "stdout")
88 expected = """64 bytes from"""
90 self.assertTrue(out.find(expected) > 0)
95 def t_install(self, host, user):
96 node, ec = create_node(host, user)
98 (out, err), proc = node.mkdir(node.node_home, clean=True)
99 self.assertEquals(out, "")
101 (out, err), proc = node.install_packages('gcc')
102 self.assertEquals(out, "")
104 (out, err), proc = node.remove_packages('gcc')
105 self.assertEquals(out, "")
107 (out, err), proc = node.rmdir(node.exp_home)
108 self.assertEquals(out, "")
111 def t_compile(self, host, user):
112 node, ec = create_node(host, user)
114 app_home = os.path.join(node.exp_home, "my-app")
115 node.mkdir(app_home, clean = True)
117 prog = """#include <stdio.h>
122 printf ("Hello, world!\\n");
126 # upload the test program
127 dst = os.path.join(app_home, "hello.c")
128 node.upload(prog, dst, text = True)
131 node.install_packages('gcc')
133 # compile the program using gcc
134 command = "cd %s; gcc -Wall hello.c -o hello" % app_home
135 (out, err), proc = node.execute(command)
137 # execute the program and get the output from stdout
138 command = "%s/hello" % app_home
139 (out, err), proc = node.execute(command)
141 self.assertEquals(out, "Hello, world!\n")
143 # execute the program and get the output from a file
144 command = "%(home)s/hello > %(home)s/hello.out" % {
146 (out, err), proc = node.execute(command)
148 # retrieve the output file
149 src = os.path.join(app_home, "hello.out")
150 f = tempfile.NamedTemporaryFile(delete=False)
152 node.download(src, dst)
155 node.remove_packages('gcc')
162 self.assertEquals(out, "Hello, world!\n")
164 def test_execute_fedora(self):
165 self.t_execute(self.fedora_host, self.fedora_user)
167 def test_execute_ubuntu(self):
168 self.t_execute(self.ubuntu_host, self.ubuntu_user)
170 def test_run_fedora(self):
171 self.t_run(self.fedora_host, self.fedora_user)
173 def test_run_ubuntu(self):
174 self.t_run(self.ubuntu_host, self.ubuntu_user)
176 def test_intall_fedora(self):
177 self.t_install(self.fedora_host, self.fedora_user)
179 def test_install_ubuntu(self):
180 self.t_install(self.ubuntu_host, self.ubuntu_user)
182 def test_compile_fedora(self):
183 self.t_compile(self.fedora_host, self.fedora_user)
185 def test_compile_ubuntu(self):
186 self.t_compile(self.ubuntu_host, self.ubuntu_user)
189 def test_xterm_ubuntu(self):
190 """ Interactive test. Should not run automatically """
191 self.t_xterm(self.ubuntu_host, self.ubuntu_user)
194 if __name__ == '__main__':