Adding X11 forwarding tests for resources/linux/node.py
[nepi.git] / test / resources / linux / node.py
1 #!/usr/bin/env python
2 from neco.resources.linux.node import LinuxNode
3 from neco.design.box import Box
4 from neco.util.sshfuncs import RUNNING, FINISHED
5
6 import os.path
7 import time
8 import unittest
9
10
11 class DummyEC(object):
12     pass
13
14 class LinuxBoxTestCase(unittest.TestCase):
15     def setUp(self):
16         host = 'nepi2.pl.sophia.inria.fr'
17         user = 'inria_nepi'
18         self.node_fedora = self.create_node(host, user)
19
20         host = 'roseval.pl.sophia.inria.fr'
21         user = 'alina'
22         self.node_ubuntu = self.create_node(host, user)
23         
24         self.target = 'nepi5.pl.sophia.inria.fr'
25         self.home = '${HOME}/test-app'
26
27     def create_node(self, host, user):
28         box = Box()
29         ec = DummyEC()
30
31         node = LinuxNode(box, ec)
32         node.host = host
33         node.user = user
34
35         return node
36
37     def t_xterm(self, node, target):
38         if not node.is_alive():
39             print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
40             return 
41
42         node.enable_x11 = True
43
44         node.install('xterm')
45
46         out = node.execute('xterm')
47
48         node.uninstall('xterm')
49
50         self.assertEquals(out, "")
51
52     def t_execute(self, node, target):
53         if not node.is_alive():
54             print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
55             return 
56
57         command = "ping -qc3 %s" % target
58         out = node.execute(command)
59
60         expected = """3 packets transmitted, 3 received, 0% packet loss"""
61
62         self.assertTrue(out.find(expected) > 0)
63
64     def t_run(self, node, target):
65         if not node.is_alive():
66             print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
67             return
68
69         node.mkdir(self.home, clean = True)
70         
71         command = "ping %s" % target
72         dst = os.path.join(self.home, "app.sh")
73         node.upload(command, dst)
74         
75         cmd = "bash ./app.sh"
76         node.run(cmd, self.home)
77         pid, ppid = node.checkpid(self.home)
78
79         status = node.status(pid, ppid)
80         self.assertTrue(status, RUNNING)
81
82         node.kill(pid, ppid)
83         status = node.status(pid, ppid)
84         self.assertTrue(status, FINISHED)
85
86         node.rmdir(self.home)
87
88     def t_install(self, node, target):
89         if not node.is_alive():
90             print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
91             return
92
93         node.mkdir(self.home, clean = True)
94
95         prog = """#include <stdio.h>
96
97 int
98 main (void)
99 {
100     printf ("Hello, world!\\n");
101     return 0;
102 }
103 """
104         dst = os.path.join(self.home, "hello.c")
105         node.upload(prog, dst)
106
107         node.install('gcc')
108
109         command = "cd %s; gcc -Wall hello.c -o hello" % self.home
110         out = node.execute(command)
111
112         command = "%s/hello" % self.home
113         out = node.execute(command)
114
115         node.uninstall('gcc')
116         node.rmdir(self.home)
117
118         self.assertEquals(out, "Hello, world!\n")
119
120     def test_execute_fedora(self):
121         self.t_execute(self.node_fedora, self.target)
122
123     def test_execute_ubuntu(self):
124         self.t_execute(self.node_ubuntu, self.target)
125
126     def test_run_fedora(self):
127         self.t_run(self.node_fedora, self.target)
128
129     def test_run_ubuntu(self):
130         self.t_run(self.node_ubuntu, self.target)
131
132     def test_intall_fedora(self):
133         self.t_install(self.node_fedora, self.target)
134
135     def test_install_ubuntu(self):
136         self.t_install(self.node_ubuntu, self.target)
137
138     def xtest_xterm_fedora(self):
139         """ PlanetLab doesn't currently support X11 forwarding.
140         Interactive test. Should not run automatically """
141         self.t_xterm(self.node_fedora, self.target)
142
143     def xtest_xterm_ubuntu(self):
144         """ Interactive test. Should not run automatically """
145         self.t_xterm(self.node_ubuntu, self.target)
146
147
148 if __name__ == '__main__':
149     unittest.main()
150