X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fapplication.py;h=1574f63cbf85fbeaa1db7c5161c505b278ac46f6;hb=6285ca51026efb69642eea9dfc7c480e722d84a9;hp=882fb4c340a4690836220b03aa1da21655687d00;hpb=e41672c333f70c7beb4a9c9c208d77f213092aee;p=nepi.git diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 882fb4c3..1574f63c 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -1,72 +1,149 @@ -from neco.execution.attribute import Attribute, Flags, Types -from neco.execution.trace import Trace, TraceAttr -from neco.execution.resource import ResourceManager, clsinit, ResourceState -from neco.resources.linux.node import LinuxNode -from neco.util import sshfuncs -from neco.util.timefuncs import strfnow, strfdiff - -import logging -import os +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.trace import Trace, TraceAttr +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState +from nepi.resources.linux.node import LinuxNode +from nepi.util.sshfuncs import ProcStatus +from nepi.util.timefuncs import tnow, tdiffsec -reschedule_delay = "0.5s" -state_check_delay = 1 +import os +import subprocess -# TODO: Resolve wildcards in commands!! +# TODO: Resolve wildcards in commands!! +# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!! -@clsinit +@clsinit_copy class LinuxApplication(ResourceManager): - _rtype = "LinuxApplication" + """ + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + + .. note:: + + A LinuxApplication RM represents a process that can be executed in + a remote Linux host using SSH. + + The LinuxApplication RM takes care of uploadin sources and any files + needed to run the experiment, to the remote host. + It also allows to provide source compilation (build) and installation + instructions, and takes care of automating the sources build and + installation tasks for the user. + + It is important to note that files uploaded to the remote host have + two possible scopes: single-experiment or multi-experiment. + Single experiment files are those that will not be re-used by other + experiments. Multi-experiment files are those that will. + Sources and shared files are always made available to all experiments. + + Directory structure: + + The directory structure used by LinuxApplication RM at the Linux + host is the following: + + ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files + | + ${LIB} |- /lib --> Base directory for libraries + ${BIN} |- /bin --> Base directory for binary files + ${SRC} |- /src --> Base directory for sources + ${SHARE} |- /share --> Base directory for other files + + ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files + | + ${EXP_HOME} |- / --> Base directory for experiment exp-id + | + ${APP_HOME} |- / --> Base directory for application + | specific files (e.g. command.sh, input) + | + ${RUN_HOME} |- / --> Base directory for run specific + + """ + + _rtype = "linux::Application" + _help = "Runs an application on a Linux host with a BASH command " + _platform = "linux" @classmethod def _register_attributes(cls): - command = Attribute("command", "Command to execute", - flags = Flags.ExecReadOnly) - forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", - flags = Flags.ExecReadOnly) + command = Attribute("command", "Command to execute at application start. " + "Note that commands will be executed in the ${RUN_HOME} directory, " + "make sure to take this into account when using relative paths. ", + flags = Flags.Design) + forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", + flags = Flags.Design) env = Attribute("env", "Environment variables string for command execution", - flags = Flags.ExecReadOnly) + flags = Flags.Design) sudo = Attribute("sudo", "Run with root privileges", - flags = Flags.ExecReadOnly) + flags = Flags.Design) depends = Attribute("depends", "Space-separated list of packages required to run the application", - flags = Flags.ExecReadOnly) + flags = Flags.Design) sources = Attribute("sources", - "Space-separated list of regular files to be deployed in the working " - "path prior to building. Archives won't be expanded automatically.", - flags = Flags.ExecReadOnly) + "semi-colon separated list of regular files to be uploaded to ${SRC} " + "directory prior to building. Archives won't be expanded automatically. " + "Sources are globally available for all experiments unless " + "cleanHome is set to True (This will delete all sources). ", + flags = Flags.Design) + files = Attribute("files", + "semi-colon separated list of regular miscellaneous files to be uploaded " + "to ${SHARE} directory. " + "Files are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", + flags = Flags.Design) + libs = Attribute("libs", + "semi-colon separated list of libraries (e.g. .so files) to be uploaded " + "to ${LIB} directory. " + "Libraries are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", + flags = Flags.Design) + bins = Attribute("bins", + "semi-colon separated list of binary files to be uploaded " + "to ${BIN} directory. " + "Binaries are globally available for all experiments unless " + "cleanHome is set to True (This will delete all files). ", + flags = Flags.Design) code = Attribute("code", - "Plain text source code to be uploaded to the server. It will be stored " - "under ${SOURCES}/code", - flags = Flags.ExecReadOnly) + "Plain text source code to be uploaded to the ${APP_HOME} directory. ", + flags = Flags.Design) build = Attribute("build", "Build commands to execute after deploying the sources. " - "Sources will be in the ${SOURCES} folder. " - "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n" - "Try to make the commands return with a nonzero exit code on error.\n" - "Also, do not install any programs here, use the 'install' attribute. This will " - "help keep the built files constrained to the build folder (which may " - "not be the home folder), and will result in faster deployment. Also, " - "make sure to clean up temporary files, to reduce bandwidth usage between " - "nodes when transferring built packages.", - flags = Flags.ReadOnly) + "Sources are uploaded to the ${SRC} directory and code " + "is uploaded to the ${APP_HOME} directory. \n" + "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && " + "./configure && make && make clean.\n" + "Make sure to make the build commands return with a nonzero exit " + "code on error.", + flags = Flags.Design) install = Attribute("install", "Commands to transfer built files to their final destinations. " - "Sources will be in the initial working folder, and a special " - "tag ${SOURCES} can be used to reference the experiment's " - "home folder (where the application commands will run).\n" - "ALL sources and targets needed for execution must be copied there, " - "if building has been enabled.\n" - "That is, 'slave' nodes will not automatically get any source files. " - "'slave' nodes don't get build dependencies either, so if you need " - "make and other tools to install, be sure to provide them as " - "actual dependencies instead.", - flags = Flags.ReadOnly) - stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly) - stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly) - stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly) - tear_down = Attribute("tearDown", "Bash script to be executed before " + "Install commands are executed after build commands. ", + flags = Flags.Design) + stdin = Attribute("stdin", "Standard input for the 'command'", + flags = Flags.Design) + tear_down = Attribute("tearDown", "Command to be executed just before " "releasing the resource", - flags = Flags.ReadOnly) + flags = Flags.Design) cls._register_attribute(command) cls._register_attribute(forward_x11) @@ -75,55 +152,67 @@ class LinuxApplication(ResourceManager): cls._register_attribute(depends) cls._register_attribute(sources) cls._register_attribute(code) + cls._register_attribute(files) + cls._register_attribute(bins) + cls._register_attribute(libs) cls._register_attribute(build) cls._register_attribute(install) cls._register_attribute(stdin) - cls._register_attribute(stdout) - cls._register_attribute(stderr) cls._register_attribute(tear_down) @classmethod def _register_traces(cls): - stdout = Trace("stdout", "Standard output stream") - stderr = Trace("stderr", "Standard error stream") - buildlog = Trace("buildlog", "Output of the build process") + stdout = Trace("stdout", "Standard output stream", enabled = True) + stderr = Trace("stderr", "Standard error stream", enabled = True) cls._register_trace(stdout) cls._register_trace(stderr) - cls._register_trace(buildlog) def __init__(self, ec, guid): super(LinuxApplication, self).__init__(ec, guid) self._pid = None self._ppid = None + self._node = None self._home = "app-%s" % self.guid - # timestamp of last state check of the application - self._last_state_check = strfnow() + # whether the command should run in foreground attached + # to a terminal + self._in_foreground = False - self._logger = logging.getLogger("LinuxApplication") - + # whether to use sudo to kill the application process + self._sudo_kill = False + + # keep a reference to the running process handler when + # the command is not executed as remote daemon in background + self._proc = None + + # timestamp of last state check of the application + self._last_state_check = tnow() + def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.node.get("hostname"), msg) @property def node(self): - node = self.get_connected(LinuxNode.rtype()) - if node: return node[0] - return None + if not self._node: + node = self.get_connected(LinuxNode.get_rtype()) + if not node: + msg = "Application %s guid %d NOT connected to Node" % ( + self._rtype, self.guid) + raise RuntimeError(msg) + + self._node = node[0] + + return self._node @property def app_home(self): return os.path.join(self.node.exp_home, self._home) @property - def src_dir(self): - return os.path.join(self.app_home, 'src') - - @property - def build_dir(self): - return os.path.join(self.app_home, 'build') + def run_home(self): + return os.path.join(self.app_home, self.ec.run_id) @property def pid(self): @@ -133,10 +222,26 @@ class LinuxApplication(ResourceManager): def ppid(self): return self._ppid + @property + def in_foreground(self): + """ Returns True if the command needs to be executed in foreground. + This means that command will be executed using 'execute' instead of + 'run' ('run' executes a command in background and detached from the + terminal) + + When using X11 forwarding option, the command can not run in background + and detached from a terminal, since we need to keep the terminal attached + to interact with it. + """ + return self.get("forwardX11") or self._in_foreground + + def trace_filepath(self, filename): + return os.path.join(self.run_home, filename) + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) - path = os.path.join(self.app_home, name) + path = self.trace_filepath(name) command = "(test -f %s && echo 'success') || echo 'error'" % path (out, err), proc = self.node.execute(command) @@ -150,9 +255,9 @@ class LinuxApplication(ResourceManager): return path if attr == TraceAttr.ALL: - (out, err), proc = self.node.check_output(self.app_home, name) + (out, err), proc = self.node.check_output(self.run_home, name) - if err and proc.poll(): + if proc.poll(): msg = " Couldn't read trace %s " % name self.error(msg, out, err) return None @@ -166,7 +271,7 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.execute(cmd) - if err and proc.poll(): + if proc.poll(): msg = " Couldn't find trace %s " % name self.error(msg, out, err) return None @@ -175,328 +280,491 @@ class LinuxApplication(ResourceManager): out = int(out.strip()) return out - - def provision(self, filters = None): - # create home dir for application - self.node.mkdir(self.app_home) - - # upload sources - self.upload_sources() - - # upload code - self.upload_code() - # upload stdin - self.upload_stdin() + def do_provision(self): + # take a snapshot of the system if user is root + # to ensure that cleanProcess will not kill + # pre-existent processes + if self.node.get("username") == 'root': + import pickle + procs = dict() + ps_aux = "ps aux |awk '{print $2,$11}'" + (out, err), proc = self.node.execute(ps_aux) + if len(out) != 0: + for line in out.strip().split("\n"): + parts = line.strip().split(" ") + procs[parts[0]] = parts[1] + with open("/tmp/save.proc", "wb") as pickle_file: + pickle.dump(procs, pickle_file) + + # create run dir for application + self.node.mkdir(self.run_home) + + # List of all the provision methods to invoke + steps = [ + # upload sources + self.upload_sources, + # upload files + self.upload_files, + # upload binaries + self.upload_binaries, + # upload libraries + self.upload_libraries, + # upload code + self.upload_code, + # upload stdin + self.upload_stdin, + # install dependencies + self.install_dependencies, + # build + self.build, + # Install + self.install] + + command = [] + + # Since provisioning takes a long time, before + # each step we check that the EC is still + for step in steps: + if self.ec.abort: + self.debug("Interrupting provisioning. EC says 'ABORT") + return + + ret = step() + if ret: + command.append(ret) - # install dependencies - self.install_dependencies() + # upload deploy script + deploy_command = ";".join(command) + self.execute_deploy_command(deploy_command) - # build - self.build() + # upload start script + self.upload_start_command() + + self.info("Provisioning finished") - # Install - self.install() + super(LinuxApplication, self).do_provision() + def upload_start_command(self, overwrite = False): + # Upload command to remote bash script + # - only if command can be executed in background and detached command = self.get("command") - x11 = self.get("forwardX11") - if not x11 and command: + + if command and not self.in_foreground: self.info("Uploading command '%s'" % command) - # Export environment - environ = "" - env = self.get("env") or "" - for var in env.split(" "): - environ += 'export %s\n' % var + # replace application specific paths in the command + command = self.replace_paths(command) + # replace application specific paths in the environment + env = self.get("env") + env = env and self.replace_paths(env) + + shfile = os.path.join(self.app_home, "start.sh") - command = environ + command + self.node.upload_command(command, + shfile = shfile, + env = env, + overwrite = overwrite) - # If the command runs asynchronous, pre upload the command - # to the app.sh file in the remote host - dst = os.path.join(self.app_home, "app.sh") + def execute_deploy_command(self, command, prefix="deploy"): + if command: + # replace application specific paths in the command command = self.replace_paths(command) - self.node.upload(command, dst, text = True) - - super(LinuxApplication, self).provision() + + # replace application specific paths in the environment + env = self.get("env") + env = env and self.replace_paths(env) + + # Upload the command to a bash script and run it + # in background ( but wait until the command has + # finished to continue ) + shfile = os.path.join(self.app_home, "%s.sh" % prefix) + self.node.run_and_wait(command, self.run_home, + shfile = shfile, + overwrite = False, + pidfile = "%s_pidfile" % prefix, + ecodefile = "%s_exitcode" % prefix, + stdout = "%s_stdout" % prefix, + stderr = "%s_stderr" % prefix) + + def upload_sources(self, sources = None, src_dir = None): + if not sources: + sources = self.get("sources") + + command = "" + + if not src_dir: + src_dir = self.node.src_dir - def upload_sources(self): - # TODO: check if sources need to be uploaded and upload them - sources = self.get("sources") if sources: - self.info(" Uploading sources ") + self.info("Uploading sources ") - # create dir for sources - self.node.mkdir(self.src_dir) - - sources = sources.split(' ') + sources = map(str.strip, sources.split(";")) - http_sources = list() + # Separate sources that should be downloaded from + # the web, from sources that should be uploaded from + # the local machine + command = [] for source in list(sources): if source.startswith("http") or source.startswith("https"): - http_sources.append(source) + # remove the hhtp source from the sources list sources.remove(source) - # Download http sources - if http_sources: - cmd = " wget -c --directory-prefix=${SOURCES} " - verif = "" - - for source in http_sources: - cmd += " %s " % (source) - verif += " ls ${SOURCES}/%s ;" % os.path.basename(source) - - # Wget output goes to stderr :S - cmd += " 2> /dev/null ; " - - # Add verification - cmd += " %s " % verif - - # Upload the command to a file, and execute asynchronously - self.upload_and_run(cmd, - "http_sources.sh", "http_sources_pid", - "http_sources_out", "http_sources_err") + command.append( " ( " + # Check if the source already exists + " ls %(src_dir)s/%(basename)s " + " || ( " + # If source doesn't exist, download it and check + # that it it downloaded ok + " wget -c --directory-prefix=%(src_dir)s %(source)s && " + " ls %(src_dir)s/%(basename)s " + " ) ) " % { + "basename": os.path.basename(source), + "source": source, + "src_dir": src_dir + }) + + command = " && ".join(command) + + # replace application specific paths in the command + command = self.replace_paths(command) + if sources: - self.node.upload(sources, self.src_dir) + sources = ';'.join(sources) + self.node.upload(sources, src_dir, overwrite = False) + + return command + + def upload_files(self, files = None): + if not files: + files = self.get("files") + + if files: + self.info("Uploading files %s " % files) + self.node.upload(files, self.node.share_dir, overwrite = False) + + def upload_libraries(self, libs = None): + if not libs: + libs = self.get("libs") + + if libs: + self.info("Uploading libraries %s " % libaries) + self.node.upload(libs, self.node.lib_dir, overwrite = False) + + def upload_binaries(self, bins = None): + if not bins: + bins = self.get("bins") + + if bins: + self.info("Uploading binaries %s " % binaries) + self.node.upload(bins, self.node.bin_dir, overwrite = False) + + def upload_code(self, code = None): + if not code: + code = self.get("code") - def upload_code(self): - code = self.get("code") if code: - # create dir for sources - self.node.mkdir(self.src_dir) + self.info("Uploading code") - self.info(" Uploading code ") + dst = os.path.join(self.app_home, "code") + self.node.upload(code, dst, overwrite = False, text = True) - dst = os.path.join(self.src_dir, "code") - self.node.upload(sources, dst, text = True) + def upload_stdin(self, stdin = None): + if not stdin: + stdin = self.get("stdin") - def upload_stdin(self): - stdin = self.get("stdin") if stdin: # create dir for sources - self.info(" Uploading stdin ") + self.info("Uploading stdin") + + # upload stdin file to ${SHARE_DIR} directory + if os.path.isfile(stdin): + basename = os.path.basename(stdin) + dst = os.path.join(self.node.share_dir, basename) + else: + dst = os.path.join(self.app_home, "stdin") + + self.node.upload(stdin, dst, overwrite = False, text = True) + + # create "stdin" symlink on ${APP_HOME} directory + command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({ + "app_home": self.app_home, + "stdin": dst }) + + return command - dst = os.path.join(self.app_home, "stdin") - self.node.upload(stdin, dst, text = True) + def install_dependencies(self, depends = None): + if not depends: + depends = self.get("depends") - def install_dependencies(self): - depends = self.get("depends") if depends: - self.info(" Installing dependencies %s" % depends) - self.node.install_packages(depends, home = self.app_home) + self.info("Installing dependencies %s" % depends) + return self.node.install_packages_command(depends) + + def build(self, build = None): + if not build: + build = self.get("build") - def build(self): - build = self.get("build") if build: - self.info(" Building sources ") + self.info("Building sources ") - # create dir for build - self.node.mkdir(self.build_dir) - - # Upload the command to a file, and execute asynchronously - self.upload_and_run(build, - "build.sh", "build_pid", - "build_out", "build_err") - - def install(self): - install = self.get("install") + # replace application specific paths in the command + return self.replace_paths(build) + + def install(self, install = None): + if not install: + install = self.get("install") + if install: - self.info(" Installing sources ") + self.info("Installing sources ") - # Upload the command to a file, and execute asynchronously - self.upload_and_run(install, - "install.sh", "install_pid", - "install_out", "install_err") + # replace application specific paths in the command + return self.replace_paths(install) - def deploy(self): + def do_deploy(self): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: - self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) - self.ec.schedule(reschedule_delay, self.deploy) + self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state) + self.ec.schedule(self.reschedule_delay, self.deploy) else: - try: - command = self.get("command") or "" - self.info(" Deploying command '%s' " % command) - self.discover() - self.provision() - except: - self._state = ResourceState.FAILED - raise - - super(LinuxApplication, self).deploy() - - def start(self): - command = self.get('command') - env = self.get('env') - stdin = 'stdin' if self.get('stdin') else None - stdout = 'stdout' if self.get('stdout') else 'stdout' - stderr = 'stderr' if self.get('stderr') else 'stderr' - sudo = self.get('sudo') or False - x11 = self.get('forwardX11') or False - failed = False - - super(LinuxApplication, self).start() + command = self.get("command") or "" + self.info("Deploying command '%s' " % command) + self.do_discover() + self.do_provision() + + super(LinuxApplication, self).do_deploy() + + def do_start(self): + command = self.get("command") - if not command: - self.info("No command to start ") - self._state = ResourceState.FINISHED - return - self.info("Starting command '%s'" % command) - if x11: - if env: - # Export environment - environ = "" - for var in env.split(" "): - environ += ' %s ' % var - - command = "(" + environ + " ; " + command + ")" - command = self.replace_paths(command) + if not command: + # If no command was given (i.e. Application was used for dependency + # installation), then the application is directly marked as STOPPED + super(LinuxApplication, self).set_stopped() + else: + if self.in_foreground: + self._run_in_foreground() + else: + self._run_in_background() - # If the command requires X11 forwarding, we - # can't run it asynchronously - (out, err), proc = self.node.execute(command, - sudo = sudo, - stdin = stdin, - forward_x11 = x11) + super(LinuxApplication, self).do_start() - self._state = ResourceState.FINISHED + def _run_in_foreground(self): + command = self.get("command") + sudo = self.get("sudo") or False + x11 = self.get("forwardX11") + env = self.get("env") + + # Command will be launched in foreground and attached to the + # terminal using the node 'execute' in non blocking mode. + + # We save the reference to the process in self._proc + # to be able to kill the process from the stop method. + # We also set blocking = False, since we don't want the + # thread to block until the execution finishes. + (out, err), self._proc = self.execute_command(command, + env = env, + sudo = sudo, + forward_x11 = x11, + blocking = False) + + if self._proc.poll(): + self.error(msg, out, err) + raise RuntimeError(msg) - if proc.poll() and err: - failed = True - else: - # Command was previously uploaded, now run the remote - # bash file asynchronously - cmd = "bash ./app.sh" - (out, err), proc = self.node.run(cmd, self.app_home, - stdin = stdin, - stdout = stdout, - stderr = stderr, - sudo = sudo) - - if proc.poll() and err: - failed = True + def _run_in_background(self): + command = self.get("command") + env = self.get("env") + sudo = self.get("sudo") or False + + stdout = "stdout" + stderr = "stderr" + stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \ + else None + + # Command will be run as a daemon in baground and detached from any + # terminal. + # The command to run was previously uploaded to a bash script + # during deployment, now we launch the remote script using 'run' + # method from the node. + cmd = "bash %s" % os.path.join(self.app_home, "start.sh") + (out, err), proc = self.node.run(cmd, self.run_home, + stdin = stdin, + stdout = stdout, + stderr = stderr, + sudo = sudo) + + # check if execution errors occurred + msg = " Failed to start command '%s' " % command - if not failed: - pid, ppid = self.node.wait_pid(home = self.app_home) - if pid: self._pid = int(pid) - if ppid: self._ppid = int(ppid) - - if not self.pid or not self.ppid: - failed = True - - (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr') - - if failed or out or chkerr: - # check if execution errors occurred + if proc.poll(): + self.error(msg, out, err) + raise RuntimeError(msg) + + # Wait for pid file to be generated + pid, ppid = self.node.wait_pid(self.run_home) + if pid: self._pid = int(pid) + if ppid: self._ppid = int(ppid) + + # If the process is not running, check for error information + # on the remote machine + if not self.pid or not self.ppid: + (out, err), proc = self.node.check_errors(self.run_home, + stderr = stderr) + + # Out is what was written in the stderr file + if err: msg = " Failed to start command '%s' " % command - out = out - if err: - err = err - elif chkerr: - err = chkerr - self.error(msg, out, err) - - msg2 = " Setting state to Failed" - self.debug(msg2) - self._state = ResourceState.FAILED - - raise RuntimeError, msg - - def stop(self): + raise RuntimeError(msg) + + def do_stop(self): + """ Stops application execution + """ command = self.get('command') or '' - state = self.state - - if state == ResourceState.STARTED: - self.info("Stopping command '%s'" % command) - - (out, err), proc = self.node.kill(self.pid, self.ppid) - if out or err: - # check if execution errors occurred - msg = " Failed to STOP command '%s' " % self.get("command") - self.error(msg, out, err) - self._state = ResourceState.FAILED - stopped = False + if self.state == ResourceState.STARTED: + + self.info("Stopping command '%s' " % command) + + # If the command is running in foreground (it was launched using + # the node 'execute' method), then we use the handler to the Popen + # process to kill it. Else we send a kill signal using the pid and ppid + # retrieved after running the command with the node 'run' method + if self._proc: + self._proc.kill() else: - super(LinuxApplication, self).stop() - - def release(self): + # Only try to kill the process if the pid and ppid + # were retrieved + if self.pid and self.ppid: + (out, err), proc = self.node.kill(self.pid, self.ppid, + sudo = self._sudo_kill) + + """ + # TODO: check if execution errors occurred + if (proc and proc.poll()) or err: + msg = " Failed to STOP command '%s' " % self.get("command") + self.error(msg, out, err) + """ + + super(LinuxApplication, self).do_stop() + + def do_release(self): self.info("Releasing resource") + self.do_stop() + tear_down = self.get("tearDown") if tear_down: self.node.execute(tear_down) - self.stop() - if self.state == ResourceState.STOPPED: - super(LinuxApplication, self).release() - + hard_release = self.get("hardRelease") + if hard_release: + self.node.rmdir(self.app_home) + + super(LinuxApplication, self).do_release() + @property def state(self): + """ Returns the state of the application + """ if self._state == ResourceState.STARTED: - # To avoid overwhelming the remote hosts and the local processor - # with too many ssh queries, the state is only requested - # every 'state_check_delay' . - if strfdiff(strfnow(), self._last_state_check) > state_check_delay: - # check if execution errors occurred - (out, err), proc = self.node.check_output(self.app_home, 'stderr') - - if out or err: - if err.find("No such file or directory") >= 0 : - # The resource is marked as started, but the - # command was not yet executed - return ResourceState.READY - + if self.in_foreground: + # Check if the process we used to execute the command + # is still running ... + retcode = self._proc.poll() + + # retcode == None -> running + # retcode > 0 -> error + # retcode == 0 -> finished + if retcode: + out = "" msg = " Failed to execute command '%s'" % self.get("command") + err = self._proc.stderr.read() self.error(msg, out, err) - self._state = ResourceState.FAILED + self.do_fail() - elif self.pid and self.ppid: - status = self.node.status(self.pid, self.ppid) - - if status == sshfuncs.FINISHED: - self._state = ResourceState.FINISHED - - - self._last_state_check = strfnow() + elif retcode == 0: + self.set_stopped() + else: + # We need to query the status of the command we launched in + # background. In order to avoid overwhelming the remote host and + # the local processor with too many ssh queries, the state is only + # requested every 'state_check_delay' seconds. + state_check_delay = 0.5 + if tdiffsec(tnow(), self._last_state_check) > state_check_delay: + if self.pid and self.ppid: + # Make sure the process is still running in background + status = self.node.status(self.pid, self.ppid) + + if status == ProcStatus.FINISHED: + # If the program finished, check if execution + # errors occurred + (out, err), proc = self.node.check_errors( + self.run_home) + + if err: + msg = "Failed to execute command '%s'" % \ + self.get("command") + self.error(msg, out, err) + self.do_fail() + else: + self.set_stopped() + + self._last_state_check = tnow() return self._state - def upload_and_run(self, cmd, fname, pidfile, outfile, errfile): - dst = os.path.join(self.app_home, fname) - cmd = self.replace_paths(cmd) - self.node.upload(cmd, dst, text = True) - - cmd = "bash ./%s" % fname - (out, err), proc = self.node.run_and_wait(cmd, self.app_home, - pidfile = pidfile, - stdout = outfile, - stderr = errfile, - raise_on_error = True) - - def replace_paths(self, command): + def execute_command(self, command, + env=None, + sudo=False, + tty=False, + forward_x11=False, + blocking=False): + + environ = "" + if env: + environ = self.node.format_environment(env, inline=True) + command = environ + command + command = self.replace_paths(command) + + return self.node.execute(command, + sudo=sudo, + tty=tty, + forward_x11=forward_x11, + blocking=blocking) + + def replace_paths(self, command, node=None, app_home=None, run_home=None): """ Replace all special path tags with shell-escaped actual paths. """ - def absolute_dir(d): - return d if d.startswith("/") else os.path.join("${HOME}", d) + if not node: + node=self.node + + if not app_home: + app_home=self.app_home + + if not run_home: + run_home = self.run_home return ( command - .replace("${SOURCES}", absolute_dir(self.src_dir)) - .replace("${BUILD}", absolute_dir(self.build_dir)) - .replace("${APP_HOME}", absolute_dir(self.app_home)) - .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) - .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) ) + .replace("${USR}", node.usr_dir) + .replace("${LIB}", node.lib_dir) + .replace("${BIN}", node.bin_dir) + .replace("${SRC}", node.src_dir) + .replace("${SHARE}", node.share_dir) + .replace("${EXP}", node.exp_dir) + .replace("${EXP_HOME}", node.exp_home) + .replace("${APP_HOME}", app_home) + .replace("${RUN_HOME}", run_home) + .replace("${NODE_HOME}", node.node_home) + .replace("${HOME}", node.home_dir) ) - + def valid_connection(self, guid): # TODO: Validate! return True - # XXX: What if it is connected to more than one node? - resources = self.find_resources(exact_tags = [tags.NODE]) - self._node = resources[0] if len(resources) == 1 else None - return self._node