from nepi.resources.linux.node import LinuxNode
from nepi.resources.linux.application import LinuxApplication
-from test_utils import skipIfNotAlive
+from test_utils import skipIfNotAlive, skipInteractive
import os
import time
class LinuxApplicationTestCase(unittest.TestCase):
def setUp(self):
- self.fedora_host = "nepi5.pl.sophia.inria.fr"
+ self.fedora_host = "nepi2.pl.sophia.inria.fr"
self.fedora_user = "inria_nepi"
self.ubuntu_host = "roseval.pl.sophia.inria.fr"
self.ubuntu_user = "alina"
- self.target = "nepi3.pl.sophia.inria.fr"
+ self.target = "nepi5.pl.sophia.inria.fr"
@skipIfNotAlive
def t_stdout(self, host, user):
ec.deploy()
- ec.wait_finished([app])
+ ec.wait_finished(app)
self.assertTrue(ec.state(node) == ResourceState.STARTED)
self.assertTrue(ec.state(app) == ResourceState.FINISHED)
ec.deploy()
- ec.wait_finished([app])
+ ec.wait_finished(app)
self.assertTrue(ec.state(node) == ResourceState.STARTED)
self.assertTrue(ec.state(app) == ResourceState.FINISHED)
path = ec.trace(app, "stdout", attr = TraceAttr.PATH)
rm = ec.get_resource(app)
- p = os.path.join(rm.app_home, "stdout")
+ p = os.path.join(rm.run_home, "stdout")
self.assertEquals(path, p)
ec.shutdown()
+ @skipIfNotAlive
+ def t_code(self, host, user):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ prog = """#include <stdio.h>
+
+int
+main (void)
+{
+ printf ("Hello, world!\\n");
+ return 0;
+}
+"""
+ cmd = "${RUN_HOME}/hello"
+ build = "gcc -Wall -x c ${APP_HOME}/code -o hello"
+
+ app = ec.register_resource("LinuxApplication")
+ ec.set(app, "command", cmd)
+ ec.set(app, "code", prog)
+ ec.set(app, "depends", "gcc")
+ ec.set(app, "build", build)
+ ec.register_connection(app, node)
+
+ ec.deploy()
+
+ ec.wait_finished(app)
+
+ out = ec.trace(app, 'stdout')
+ self.assertEquals(out, "Hello, world!\n")
+
+ ec.shutdown()
+
@skipIfNotAlive
def t_concurrency(self, host, user):
from nepi.execution.resource import ResourceFactory
path = ec.trace(app, 'stdout', attr = TraceAttr.PATH)
rm = ec.get_resource(app)
- p = os.path.join(rm.app_home, 'stdout')
+ p = os.path.join(rm.run_home, 'stdout')
self.assertEquals(path, p)
ec.shutdown()
ec.shutdown()
+ @skipIfNotAlive
+ def t_xterm(self, host, user):
+ from nepi.execution.resource import ResourceFactory
+
+ ResourceFactory.register_type(LinuxNode)
+ ResourceFactory.register_type(LinuxApplication)
+
+ ec = ExperimentController()
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ app = ec.register_resource("LinuxApplication")
+ ec.set(app, "command", "xterm")
+ ec.set(app, "depends", "xterm")
+ ec.set(app, "forwardX11", True)
+
+ ec.register_connection(app, node)
+
+ ec.deploy()
+
+ ec.wait_finished([app])
+
+ self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+ ec.shutdown()
+
def test_stdout_fedora(self):
self.t_stdout(self.fedora_host, self.fedora_user)
def test_ping_ubuntu(self):
self.t_ping(self.ubuntu_host, self.ubuntu_user)
- def ztest_concurrency_fedora(self):
+ def test_concurrency_fedora(self):
self.t_concurrency(self.fedora_host, self.fedora_user)
- def ztest_concurrency_ubuntu(self):
+ def test_concurrency_ubuntu(self):
self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
def test_condition_fedora(self):
def test_http_sources_ubuntu(self):
self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
+ def test_code_fedora(self):
+ self.t_code(self.fedora_host, self.fedora_user)
+
+ def test_code_ubuntu(self):
+ self.t_code(self.ubuntu_host, self.ubuntu_user)
+
+ @skipInteractive
+ def test_xterm_ubuntu(self):
+ """ Interactive test. Should not run automatically """
+ self.t_xterm(self.ubuntu_host, self.ubuntu_user)
+
+
if __name__ == '__main__':
unittest.main()