From 1df0acb80ba1c737280390c1277a4a751843eac4 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Mon, 10 Jun 2013 17:36:50 -0700 Subject: [PATCH] Bugfixing LinuxNode and LinuxApplication --- examples/linux/ccnx/vlc_2_hosts.py | 59 +++++-- src/nepi/execution/attribute.py | 22 +-- src/nepi/execution/ec.py | 72 +++++--- src/nepi/execution/resource.py | 11 +- src/nepi/execution/trace.py | 4 +- src/nepi/resources/linux/application.py | 153 ++++++++-------- src/nepi/resources/linux/node.py | 224 ++++++++++++++++-------- src/nepi/resources/linux/rpmfuncs.py | 4 +- src/nepi/util/execfuncs.py | 10 +- src/nepi/util/sshfuncs.py | 151 ++++++++-------- test/execution/ec.py | 2 +- test/resources/linux/application.py | 40 ++--- test/resources/linux/node.py | 162 +++++++++++++---- test/util/sshfuncs.py | 10 +- 14 files changed, 576 insertions(+), 348 deletions(-) diff --git a/examples/linux/ccnx/vlc_2_hosts.py b/examples/linux/ccnx/vlc_2_hosts.py index 0291d03f..f880fca6 100755 --- a/examples/linux/ccnx/vlc_2_hosts.py +++ b/examples/linux/ccnx/vlc_2_hosts.py @@ -28,11 +28,12 @@ from optparse import OptionParser, SUPPRESS_HELP import os import time -def add_node(ec, host, user): +def add_node(ec, host, user, ssh_key = None): node = ec.register_resource("LinuxNode") ec.set(node, "hostname", host) ec.set(node, "username", user) - #ec.set(node, "cleanHome", True) + ec.set(node, "identity", ssh_key) + ec.set(node, "cleanHome", True) ec.set(node, "cleanProcesses", True) return node @@ -40,18 +41,18 @@ def add_ccnd(ec, os_type, peers): if os_type == "f12": depends = ( " autoconf openssl-devel expat-devel libpcap-devel " " ecryptfs-utils-devel libxml2-devel automake gawk " - " gcc gcc-c++ git pcre-devel ") + " gcc gcc-c++ git pcre-devel make ") elif os_type == "ubuntu": depends = ( " autoconf libssl-dev libexpat-dev libpcap-dev " " libecryptfs0 libxml2-utils automake gawk gcc g++ " - " git-core pkg-config libpcre3-dev ") + " git-core pkg-config libpcre3-dev make ") sources = "http://www.ccnx.org/releases/ccnx-0.7.1.tar.gz" build = ( # Evaluate if ccnx binaries are already installed " ( " - " test -d ${EXP_HOME}/ccnx/bin" + " test -f ${EXP_HOME}/ccnx/bin/ccnd" " ) || ( " # If not, untar and build " ( " @@ -66,7 +67,7 @@ def add_ccnd(ec, os_type, peers): install = ( # Evaluate if ccnx binaries are already installed " ( " - " test -d ${EXP_HOME}/ccnx/bin " + " test -f ${EXP_HOME}/ccnx/bin/ccnd" " ) || ( " " mkdir -p ${EXP_HOME}/ccnx/bin && " " cp -r ${SOURCES}/ccnx ${EXP_HOME}" @@ -75,11 +76,11 @@ def add_ccnd(ec, os_type, peers): env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" - # BASH command -> ' ccndstart 2>&1 ; ccndc add ccnx:/ udp host ; ccnr 2>&1 ' - command = "ccndstart 2>&1 ; " + # BASH command -> ' ccndstart ; ccndc add ccnx:/ udp host ; ccnr ' + command = "ccndstart ; " peers = map(lambda peer: "ccndc add ccnx:/ udp %s" % peer, peers) command += " ; ".join(peers) + " ; " - command += " ccnr 2>&1 " + command += " ccnr " app = ec.register_resource("LinuxApplication") ec.set(app, "depends", depends) @@ -104,7 +105,7 @@ def add_publish(ec, movie): def add_stream(ec): env = "PATH=$PATH:${EXP_HOME}/ccnx/bin" - command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) 2>&1" + command = "sudo -S dbus-uuidgen --ensure ; ( ccncat ccnx:/VIDEO | vlc - ) " app = ec.register_resource("LinuxApplication") ec.set(app, "depends", "vlc") @@ -117,37 +118,56 @@ def add_stream(ec): def get_options(): slicename = os.environ.get("PL_SLICE") - usage = "usage: %prog -s -u -m -l " + # We use a specific SSH private key 
for PL if the PL_SSHKEY is specified or the + # id_rsa_planetlab exists + default_key = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME']) + default_key = default_key if os.path.exists(default_key) else None + pl_ssh_key = os.environ.get("PL_SSHKEY", default_key) + + usage = "usage: %prog -s -u -m -l -i " parser = OptionParser(usage=usage) parser.add_option("-s", "--pl-slice", dest="pl_slice", - help="PlanetLab slicename", default=slicename, type="str") - parser.add_option("-u", "--user-2", dest="user2", - help="User for non PlanetLab machine", type="str") + help="PlanetLab slicename", default = slicename, type="str") + parser.add_option("-u", "--username", dest="username", + help="User for extra host (non PlanetLab)", type="str") parser.add_option("-m", "--movie", dest="movie", help="Stream movie", type="str") parser.add_option("-l", "--exp-id", dest="exp_id", help="Label to identify experiment", type="str") + parser.add_option("-i", "--pl-ssh-key", dest="pl_ssh_key", + help="Path to private SSH key to be used for connection", + default = pl_ssh_key, type="str") (options, args) = parser.parse_args() if not options.movie: parser.error("movie is a required argument") - return (options.pl_slice, options.user2, options.movie, options.exp_id) + return (options.pl_slice, options.username, options.movie, options.exp_id, + options.pl_ssh_key) if __name__ == '__main__': - ( pl_slice, user2, movie, exp_id ) = get_options() + ( pl_slice, username, movie, exp_id, pl_ssh_key ) = get_options() # Search for available RMs populate_factory() + # PlanetLab node host1 = 'planetlab2.u-strasbg.fr' + + # Another node + # IMPORTANT NOTE: you must replace this host for another one + # you have access to. You must set up your SSH keys so + # the host can be accessed through SSH without prompting + # for a password. The host must allow X forwarding using SSH. 
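# --- Editor's note (illustrative, not part of the patch) ----------------------
# With the new "-i/--pl-ssh-key" option the example can be invoked roughly as
# follows; the slice name, user, movie path and key path are assumptions:
#
#   python examples/linux/ccnx/vlc_2_hosts.py -s my_pl_slice -u myuser \
#       -m /home/myuser/movie.ts -l vlc-ccnx-exp -i ~/.ssh/id_rsa_planetlab
# -------------------------------------------------------------------------------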
host2 = 'roseval.pl.sophia.inria.fr' + # Create the ExperimentController instance ec = ExperimentController(exp_id = exp_id) - node1 = add_node(ec, host1, pl_slice) + # Register a ResourceManager (RM) for the PlanetLab node + node1 = add_node(ec, host1, pl_slice, pl_ssh_key) peers = [host2] ccnd1 = add_ccnd(ec, "f12", peers) @@ -161,7 +181,7 @@ if __name__ == '__main__': ec.register_condition(pub, ResourceAction.START, ccnd1, ResourceState.STARTED) - node2 = add_node(ec, host2, user2) + node2 = add_node(ec, host2, username) peers = [host1] ccnd2 = add_ccnd(ec, "ubuntu", peers) ec.register_connection(ccnd2, node2) @@ -177,10 +197,13 @@ if __name__ == '__main__': ec.register_condition(stream, ResourceAction.START, pub, ResourceState.STARTED) + # Deploy all ResourceManagers ec.deploy() + # Wait until the applications are finished apps = [ccnd1, pub, ccnd2, stream] ec.wait_finished(apps) + # Shutdown the experiment controller ec.shutdown() diff --git a/src/nepi/execution/attribute.py b/src/nepi/execution/attribute.py index 3d46edc4..c6b973bc 100644 --- a/src/nepi/execution/attribute.py +++ b/src/nepi/execution/attribute.py @@ -79,53 +79,53 @@ class Attribute(object): @property def name(self): - """ Returns the name of the attribute """ + """ Returns the name of the attribute """ return self._name @property def default(self): - """ Returns the default value of the attribute """ + """ Returns the default value of the attribute """ return self._default @property def type(self): - """ Returns the type of the attribute """ + """ Returns the type of the attribute """ return self._type @property def help(self): - """ Returns the help of the attribute """ + """ Returns the help of the attribute """ return self._help @property def flags(self): - """ Returns the flags of the attribute """ + """ Returns the flags of the attribute """ return self._flags @property def allowed(self): - """ Returns the allowed value for this attribute """ + """ Returns the allowed value for this attribute """ return self._allowed @property def range(self): - """ Returns the range of the attribute """ + """ Returns the range of the attribute """ return self._range def has_flag(self, flag): - """ Returns true if the attribute has the flag 'flag' + """ Returns true if the attribute has the flag 'flag' :param flag: Flag that need to be ckecked :type flag: Flags - """ + """ return (self._flags & flag) == flag def get_value(self): - """ Returns the value of the attribute """ + """ Returns the value of the attribute """ return self._value def set_value(self, value): - """ Change the value of the attribute after checking the type """ + """ Change the value of the attribute after checking the type """ valid = True if self.type == Types.Enumerate: diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 34c5c50b..ba573c49 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -439,6 +439,9 @@ class ExperimentController(object): if not group: group = self.resources + if isinstance(group, int): + group = [group] + # Before starting deployment we disorder the group list with the # purpose of speeding up the whole deployment process. # It is likely that the user inserted in the 'group' list closely @@ -450,7 +453,7 @@ class ExperimentController(object): # same conditions (e.g. LinuxApplications running on a same # node share a single lock, so they will tend to be serialized). # If we disorder the group list, this problem can be mitigated. 
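# --- Editor's illustrative sketch (not part of the patch) ---------------------
# The isinstance() check added above lets deploy() take either a single guid or
# a list of guids. The guids below are hypothetical:
#
#     ec.deploy(node)            # a single guid is normalized to [node]
#     ec.deploy([app1, app2])    # an explicit group of guids
#     ec.deploy()                # no group given: deploy every registered resource
# -------------------------------------------------------------------------------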
- random.shuffle(group) + #random.shuffle(group) def wait_all_and_start(group): reschedule = False @@ -467,7 +470,7 @@ class ExperimentController(object): # If all resources are read, we schedule the start for guid in group: rm = self.get_resource(guid) - self.schedule("0.01s", rm.start_with_conditions) + self.schedule("0s", rm.start_with_conditions) if wait_all_ready: # Schedule the function that will check all resources are @@ -479,7 +482,7 @@ class ExperimentController(object): for guid in group: rm = self.get_resource(guid) - self.schedule("0.001s", rm.deploy) + self.schedule("0s", rm.deploy) if not wait_all_ready: self.schedule("1s", rm.start_with_conditions) @@ -518,12 +521,16 @@ class ExperimentController(object): def shutdown(self): """ Shutdown the Experiment Controller. - It means : Release all the resources and stop the scheduler + Releases all the resources and stops task processing thread """ self.release() - self._stop_scheduler() + # Mark the EC state as TERMINATED + self._state = ECState.TERMINATED + + # Notify condition to wake up the processing thread + self._notify() if self._thread.is_alive(): self._thread.join() @@ -554,18 +561,28 @@ class ExperimentController(object): self._tasks[task.id] = task # Notify condition to wake up the processing thread - self._cond.acquire() - self._cond.notify() - self._cond.release() + self._notify() return task.id def _process(self): - """ Process at executing the task that are in the scheduler. + """ Process scheduled tasks. + + The _process method is executed in an independent thread held by the + ExperimentController for as long as the experiment is running. + + Tasks are scheduled by invoking the schedule method with a target callback. + The schedule method is givedn a execution time which controls the + order in which tasks are processed. + + Tasks are processed in parallel using multithreading. + The environmental variable NEPI_NTHREADS can be used to control + the number of threads used to process tasks. The default value is 50. """ + nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) - runner = ParallelRun(maxthreads = 50) + runner = ParallelRun(maxthreads = nthreads) runner.start() try: @@ -602,18 +619,18 @@ class ExperimentController(object): self._logger.error("Error while processing tasks in the EC: %s" % err) self._state = ECState.FAILED - - # Mark EC state as terminated - if self.ecstate == ECState.RUNNING: - # Synchronize to get errors if occurred + finally: runner.sync() - self._state = ECState.TERMINATED def _execute(self, task): - """ Invoke the callback of the task 'task' + """ Executes a single task. + + If the invokation of the task callback raises an + exception, the processing thread of the ExperimentController + will be stopped and the experiment will be aborted. - :param task: Id of the task - :type task: int + :param task: Object containing the callback to execute + :type task: Task """ # Invoke callback @@ -629,22 +646,21 @@ class ExperimentController(object): self._logger.error("Error occurred while executing task: %s" % err) - self._stop_scheduler() + # Set the EC to FAILED state (this will force to exit the task + # processing thread) + self._state = ECState.FAILED + + # Notify condition to wake up the processing thread + self._notify() # Propage error to the ParallelRunner raise - def _stop_scheduler(self): - """ Stop the scheduler and put the EC into a FAILED State. - + def _notify(self): + """ Awakes the processing thread in case it is blocked waiting + for a new task to be scheduled. 
""" - - # Mark the EC as failed - self._state = ECState.FAILED - - # Wake up the EC in case it was sleeping self._cond.acquire() self._cond.notify() self._cond.release() - diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 423ca926..078b07d1 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -627,17 +627,16 @@ class ResourceFactory(object): return rclass(ec, guid) def populate_factory(): - """Register all the possible RM that exists in the current version of Nepi. - - """ + """Register all the possible RM that exists in the current version of Nepi. + """ for rclass in find_types(): ResourceFactory.register_type(rclass) def find_types(): - """Look into the different folders to find all the - availables Resources Managers + """Look into the different folders to find all the + availables Resources Managers - """ + """ search_path = os.environ.get("NEPI_SEARCH_PATH", "") search_path = set(search_path.split(" ")) diff --git a/src/nepi/execution/trace.py b/src/nepi/execution/trace.py index 1d16242e..3b22eaf3 100644 --- a/src/nepi/execution/trace.py +++ b/src/nepi/execution/trace.py @@ -45,11 +45,11 @@ class Trace(object): @property def name(self): - """ Returns the name of the trace """ + """ Returns the name of the trace """ return self._name @property def help(self): - """ Returns the help of the trace """ + """ Returns the help of the trace """ return self._help diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 080c3a40..5f4c998a 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -21,16 +21,13 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.resources.linux.node import LinuxNode -from nepi.util import sshfuncs +from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import strfnow, strfdiff import os -reschedule_delay = "0.5s" -state_check_delay = 1 - # TODO: Resolve wildcards in commands!! -# TODO: If command is not set give a warning but do not generate an error! 
+ @clsinit class LinuxApplication(ResourceManager): @@ -215,25 +212,21 @@ class LinuxApplication(ResourceManager): # Install self.install() + # Upload command command = self.get("command") x11 = self.get("forwardX11") - if not x11 and command: + env = self.get("env") + + if command and not x11: self.info("Uploading command '%s'" % command) - # Export environment - environ = "" - if self.get("env"): - for var in self.get("env").split(" "): - environ += 'export %s\n' % var - - command = environ + command - - # If the command runs asynchronous, pre upload the command - # to the app.sh file in the remote host - dst = os.path.join(self.app_home, "app.sh") + # replace application specific paths in the command command = self.replace_paths(command) - self.node.upload(command, dst, text = True) + self.node.upload_command(command, self.app_home, + shfile = "app.sh", + env = env) + super(LinuxApplication, self).provision() def upload_sources(self): @@ -253,25 +246,29 @@ class LinuxApplication(ResourceManager): http_sources.append(source) sources.remove(source) - # Download http sources + # Download http sources remotely if http_sources: - cmd = " wget -c --directory-prefix=${SOURCES} " - verif = "" + command = " wget -c --directory-prefix=${SOURCES} " + check = "" for source in http_sources: - cmd += " %s " % (source) - verif += " ls ${SOURCES}/%s ;" % os.path.basename(source) + command += " %s " % (source) + check += " ls ${SOURCES}/%s ;" % os.path.basename(source) - # Wget output goes to stderr :S - cmd += " 2> /dev/null ; " - - # Add verification - cmd += " %s " % verif + # Append the command to check that the sources were downloaded + command += " ; %s " % check + # replace application specific paths in the command + command = self.replace_paths(command) + # Upload the command to a file, and execute asynchronously - self.upload_and_run(cmd, - "http_sources.sh", "http_sources_pid", - "http_sources_out", "http_sources_err") + self.node.run_and_wait(command, self.app_home, + shfile = "http_sources.sh", + pidfile = "http_sources_pidfile", + ecodefile = "http_sources_exitcode", + stdout = "http_sources_stdout", + stderr = "http_sources_stderr") + if sources: self.node.upload(sources, self.src_dir) @@ -299,7 +296,7 @@ class LinuxApplication(ResourceManager): depends = self.get("depends") if depends: self.info(" Installing dependencies %s" % depends) - self.node.install_packages(depends, home = self.app_home) + self.node.install_packages(depends, self.app_home) def build(self): build = self.get("build") @@ -309,26 +306,40 @@ class LinuxApplication(ResourceManager): # create dir for build self.node.mkdir(self.build_dir) + # replace application specific paths in the command + command = self.replace_paths(command) + # Upload the command to a file, and execute asynchronously - self.upload_and_run(build, - "build.sh", "build_pid", - "build_out", "build_err") + self.node.run_and_wait(command, self.app_home, + shfile = "build.sh", + pidfile = "build_pidfile", + ecodefile = "build_exitcode", + stdout = "build_stdout", + stderr = "build_stderr") def install(self): install = self.get("install") if install: self.info(" Installing sources ") + # replace application specific paths in the command + command = self.replace_paths(command) + # Upload the command to a file, and execute asynchronously - self.upload_and_run(install, - "install.sh", "install_pid", - "install_out", "install_err") + self.node.run_and_wait(command, self.app_home, + shfile = "install.sh", + pidfile = "install_pidfile", + ecodefile = 
"install_exitcode", + stdout = "install_stdout", + stderr = "install_stderr") def deploy(self): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) + + reschedule_delay = "0.5s" self.ec.schedule(reschedule_delay, self.deploy) else: try: @@ -352,16 +363,25 @@ class LinuxApplication(ResourceManager): x11 = self.get('forwardX11') or False failed = False - super(LinuxApplication, self).start() - if not command: - self.info("No command to start ") + # If no command was given, then the application + # is directly marked as FINISHED self._state = ResourceState.FINISHED - return + else: + super(LinuxApplication, self).start() self.info("Starting command '%s'" % command) if x11: + # If X11 forwarding was specified, then the application + # can not run detached, so instead of invoking asynchronous + # 'run' we invoke synchronous 'execute'. + if not command: + msg = "No command is defined but X11 forwarding has been set" + self.error(msg) + self._state = ResourceState.FAILED + raise RuntimeError, msg + if env: # Export environment environ = "" @@ -392,28 +412,22 @@ class LinuxApplication(ResourceManager): stderr = stderr, sudo = sudo) + # check if execution errors occurred + msg = " Failed to start command '%s' " % command + if proc.poll() and err: - failed = True + self.error(msg, out, err) + raise RuntimeError, msg - if not failed: - pid, ppid = self.node.wait_pid(home = self.app_home) - if pid: self._pid = int(pid) - if ppid: self._ppid = int(ppid) + # Check status of process running in background + pid, ppid = self.node.wait_pid(self.app_home) + if pid: self._pid = int(pid) + if ppid: self._ppid = int(ppid) + # If the process is not running, check for error information + # on the remote machine if not self.pid or not self.ppid: - failed = True - - (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr') - - if failed or out or chkerr: - # check if execution errors occurred - msg = " Failed to start command '%s' " % command - out = out - if err: - err = err - elif chkerr: - err = chkerr - + (out, err), proc = self.node.check_output(self.app_home, 'stderr') self.error(msg, out, err) msg2 = " Setting state to Failed" @@ -456,10 +470,11 @@ class LinuxApplication(ResourceManager): if self._state == ResourceState.STARTED: # To avoid overwhelming the remote hosts and the local processor # with too many ssh queries, the state is only requested - # every 'state_check_delay' . + # every 'state_check_delay' seconds. 
+ state_check_delay = 0.5 if strfdiff(strfnow(), self._last_state_check) > state_check_delay: # check if execution errors occurred - (out, err), proc = self.node.check_output(self.app_home, 'stderr') + (out, err), proc = self.node.check_errors(self.app_home) if out or err: if err.find("No such file or directory") >= 0 : @@ -474,7 +489,7 @@ class LinuxApplication(ResourceManager): elif self.pid and self.ppid: status = self.node.status(self.pid, self.ppid) - if status == sshfuncs.FINISHED: + if status == ProcStatus.FINISHED: self._state = ResourceState.FINISHED @@ -482,18 +497,6 @@ class LinuxApplication(ResourceManager): return self._state - def upload_and_run(self, cmd, fname, pidfile, outfile, errfile): - dst = os.path.join(self.app_home, fname) - cmd = self.replace_paths(cmd) - self.node.upload(cmd, dst, text = True) - - cmd = "bash ./%s" % fname - (out, err), proc = self.node.run_and_wait(cmd, self.app_home, - pidfile = pidfile, - stdout = outfile, - stderr = errfile, - raise_on_error = True) - def replace_paths(self, command): """ Replace all special path tags with shell-escaped actual paths. diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 8387defc..0f3a01cb 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -20,7 +20,8 @@ from nepi.execution.attribute import Attribute, Flags from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.resources.linux import rpmfuncs, debfuncs -from nepi.util import sshfuncs, execfuncs +from nepi.util import sshfuncs, execfuncs +from nepi.util.sshfuncs import ProcStatus import collections import os @@ -37,6 +38,15 @@ import threading reschedule_delay = "0.5s" +class ExitCode: + """ + Error codes that the rexitcode function can return if unable to + check the exit code of a spawned process + """ + FILENOTFOUND = -1 + CORRUPTFILE = -2 + ERROR = -3 + OK = 0 @clsinit class LinuxNode(ResourceManager): @@ -264,46 +274,46 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages, home = None): - home = home or self.node_home - - cmd = "" + def install_packages(self, packages, home): + command = "" if self.os in ["f12", "f14"]: - cmd = rpmfuncs.install_packages_command(self.os, packages) + command = rpmfuncs.install_packages_command(self.os, packages) elif self.os in ["debian", "ubuntu"]: - cmd = debfuncs.install_packages_command(self.os, packages) + command = debfuncs.install_packages_command(self.os, packages) else: msg = "Error installing packages ( OS not known ) " self.error(msg, self.os) raise RuntimeError, msg out = err = "" - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "instpkg_pid", - stdout = "instpkg_out", - stderr = "instpkg_err", + (out, err), proc = self.run_and_wait(command, home, + shfile = "instpkg.sh", + pidfile = "instpkg_pidfile", + ecodefile = "instpkg_exitcode", + stdout = "instpkg_stdout", + stderr = "instpkg_stderr", raise_on_error = True) return (out, err), proc - def remove_packages(self, packages, home = None): - home = home or self.node_home - - cmd = "" + def remove_packages(self, packages, home): + command = "" if self.os in ["f12", "f14"]: - cmd = rpmfuncs.remove_packages_command(self.os, packages) + command = rpmfuncs.remove_packages_command(self.os, packages) elif self.os in ["debian", "ubuntu"]: - cmd = debfuncs.remove_packages_command(self.os, packages) + command = 
debfuncs.remove_packages_command(self.os, packages) else: msg = "Error removing packages ( OS not known ) " self.error(msg) raise RuntimeError, msg out = err = "" - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "rmpkg_pid", - stdout = "rmpkg_out", - stderr = "rmpkg_err", + (out, err), proc = self.run_and_wait(command, home, + shfile = "rmpkg.sh", + pidfile = "rmpkg_pidfile", + ecodefile = "rmpkg_exitcode", + stdout = "rmpkg_stdout", + stderr = "rmpkg_stderr", raise_on_error = True) return (out, err), proc @@ -316,22 +326,27 @@ class LinuxNode(ResourceManager): def rmdir(self, path): return self.execute("rm -rf %s" % path, with_lock = True) - - def run_and_wait(self, command, - home = ".", - pidfile = "pid", + + def run_and_wait(self, command, home, + shfile = "cmd.sh", + pidfile = "pidfile", + ecodefile = "exitcode", stdin = None, - stdout = 'stdout', - stderr = 'stderr', + stdout = "stdout", + stderr = "stderr", sudo = False, tty = False, raise_on_error = False): - """ runs a command in background on the remote host, but waits - until the command finishes execution. - This is more robust than doing a simple synchronized 'execute', - since in the remote host the command can continue to run detached - even if network disconnections occur + """ + runs a command in background on the remote host, busy-waiting + until the command finishes execution. + This is more robust than doing a simple synchronized 'execute', + since in the remote host the command can continue to run detached + even if network disconnections occur """ + self.upload_command(command, home, shfile, ecodefile) + + command = "bash ./%s" % shfile # run command in background in remote host (out, err), proc = self.run(command, home, pidfile = pidfile, @@ -356,11 +371,11 @@ class LinuxNode(ResourceManager): # wait until command finishes to execute self.wait_run(pid, ppid) - - # check if execution errors occurred - (out, err), proc = self.check_output(home, stderr) + + (out, err), proc = self.check_errors(home, ecodefile, stderr) - if err or out: + # Out is what was written in the stderr file + if out or err: msg = " Failed to run command '%s' " % command self.error(msg, out, err) @@ -368,21 +383,93 @@ class LinuxNode(ResourceManager): raise RuntimeError, msg return (out, err), proc + + def exitcode(self, home, ecodefile = "exitcode"): + """ + Get the exit code of an application. + Returns an integer value with the exit code + """ + (out, err), proc = self.check_output(home, ecodefile) + + # Succeeded to open file, return exit code in the file + if proc.wait() == 0: + try: + return int(out.strip()) + except: + # Error in the content of the file! + return ExitCode.CORRUPTFILE + + # No such file or directory + if proc.returncode == 1: + return ExitCode.FILENOTFOUND + + # Other error from 'cat' + return ExitCode.ERROR + + def upload_command(self, command, home, + shfile = "cmd.sh", + ecodefile = "exitcode", + env = None): + + command = "{ ( %(command)s ) ; } ; echo $? > %(ecodefile)s " % { + 'command': command, + 'ecodefile': ecodefile, + } + + # Export environment + environ = "" + if env: + for var in env.split(" "): + environ += 'export %s\n' % var + + command = environ + command + + dst = os.path.join(home, shfile) + return self.upload(command, dst, text = True) + + def check_errors(self, home, + ecodefile = "exitcode", + stderr = "stderr"): + """ + Checks whether errors occurred while running a command. 
+ It first checks the exit code for the command, and only if the + exit code is an error one it returns the error output. + """ + out = err = "" + proc = None + + # get Exit code + ecode = self.exitcode(home, ecodefile) + + if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]: + err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile) + elif ecode > 0 or ecode == ExitCode.FILENOTFOUND: + # The process returned an error code or didn't exist. + # Check standard error. + (out, err), proc = self.check_output(home, stderr) + + # If the stderr file was not found, assume nothing happened. + # We just ignore the error. + if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: # cat - No such file or directory + err = "" + + return (out, err), proc - def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False): + def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False): """ Waits until the pid file for the command is generated, and returns the pid and ppid of the process """ pid = ppid = None delay = 1.0 - for i in xrange(5): - pidtuple = self.checkpid(home = home, pidfile = pidfile) + + for i in xrange(4): + pidtuple = self.getpid(home = home, pidfile = pidfile) if pidtuple: pid, ppid = pidtuple break else: time.sleep(delay) - delay = min(30,delay*1.2) + delay = delay * 1.5 else: msg = " Failed to get pid for pidfile %s/%s " % ( home, pidfile ) @@ -395,30 +482,26 @@ class LinuxNode(ResourceManager): def wait_run(self, pid, ppid, trial = 0): """ wait for a remote process to finish execution """ - delay = 1.0 - first = True - bustspin = 0 + start_delay = 1.0 while True: status = self.status(pid, ppid) - if status is sshfuncs.FINISHED: + if status is ProcStatus.FINISHED: break - elif status is not sshfuncs.RUNNING: - bustspin += 1 - time.sleep(delay*(5.5+random.random())) - if bustspin > 12: + elif status is not ProcStatus.RUNNING: + delay = delay * 1.5 + time.sleep(delay) + # If it takes more than 20 seconds to start, then + # asume something went wrong + if delay > 20: break else: - if first: - first = False - - time.sleep(delay*(0.5+random.random())) - delay = min(30,delay*1.2) - bustspin = 0 + # The app is running, just wait... 
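# --- Editor's illustrative sketch (not part of the patch) ---------------------
# End-to-end use of the new exit-code plumbing, mirroring
# test/resources/linux/node.py ("node" and "app_home" are assumed to exist):
#
#     (out, err), proc = node.run_and_wait("echo 'OK!'", app_home,
#             shfile = "cmd.sh",
#             pidfile = "pidfile",
#             ecodefile = "exitcode",
#             stdout = "stdout",
#             stderr = "stderr",
#             raise_on_error = True)
#
#     ecode = node.exitcode(app_home)     # ExitCode.OK on success, 127 for
#                                         # "command not found", etc.
#     (out, err), proc = node.check_errors(app_home)
#     assert err == ""
# -------------------------------------------------------------------------------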
+ time.sleep(0.5) def check_output(self, home, filename): - """ checks file content """ + """ Retrives content of file """ (out, err), proc = self.execute("cat %s" % os.path.join(home, filename), retry = 1, with_lock = True) return (out, err), proc @@ -448,7 +531,7 @@ class LinuxNode(ResourceManager): def copy(self, src, dst): if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, + (out, err), proc = execfuncs.lcopy(source, dest, recursive = True, strict_host_checking = False) else: @@ -533,16 +616,15 @@ class LinuxNode(ResourceManager): return (out, err), proc - def run(self, command, - home = None, + def run(self, command, home, create_home = False, - pidfile = "pid", + pidfile = 'pidfile', stdin = None, stdout = 'stdout', stderr = 'stderr', sudo = False, tty = False): - + self.debug("Running command '%s'" % command) if self.localhost: @@ -555,10 +637,8 @@ class LinuxNode(ResourceManager): sudo = sudo, user = user) else: - # Start process in a "daemonized" way, using nohup and heavy - # stdin/out redirection to avoid connection issues with self._lock: - (out,err), proc = sshfuncs.rspawn( + (out, err), proc = sshfuncs.rspawn( command, pidfile = pidfile, home = home, @@ -578,12 +658,12 @@ class LinuxNode(ResourceManager): return (out, err), proc - def checkpid(self, home = ".", pidfile = "pid"): + def getpid(self, home, pidfile = "pidfile"): if self.localhost: - pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile)) + pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) else: with self._lock: - pidtuple = sshfuncs.rcheckpid( + pidtuple = sshfuncs.rgetpid( os.path.join(home, pidfile), host = self.get("hostname"), user = self.get("username"), @@ -594,7 +674,7 @@ class LinuxNode(ResourceManager): ) return pidtuple - + def status(self, pid, ppid): if self.localhost: status = execfuncs.lstatus(pid, ppid) @@ -617,7 +697,7 @@ class LinuxNode(ResourceManager): proc = None status = self.status(pid, ppid) - if status == sshfuncs.RUNNING: + if status == sshfuncs.ProcStatus.RUNNING: if self.localhost: (out, err), proc = execfuncs.lkill(pid, ppid, sudo) else: @@ -632,12 +712,6 @@ class LinuxNode(ResourceManager): identity = self.get("identity"), server_key = self.get("serverKey") ) - return (out, err), proc - def check_bad_host(self, out, err): - badre = re.compile(r'(?:' - r'|Error: disk I/O error' - r')', - re.I) - return badre.search(out) or badre.search(err) + return (out, err), proc diff --git a/src/nepi/resources/linux/rpmfuncs.py b/src/nepi/resources/linux/rpmfuncs.py index 13941bd2..d832fb6e 100644 --- a/src/nepi/resources/linux/rpmfuncs.py +++ b/src/nepi/resources/linux/rpmfuncs.py @@ -29,7 +29,7 @@ def install_packages_command(os, packages): cmd = "( %s )" % install_rpmfusion_command(os) for p in packages: cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % { - 'package': p} + 'package': p} #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim)) return " ( %s )" % cmd @@ -42,7 +42,7 @@ def remove_packages_command(os, packages): for p in packages: cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % { 'package': p} - + #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...) 
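# --- Editor's illustration (hypothetical package names) ------------------------
# The idempotent "query, then act" idiom built above, written out by hand:
#
#     pkgs = ["vim", "gcc"]
#     cmd = "".join(" ( rpm -q %(p)s && sudo -S yum -y remove %(p)s ) ; " % {'p': p}
#                   for p in pkgs)
#     # -> " ( rpm -q vim && sudo -S yum -y remove vim ) ; ( rpm -q gcc && ... ) ; "
# -------------------------------------------------------------------------------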
return cmd diff --git a/src/nepi/util/execfuncs.py b/src/nepi/util/execfuncs.py index 773465cf..65cea4ec 100644 --- a/src/nepi/util/execfuncs.py +++ b/src/nepi/util/execfuncs.py @@ -17,7 +17,7 @@ # # Author: Alina Quereilhac -from nepi.util.sshfuncs import RUNNING, FINISHED, NOT_STARTED, STDOUT +from nepi.util.sshfuncs import ProcStatus, STDOUT import subprocess @@ -134,7 +134,7 @@ def lspawn(command, pidfile, return (out,err),proc -def lcheckpid(pidfile): +def lgetpid(pidfile): """ Check the pidfile of a process spawned with remote_spawn. @@ -179,14 +179,14 @@ def lstatus(pid, ppid): }) if proc.wait(): - return NOT_STARTED + return ProcStatus.NOT_STARTED status = False if out: status = (out.strip() == 'wait') else: - return NOT_STARTED - return RUNNING if status else FINISHED + return ProcStatus.NOT_STARTED + return ProcStatus.RUNNING if status else ProcStatus.FINISHED def lkill(pid, ppid, sudo = False): diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index a88dc78a..50f5d7c8 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -57,20 +57,18 @@ class STDOUT: redirect to whatever stdout was redirected to. """ -class RUNNING: +class ProcStatus: """ - Process is still running + Codes for status of remote spawned process """ + # Process is still running + RUNNING = 1 -class FINISHED: - """ - Process is finished - """ - -class NOT_STARTED: - """ - Process hasn't started running yet (this should be very rare) - """ + # Process is finished + FINISHED = 2 + + # Process hasn't started running yet (this should be very rare) + NOT_STARTED = 3 hostbyname_cache = dict() hostbyname_cache_lock = threading.Lock() @@ -511,6 +509,9 @@ def rcopy(source, dest, tmp_known_hosts = None args = ['scp', '-q', '-p', '-C', + # Speed up transfer using blowfish cypher specification which is + # faster than the default one (3des) + '-c', 'blowfish', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', '-o', 'ConnectTimeout=60', @@ -588,7 +589,7 @@ def rcopy(source, dest, def rspawn(command, pidfile, stdout = '/dev/null', stderr = STDOUT, - stdin = '/dev/null', + stdin = '/dev/null', home = None, create_home = False, sudo = False, @@ -600,28 +601,41 @@ def rspawn(command, pidfile, server_key = None, tty = False): """ - Spawn a remote command such that it will continue working asynchronously. - - Parameters: - command: the command to run - it should be a single line. - - pidfile: path of a (ideally unique to this task) pidfile for tracking the process. - - stdout: path of a file to redirect standard output to - must be a string. - Defaults to /dev/null - stderr: path of a file to redirect standard error to - string or the special STDOUT value - to redirect to the same file stdout was redirected to. Defaults to STDOUT. - stdin: path of a file with input to be piped into the command's standard input + Spawn a remote command such that it will continue working asynchronously in + background. + + :param command: The command to run, it should be a single line. + :type command: str + + :param pidfile: Path to a file where to store the pid and ppid of the + spawned process + :type pidfile: str + + :param stdout: Path to file to redirect standard output. + The default value is /dev/null + :type stdout: str + + :param stderr: Path to file to redirect standard error. 
+ If the special STDOUT value is used, stderr will + be redirected to the same file as stdout + :type stderr: str + + :param stdin: Path to a file with input to be piped into the command's standard input + :type stdin: str + + :param home: Path to working directory folder. + It is assumed to exist unless the create_home flag is set. + :type home: str + + :param create_home: Flag to force creation of the home folder before + running the command + :type create_home: bool + + :param sudo: Flag forcing execution with sudo user + :type sudo: bool - home: path of a folder to use as working directory - should exist, unless you specify create_home - - create_home: if True, the home folder will be created first with mkdir -p - - sudo: whether the command needs to be executed as root - - host/port/user/agent/identity: see rexec - - Returns: + :rtype: touple + (stdout, stderr), process Of the spawning process, which only captures errors at spawning time. @@ -667,7 +681,7 @@ def rspawn(command, pidfile, return ((out, err), proc) @eintr_retry -def rcheckpid(pidfile, +def rgetpid(pidfile, host = None, port = None, user = None, @@ -675,19 +689,21 @@ def rcheckpid(pidfile, identity = None, server_key = None): """ - Check the pidfile of a process spawned with remote_spawn. - - Parameters: - pidfile: the pidfile passed to remote_span + Returns the pid and ppid of a process from a remote file where the + information was stored. + + :param home: Path to directory where the pidfile is located + :type home: str + + :param pidfile: Name of file containing the pid information + :type pidfile: str - host/port/user/agent/identity: see rexec - - Returns: + :rtype: int - A (pid, ppid) tuple useful for calling remote_status and remote_kill, - or None if the pidfile isn't valid yet (maybe the process is still starting). - """ + A (pid, ppid) tuple useful for calling rstatus and rkill, + or None if the pidfile isn't valid yet (can happen when process is staring up) + """ (out,err),proc = rexec( "cat %(pidfile)s" % { 'pidfile' : pidfile, @@ -719,18 +735,17 @@ def rstatus(pid, ppid, identity = None, server_key = None): """ - Check the status of a process spawned with remote_spawn. + Returns a code representing the the status of a remote process + + :param pid: Process id of the process + :type pid: int + + :param ppid: Parent process id of process + :type ppid: int - Parameters: - pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid - - host/port/user/agent/identity: see rexec + :rtype: int (One of NOT_STARTED, RUNNING, FINISHED) - Returns: - - One of NOT_STARTED, RUNNING, FINISHED """ - (out,err),proc = rexec( # Check only by pid. pid+ppid does not always work (especially with sudo) " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % { @@ -746,7 +761,7 @@ def rstatus(pid, ppid, ) if proc.wait(): - return NOT_STARTED + return ProcStatus.NOT_STARTED status = False if err: @@ -755,8 +770,8 @@ def rstatus(pid, ppid, elif out: status = (out.strip() == 'wait') else: - return NOT_STARTED - return RUNNING if status else FINISHED + return ProcStatus.NOT_STARTED + return ProcStatus.RUNNING if status else ProcStatus.FINISHED @eintr_retry def rkill(pid, ppid, @@ -769,23 +784,21 @@ def rkill(pid, ppid, server_key = None, nowait = False): """ - Kill a process spawned with remote_spawn. - + Sends a kill signal to a remote process. + First tries a SIGTERM, and if the process does not end in 10 seconds, it sends a SIGKILL. 
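# --- Editor's illustrative sketch (not part of the patch) ---------------------
# Typical round trip with these primitives, following test/util/sshfuncs.py
# (host, user and pidfile values are assumptions):
#
#     rspawn("ping localhost", pidfile, host = host, user = user, agent = True)
#     pid, ppid = rgetpid(pidfile, host = host, user = user, agent = True)
#     assert rstatus(pid, ppid, host = host, user = user, agent = True) \
#                == ProcStatus.RUNNING
#     rkill(pid, ppid, host = host, user = user, agent = True)
# -------------------------------------------------------------------------------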
- - Parameters: - pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid - - sudo: whether the command was run with sudo - careful killing like this. - - host/port/user/agent/identity: see rexec - - Returns: + + :param pid: Process id of process to be killed + :type pid: int + + :param ppid: Parent process id of process to be killed + :type ppid: int + + :param sudo: Flag indicating if sudo should be used to kill the process + :type sudo: bool - Nothing, should have killed the process """ - subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid } cmd = """ SUBKILL="%(subkill)s" ; diff --git a/test/execution/ec.py b/test/execution/ec.py index 2a245472..46021483 100755 --- a/test/execution/ec.py +++ b/test/execution/ec.py @@ -72,7 +72,7 @@ class ExecuteControllersTestCase(unittest.TestCase): def test_schedule_exception(self): def raise_error(): - raise RuntimeError, "the error" + raise RuntimeError, "NOT A REAL ERROR. JUST TESTING!" ec = ExperimentController() ec.schedule("2s", raise_error) diff --git a/test/resources/linux/application.py b/test/resources/linux/application.py index 75a43d14..1393f6a6 100755 --- a/test/resources/linux/application.py +++ b/test/resources/linux/application.py @@ -34,13 +34,13 @@ import unittest class LinuxApplicationTestCase(unittest.TestCase): def setUp(self): - self.fedora_host = 'nepi2.pl.sophia.inria.fr' - self.fedora_user = 'inria_nepi' + self.fedora_host = "nepi2.pl.sophia.inria.fr" + self.fedora_user = "inria_nepi" - self.ubuntu_host = 'roseval.pl.sophia.inria.fr' - self.ubuntu_user = 'alina' + self.ubuntu_host = "roseval.pl.sophia.inria.fr" + self.ubuntu_user = "alina" - self.target = 'nepi5.pl.sophia.inria.fr' + self.target = "nepi5.pl.sophia.inria.fr" @skipIfNotAlive def t_stdout(self, host, user): @@ -69,7 +69,7 @@ class LinuxApplicationTestCase(unittest.TestCase): self.assertTrue(ec.state(node) == ResourceState.STARTED) self.assertTrue(ec.state(app) == ResourceState.FINISHED) - stdout = ec.trace(app, 'stdout') + stdout = ec.trace(app, "stdout") self.assertTrue(stdout.strip() == "HOLA") ec.shutdown() @@ -102,16 +102,16 @@ class LinuxApplicationTestCase(unittest.TestCase): self.assertTrue(ec.state(node) == ResourceState.STARTED) self.assertTrue(ec.state(app) == ResourceState.FINISHED) - stdout = ec.trace(app, 'stdout') - size = ec.trace(app, 'stdout', attr = TraceAttr.SIZE) + stdout = ec.trace(app, "stdout") + size = ec.trace(app, "stdout", attr = TraceAttr.SIZE) self.assertEquals(len(stdout), size) - block = ec.trace(app, 'stdout', attr = TraceAttr.STREAM, block = 5, offset = 1) + block = ec.trace(app, "stdout", attr = TraceAttr.STREAM, block = 5, offset = 1) self.assertEquals(block, stdout[5:10]) - path = ec.trace(app, 'stdout', attr = TraceAttr.PATH) + path = ec.trace(app, "stdout", attr = TraceAttr.PATH) rm = ec.get_resource(app) - p = os.path.join(rm.app_home, 'stdout') + p = os.path.join(rm.app_home, "stdout") self.assertEquals(path, p) ec.shutdown() @@ -202,7 +202,7 @@ class LinuxApplicationTestCase(unittest.TestCase): self.assertTrue(ec.state(server) == ResourceState.FINISHED) self.assertTrue(ec.state(client) == ResourceState.FINISHED) - stdout = ec.trace(client, 'stdout') + stdout = ec.trace(client, "stdout") self.assertTrue(stdout.strip() == "HOLA") ec.shutdown() @@ -222,8 +222,8 @@ class LinuxApplicationTestCase(unittest.TestCase): ec.set(node, "cleanHome", True) ec.set(node, "cleanProcesses", True) - sources = "http://nepi.inria.fr/attachment/wiki/WikiStart/pybindgen-r794.tar.gz " \ - 
"http://nepi.inria.fr/attachment/wiki/WikiStart/nepi_integration_framework.pdf" + sources = "http://nepi.inria.fr/code/nef/archive/tip.tar.gz " \ + " http://nepi.inria.fr/code/nef/raw-file/8ace577d4079/src/nef/images/menu/connect.png" app = ec.register_resource("LinuxApplication") ec.set(app, "sources", sources) @@ -237,12 +237,12 @@ class LinuxApplicationTestCase(unittest.TestCase): self.assertTrue(ec.state(node) == ResourceState.STARTED) self.assertTrue(ec.state(app) == ResourceState.FINISHED) - err = ec.trace(app, 'http_sources_err') - self.assertTrue(err == "") + exitcode = ec.trace(app, "http_sources_exitcode") + self.assertTrue(exitcode.strip() == "0") - out = ec.trace(app, 'http_sources_out') - self.assertTrue(out.find("pybindgen-r794.tar.gz") > -1) - self.assertTrue(out.find("nepi_integration_framework.pdf") > -1) + out = ec.trace(app, "http_sources_stdout") + self.assertTrue(out.find("tip.tar.gz") > -1) + self.assertTrue(out.find("connect.png") > -1) ec.shutdown() @@ -277,8 +277,6 @@ class LinuxApplicationTestCase(unittest.TestCase): self.t_http_sources(self.ubuntu_host, self.ubuntu_user) - # TODO: test compilation, sources, dependencies, etc!!! - if __name__ == '__main__': unittest.main() diff --git a/test/resources/linux/node.py b/test/resources/linux/node.py index 1a7fa099..259f5783 100755 --- a/test/resources/linux/node.py +++ b/test/resources/linux/node.py @@ -19,8 +19,8 @@ # Author: Alina Quereilhac -from nepi.resources.linux.node import LinuxNode -from nepi.util.sshfuncs import RUNNING, FINISHED +from nepi.resources.linux.node import LinuxNode, ExitCode +from nepi.util.sshfuncs import ProcStatus from test_utils import skipIfNotAlive, skipInteractive, create_node @@ -31,27 +31,13 @@ import unittest class LinuxNodeTestCase(unittest.TestCase): def setUp(self): - self.fedora_host = 'nepi2.pl.sophia.inria.fr' - self.fedora_user = 'inria_nepi' + self.fedora_host = "nepi2.pl.sophia.inria.fr" + self.fedora_user = "inria_nepi" - self.ubuntu_host = 'roseval.pl.sophia.inria.fr' - self.ubuntu_user = 'alina' + self.ubuntu_host = "roseval.pl.sophia.inria.fr" + self.ubuntu_user = "alina" - self.target = 'nepi5.pl.sophia.inria.fr' - - @skipIfNotAlive - def t_xterm(self, host, user): - node, ec = create_node(host, user) - - node.install_packages('xterm') - - (out, err), proc = node.execute('xterm', forward_x11 = True) - - self.assertEquals(out, "") - - (out, err), proc = node.remove_packages('xterm') - - self.assertEquals(out, "") + self.target = "nepi5.pl.sophia.inria.fr" @skipIfNotAlive def t_execute(self, host, user): @@ -74,14 +60,14 @@ class LinuxNodeTestCase(unittest.TestCase): command = "ping %s" % self.target node.run(command, app_home) - pid, ppid = node.checkpid(app_home) + pid, ppid = node.getpid(app_home) status = node.status(pid, ppid) - self.assertTrue(status, RUNNING) + self.assertTrue(status, ProcStatus.RUNNING) node.kill(pid, ppid) status = node.status(pid, ppid) - self.assertTrue(status, FINISHED) + self.assertTrue(status, ProcStatus.FINISHED) (out, err), proc = node.check_output(app_home, "stdout") @@ -91,22 +77,120 @@ class LinuxNodeTestCase(unittest.TestCase): node.rmdir(app_home) + @skipIfNotAlive + def t_exitcode_ok(self, host, user): + command = "echo 'OK!'" + + node, ec = create_node(host, user) + + app_home = os.path.join(node.exp_home, "my-app") + node.mkdir(app_home, clean = True) + + (out, err), proc = node.run_and_wait(command, app_home, + shfile = "cmd.sh", + pidfile = "pid", + ecodefile = "exitcode", + stdout = "stdout", + stderr = "stderr", + raise_on_error 
= True) + + # get the pid of the process + ecode = node.exitcode(app_home) + self.assertEquals(ecode, ExitCode.OK) + + @skipIfNotAlive + def t_exitcode_kill(self, host, user): + node, ec = create_node(host, user) + + app_home = os.path.join(node.exp_home, "my-app") + node.mkdir(app_home, clean = True) + + # Upload command that will not finish + command = "ping localhost" + (out, err), proc = node.upload_command(command, app_home, + shfile = "cmd.sh", + ecodefile = "exitcode") + + (out, err), proc = node.run(command, app_home, + pidfile = "pidfile", + stdout = "stdout", + stderr = "stderr") + + # Just wait to make sure the ping started + time.sleep(5) + + # The process is still running, so no retfile has been created yet + ecode = node.exitcode(app_home) + self.assertEquals(ecode, ExitCode.FILENOTFOUND) + + (out, err), proc = node.check_errors(app_home) + self.assertEquals(err, "") + + # Now kill the app + pid, ppid = node.getpid(app_home) + node.kill(pid, ppid) + + (out, err), proc = node.check_errors(app_home) + self.assertEquals(err, "") + + @skipIfNotAlive + def t_exitcode_error(self, host, user): + # Try to execute a command that doesn't exist + command = "unexistent-command" + + node, ec = create_node(host, user) + + app_home = os.path.join(node.exp_home, "my-app") + node.mkdir(app_home, clean = True) + + (out, err), proc = node.run_and_wait(command, app_home, + shfile = "cmd.sh", + pidfile = "pid", + ecodefile = "exitcode", + stdout = "stdout", + stderr = "stderr", + raise_on_error = False) + + # get the pid of the process + ecode = node.exitcode(app_home) + # bash erro 127 - command not found + self.assertEquals(ecode, 127) + + (out, err), proc = node.check_errors(app_home) + self.assertNotEquals(out, "") + @skipIfNotAlive def t_install(self, host, user): node, ec = create_node(host, user) - (out, err), proc = node.mkdir(node.node_home, clean=True) + (out, err), proc = node.mkdir(node.node_home, clean = True) self.assertEquals(out, "") - (out, err), proc = node.install_packages('gcc') + (out, err), proc = node.install_packages("gcc", node.node_home) self.assertEquals(out, "") - (out, err), proc = node.remove_packages('gcc') + (out, err), proc = node.remove_packages("gcc", node.node_home) self.assertEquals(out, "") (out, err), proc = node.rmdir(node.exp_home) self.assertEquals(out, "") + @skipIfNotAlive + def t_xterm(self, host, user): + node, ec = create_node(host, user) + + (out, err), proc = node.mkdir(node.node_home, clean = True) + self.assertEquals(out, "") + + node.install_packages("xterm", node.node_home) + self.assertEquals(out, "") + + (out, err), proc = node.execute("xterm", forward_x11 = True) + self.assertEquals(out, "") + + (out, err), proc = node.remove_packages("xterm", node.node_home) + self.assertEquals(out, "") + @skipIfNotAlive def t_compile(self, host, user): node, ec = create_node(host, user) @@ -128,7 +212,7 @@ main (void) node.upload(prog, dst, text = True) # install gcc - node.install_packages('gcc') + node.install_packages('gcc', app_home) # compile the program using gcc command = "cd %s; gcc -Wall hello.c -o hello" % app_home @@ -147,12 +231,12 @@ main (void) # retrieve the output file src = os.path.join(app_home, "hello.out") - f = tempfile.NamedTemporaryFile(delete=False) + f = tempfile.NamedTemporaryFile(delete = False) dst = f.name node.download(src, dst) f.close() - node.remove_packages('gcc') + node.remove_packages("gcc", app_home) node.rmdir(app_home) f = open(dst, "r") @@ -184,6 +268,24 @@ main (void) def test_compile_ubuntu(self): 
self.t_compile(self.ubuntu_host, self.ubuntu_user) + + def test_exitcode_ok_fedora(self): + self.t_exitcode_ok(self.fedora_host, self.fedora_user) + + def test_exitcode_ok_ubuntu(self): + self.t_exitcode_ok(self.ubuntu_host, self.ubuntu_user) + + def test_exitcode_kill_fedora(self): + self.t_exitcode_kill(self.fedora_host, self.fedora_user) + + def test_exitcode_kill_ubuntu(self): + self.t_exitcode_kill(self.ubuntu_host, self.ubuntu_user) + + def test_exitcode_error_fedora(self): + self.t_exitcode_error(self.fedora_host, self.fedora_user) + + def test_exitcode_error_ubuntu(self): + self.t_exitcode_error(self.ubuntu_host, self.ubuntu_user) @skipInteractive def test_xterm_ubuntu(self): diff --git a/test/util/sshfuncs.py b/test/util/sshfuncs.py index 3342157f..1a55b6b1 100755 --- a/test/util/sshfuncs.py +++ b/test/util/sshfuncs.py @@ -19,8 +19,8 @@ # Author: Alina Quereilhac -from nepi.util.sshfuncs import rexec, rcopy, rspawn, rcheckpid, rstatus, rkill,\ - RUNNING, FINISHED +from nepi.util.sshfuncs import rexec, rcopy, rspawn, rgetpid, rstatus, rkill,\ + ProcStatus import getpass import unittest @@ -231,7 +231,7 @@ class SSHfuncsTestCase(unittest.TestCase): time.sleep(2) - (pid, ppid) = rcheckpid(pidfile, + (pid, ppid) = rgetpid(pidfile, host = host, user = user, port = env.port, @@ -243,7 +243,7 @@ class SSHfuncsTestCase(unittest.TestCase): port = env.port, agent = True) - self.assertEquals(status, RUNNING) + self.assertEquals(status, ProcStatus.RUNNING) rkill(pid, ppid, host = host, @@ -257,7 +257,7 @@ class SSHfuncsTestCase(unittest.TestCase): port = env.port, agent = True) - self.assertEquals(status, FINISHED) + self.assertEquals(status, ProcStatus.FINISHED) if __name__ == '__main__': -- 2.43.0