2 from neco.resources.linux.node import LinuxNode
3 from neco.design.box import Box
4 from neco.util.sshfuncs import RUNNING, FINISHED
11 class DummyEC(object):
14 class LinuxBoxTestCase(unittest.TestCase):
16 host = 'nepi2.pl.sophia.inria.fr'
18 self.node_fedora = self.create_node(host, user)
20 host = 'roseval.pl.sophia.inria.fr'
22 self.node_ubuntu = self.create_node(host, user)
24 self.target = 'nepi5.pl.sophia.inria.fr'
25 self.home = '${HOME}/test-app'
27 def create_node(self, host, user):
31 node = LinuxNode(box, ec)
37 def t_xterm(self, node, target):
38 if not node.is_alive():
39 print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
42 node.enable_x11 = True
46 out = node.execute('xterm')
48 node.uninstall('xterm')
50 self.assertEquals(out, "")
52 def t_execute(self, node, target):
53 if not node.is_alive():
54 print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
57 command = "ping -qc3 %s" % target
58 out = node.execute(command)
60 expected = """3 packets transmitted, 3 received, 0% packet loss"""
62 self.assertTrue(out.find(expected) > 0)
64 def t_run(self, node, target):
65 if not node.is_alive():
66 print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
69 node.mkdir(self.home, clean = True)
71 command = "ping %s" % target
72 dst = os.path.join(self.home, "app.sh")
73 node.upload(command, dst)
76 node.run(cmd, self.home)
77 pid, ppid = node.checkpid(self.home)
79 status = node.status(pid, ppid)
80 self.assertTrue(status, RUNNING)
83 status = node.status(pid, ppid)
84 self.assertTrue(status, FINISHED)
88 def t_install(self, node, target):
89 if not node.is_alive():
90 print "*** WARNING: Skipping test: Node %s is not alive\n" % (node.host)
93 node.mkdir(self.home, clean = True)
95 prog = """#include <stdio.h>
100 printf ("Hello, world!\\n");
104 dst = os.path.join(self.home, "hello.c")
105 node.upload(prog, dst)
109 command = "cd %s; gcc -Wall hello.c -o hello" % self.home
110 out = node.execute(command)
112 command = "%s/hello" % self.home
113 out = node.execute(command)
115 node.uninstall('gcc')
116 node.rmdir(self.home)
118 self.assertEquals(out, "Hello, world!\n")
120 def test_execute_fedora(self):
121 self.t_execute(self.node_fedora, self.target)
123 def test_execute_ubuntu(self):
124 self.t_execute(self.node_ubuntu, self.target)
126 def test_run_fedora(self):
127 self.t_run(self.node_fedora, self.target)
129 def test_run_ubuntu(self):
130 self.t_run(self.node_ubuntu, self.target)
132 def test_intall_fedora(self):
133 self.t_install(self.node_fedora, self.target)
135 def test_install_ubuntu(self):
136 self.t_install(self.node_ubuntu, self.target)
138 def xtest_xterm_fedora(self):
139 """ PlanetLab doesn't currently support X11 forwarding.
140 Interactive test. Should not run automatically """
141 self.t_xterm(self.node_fedora, self.target)
143 def xtest_xterm_ubuntu(self):
144 """ Interactive test. Should not run automatically """
145 self.t_xterm(self.node_ubuntu, self.target)
148 if __name__ == '__main__':