from test_utils import skipIfNotAlive, skipInteractive
import os
+import shutil
import time
import tempfile
import unittest
ec.shutdown()
+ @skipIfNotAlive
+ def t_copy_files(self, host, user):
+ # create some temp files and directories to copy
+ dirpath = tempfile.mkdtemp()
+ f = tempfile.NamedTemporaryFile(dir=dirpath, delete=False)
+ f.close()
+
+ f1 = tempfile.NamedTemporaryFile(delete=False)
+ f1.close()
+ f1.name
+
+ ec = ExperimentController(exp_id="test-copyfile")
+
+ node = ec.register_resource("LinuxNode")
+ ec.set(node, "hostname", host)
+ ec.set(node, "username", user)
+ ec.set(node, "cleanHome", True)
+ ec.set(node, "cleanProcesses", True)
+
+ app = ec.register_resource("LinuxApplication")
+ ec.set(app, "command", "ls ${SRC}")
+ ec.set(app, "sources", "%s;%s" % (dirpath, f1.name))
+ ec.register_connection(app, node)
+
+ ec.deploy()
+
+ ec.wait_finished([app])
+
+ stdout = ec.trace(app, "stdout")
+
+ self.assertTrue(stdout.find(os.path.basename(dirpath)) > -1)
+ self.assertTrue(stdout.find(os.path.basename(f1.name)) > -1)
+
+ ec.shutdown()
+
+ os.remove(f1.name)
+ shutil.rmtree(dirpath)
+
def test_stdout_fedora(self):
self.t_stdout(self.fedora_host, self.fedora_user)
""" Interactive test. Should not run automatically """
self.t_xterm(self.ubuntu_host, self.ubuntu_user)
+ def test_copy_files_fedora(self):
+ self.t_copy_files(self.fedora_host, self.fedora_user)
+
+ def test_copy_files_ubuntu(self):
+ self.t_copy_files(self.ubuntu_host, self.ubuntu_user)
if __name__ == '__main__':
unittest.main()