#
#    NEPI, a framework to manage network experiments
#    Copyright (C) 2013 INRIA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License version 2 as
#    published by the Free Software Foundation;
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac

import os
import subprocess
import logging

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
    ResourceState
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus, STDOUT
from nepi.util.timefuncs import tnow, tdiffsec

# to debug, just use
# logging.getLogger('application').setLevel(logging.DEBUG)
logger = logging.getLogger("application")

# TODO: Resolve wildcards in commands!!
# TODO: When a failure occurs during deployment, scp and ssh processes
#       are left running behind!!


@clsinit_copy
class LinuxApplication(ResourceManager):
    """
    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM
        :type guid: int

    .. note::

        A LinuxApplication RM represents a process that can be executed in
        a remote Linux host using SSH.

        The LinuxApplication RM takes care of uploading sources and any files
        needed to run the experiment, to the remote host.
        It also allows to provide source compilation (build) and installation
        instructions, and takes care of automating the sources build and
        installation tasks for the user.

        It is important to note that files uploaded to the remote host have
        two possible scopes: single-experiment or multi-experiment.
        Single experiment files are those that will not be re-used by other
        experiments. Multi-experiment files are those that will.
        Sources and shared files are always made available to all experiments.

        Directory structure:

        The directory structure used by LinuxApplication RM at the Linux
        host is the following:

        ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
                      |
        ${LIB}        |- /lib --> Base directory for libraries
        ${BIN}        |- /bin --> Base directory for binary files
        ${SRC}        |- /src --> Base directory for sources
        ${SHARE}      |- /share --> Base directory for other files

        ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
                      |
        ${EXP_HOME}   |- /<exp-id> --> Base directory for experiment exp-id
                          |
        ${APP_HOME}       |- /app-<guid> --> Base directory for application
                              |     specific files (e.g. command.sh, input)
                              |
        ${RUN_HOME}           |- /<run-id> --> Base directory for run specific

    """

    _rtype = "linux::Application"
    _help = "Runs an application on a Linux host with a BASH command "
    _platform = "linux"

    @classmethod
    def _register_attributes(cls):
        """Register the design-time attributes supported by this RM."""
        cls._register_attribute(
            Attribute("command",
                      "Command to execute at application start. "
                      "Note that commands will be executed in the ${RUN_HOME} directory, "
                      "make sure to take this into account when using relative paths. ",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("forwardX11",
                      "Enables X11 forwarding for SSH connections",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("env",
                      "Environment variables string for command execution",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("sudo",
                      "Run with root privileges",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("depends",
                      "Space-separated list of packages required to run the application",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("sources",
                      "semi-colon separated list of regular files to be uploaded to ${SRC} "
                      "directory prior to building. Archives won't be expanded automatically. "
                      "Sources are globally available for all experiments unless "
                      "cleanHome is set to True (This will delete all sources). ",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("files",
                      "semi-colon separated list of regular miscellaneous files to be uploaded "
                      "to ${SHARE} directory. "
                      "Files are globally available for all experiments unless "
                      "cleanHome is set to True (This will delete all files). ",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("libs",
                      "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
                      "to ${LIB} directory. "
                      "Libraries are globally available for all experiments unless "
                      "cleanHome is set to True (This will delete all files). ",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("bins",
                      "semi-colon separated list of binary files to be uploaded "
                      "to ${BIN} directory. "
                      "Binaries are globally available for all experiments unless "
                      "cleanHome is set to True (This will delete all files). ",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("code",
                      "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("build",
                      "Build commands to execute after deploying the sources. "
                      "Sources are uploaded to the ${SRC} directory and code "
                      "is uploaded to the ${APP_HOME} directory. \n"
                      "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
                      "./configure && make && make clean.\n"
                      "Make sure to make the build commands return with a nonzero exit "
                      "code on error.",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("install",
                      "Commands to transfer built files to their final destinations. "
                      "Install commands are executed after build commands. ",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("stdin",
                      "Standard input for the 'command'",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("tearDown",
                      "Command to be executed just before releasing the resource",
                      flags=Flags.Design))
        cls._register_attribute(
            Attribute("splitStderr",
                      "requests stderr to be retrieved separately",
                      default=False))

    @classmethod
    def _register_traces(cls):
        """Register the 'stdout' and 'stderr' traces, both enabled."""
        cls._register_trace(
            Trace("stdout", "Standard output stream", enabled=True))
        cls._register_trace(
            Trace("stderr", "Standard error stream", enabled=True))

    def __init__(self, ec, guid):
        super(LinuxApplication, self).__init__(ec, guid)
        self._pid = None
        self._ppid = None
        self._node = None
        # name of the application directory, relative to the
        # experiment home of the node (see app_home)
        self._home = "app-{}".format(self.guid)

        # whether the command should run in foreground attached
        # to a terminal
        self._in_foreground = False

        # whether to use sudo to kill the application process
        self._sudo_kill = False

        # keep a reference to the running process handler when
        # the command is not executed as remote daemon in background
        self._proc = None

        # timestamp of last state check of the application
        self._last_state_check = tnow()

    def log_message(self, msg):
        """Return `msg` prefixed with this RM's guid and its node hostname."""
        return " guid {} - host {} - {} "\
            .format(self.guid, self.node.get("hostname"), msg)

    @property
    def node(self):
        """The LinuxNode this application is connected to (cached).

        :raises RuntimeError: if no LinuxNode is connected to this RM
        """
        if not self._node:
            node = self.get_connected(LinuxNode.get_rtype())
            if not node:
                msg = "Application {} guid {} NOT connected to Node"\
                      .format(self._rtype, self.guid)
                raise RuntimeError(msg)

            self._node = node[0]

        return self._node

    @property
    def app_home(self):
        """Application directory: <node.exp_home>/app-<guid>."""
        return os.path.join(self.node.exp_home, self._home)

    @property
    def run_home(self):
        """Run directory: <app_home>/<ec.run_id>."""
        return os.path.join(self.app_home, self.ec.run_id)

    @property
    def pid(self):
        return self._pid

    @property
    def ppid(self):
        return self._ppid

    @property
    def in_foreground(self):
        """ Returns True if the command needs to be executed in foreground.
        This means that command will be executed using 'execute' instead of
        'run' ('run' executes a command in background and detached from the
        terminal)

        When using X11 forwarding option, the command can not run in
        background and detached from a terminal, since we need to keep the
        terminal attached to interact with it.
        """
        return self.get("forwardX11") or self._in_foreground

    def trace_filepath(self, filename):
        """Return the path of `filename` inside the run directory."""
        return os.path.join(self.run_home, filename)

    def trace(self, name, attr=TraceAttr.ALL, block=512, offset=0):
        """Retrieve information about the trace `name` from the node.

        :param name: trace name; it is also the trace file name under
            the run directory
        :param attr: TraceAttr.PATH returns the trace file path,
            TraceAttr.ALL the output of the node's check_output for the
            trace, TraceAttr.STREAM the output of a single 'dd' block
            read, and TraceAttr.SIZE the file size as an int
        :param block: block size handed to 'dd' (STREAM only)
        :param offset: number of blocks skipped by 'dd' (STREAM only)
        :returns: the requested value, or None when the trace file can
            not be found or read (the error is logged)
        """
        self.info("Retrieving '{}' trace {} ".format(name, attr))

        path = self.trace_filepath(name)
        logger.debug("trace: path= {}".format(path))

        # make sure the trace file exists before going any further
        command = "(test -f {} && echo 'success') || echo 'error'".format(path)
        (out, err), proc = self.node.execute(command)

        if (err and proc.poll()) or "error" in out:
            msg = " Couldn't find trace {} ".format(name)
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.PATH:
            return path

        if attr == TraceAttr.ALL:
            (out, err), proc = self.node.check_output(self.run_home, name)

            if proc.poll():
                msg = " Couldn't read trace {} ".format(name)
                self.error(msg, out, err)
                return None

            return out

        if attr == TraceAttr.STREAM:
            cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
        elif attr == TraceAttr.SIZE:
            # '%s' is the stat(1) format for the total size in bytes;
            # without it, stat would take the path as its format string
            # and complain about a missing file operand
            cmd = "stat -c %s {} ".format(path)

        (out, err), proc = self.node.execute(cmd)

        if proc.poll():
            msg = " Couldn't find trace {} ".format(name)
            self.error(msg, out, err)
            return None

        if attr == TraceAttr.SIZE:
            out = int(out.strip())

        return out

    def do_provision(self):
        """Prepare the node so that the application can be started.

        Creates the run directory, invokes every provisioning step
        (uploads, dependencies, build, install), executes the shell
        commands returned by those steps as a single deploy script,
        and finally uploads the start script.
        Returns early, without completing, if the EC requests an abort.
        """
        # take a snapshot of the system if user is root
        # to ensure that cleanProcess will not kill
        # pre-existent processes
        if self.node.get("username") == 'root':
            import pickle
            procs = dict()
            ps_aux = "ps aux | awk '{print $2,$11}'"
            (out, err), proc = self.node.execute(ps_aux)
            if out:
                for line in out.strip().split("\n"):
                    # each line reads "<pid> <command>"; partition() copes
                    # with a line that carries no command column
                    proc_id, _, proc_cmd = line.strip().partition(" ")
                    procs[proc_id] = proc_cmd
                # NOTE: the snapshot is written on the machine running
                # this code, not on the remote node
                with open("/tmp/save.proc", "wb") as pickle_file:
                    pickle.dump(procs, pickle_file)

        # create run dir for application
        self.node.mkdir(self.run_home)

        # List of all the provision methods to invoke
        steps = [
            # upload sources
            self.upload_sources,
            # upload files
            self.upload_files,
            # upload binaries
            self.upload_binaries,
            # upload libraries
            self.upload_libraries,
            # upload code
            self.upload_code,
            # upload stdin
            self.upload_stdin,
            # install dependencies
            self.install_dependencies,
            # build
            self.build,
            # Install
            self.install,
        ]

        command = []

        # Since provisioning takes a long time, before
        # each step we check that the EC has not requested an abort
        for step in steps:
            if self.ec.abort:
                self.debug("Interrupting provisioning. EC says 'ABORT")
                return

            # a step may return a shell command to be run on the node
            ret = step()
            if ret:
                command.append(ret)

        # upload deploy script
        deploy_command = ";".join(command)
        self.execute_deploy_command(deploy_command)

        # upload start script
        self.upload_start_command()

        self.info("Provisioning finished")

        super(LinuxApplication, self).do_provision()

    def upload_start_command(self, overwrite=False):
        """Upload the application command as the 'start.sh' script.

        Nothing is uploaded when there is no command or when the
        command has to run in foreground.

        :param overwrite: forwarded to the node's upload_command
        """
        # Upload command to remote bash script
        # - only if command can be executed in background and detached
        command = self.get("command")

        if command and not self.in_foreground:
            # replace application specific paths in the command
            command = self.replace_paths(command)
            # replace application specific paths in the environment
            env = self.get("env")
            env = env and self.replace_paths(env)

            shfile = os.path.join(self.app_home, "start.sh")

            self.node.upload_command(command,
                                     shfile=shfile,
                                     env=env,
                                     overwrite=overwrite)

    def execute_deploy_command(self, command, prefix="deploy"):
        """Run `command` on the node through a '<prefix>.sh' script.

        The script is run with the node's run_and_wait; pid, exit code
        and output files are all named after `prefix`.
        Does nothing when `command` is empty.

        :param command: shell command(s) to execute
        :param prefix: base name for the script and its companion files
        """
        if command:
            # replace application specific paths in the command
            command = self.replace_paths(command)

            # replace application specific paths in the environment
            # NOTE(review): env is computed but not handed over to
            # run_and_wait - confirm whether this is intentional
            env = self.get("env")
            env = env and self.replace_paths(env)

            # Upload the command to a bash script and run it
            # in background ( but wait until the command has
            # finished to continue )
            shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
            # low-level spawn tools in both sshfuncs and execfuncs
            # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
            if self.get("splitStderr"):
                stderr = "{}_stderr".format(prefix)
            else:
                stderr = STDOUT
            logger.debug("%s : prefix = %s, command=%s, stderr=%s",
                         self, prefix, command, stderr)
            self.node.run_and_wait(command, self.run_home,
                                   shfile=shfile,
                                   overwrite=False,
                                   pidfile="{}_pidfile".format(prefix),
                                   ecodefile="{}_exitcode".format(prefix),
                                   stdout="{}_stdout".format(prefix),
                                   stderr=stderr)

    def upload_sources(self, sources=None, src_dir=None):
        """Upload local sources and build the download command for URLs.

        :param sources: semi-colon separated list of sources; defaults
            to the 'sources' attribute
        :param src_dir: destination directory on the node; defaults to
            the node's src_dir
        :returns: the shell command that downloads the http(s) sources
            on the node, or an empty string when there is none
        """
        if not sources:
            sources = self.get("sources")

        command = ""

        if not src_dir:
            src_dir = self.node.src_dir

        if sources:
            self.info("Uploading sources ")

            sources = [source.strip() for source in sources.split(";")]

            # Separate sources that should be downloaded from
            # the web, from sources that should be uploaded from
            # the local machine.
            # Matching on the full scheme keeps local files whose name
            # merely starts with 'http' on the upload list.
            url_prefixes = ("http://", "https://")
            remote_sources = [source for source in sources
                              if source.startswith(url_prefixes)]
            local_sources = [source for source in sources
                             if not source.startswith(url_prefixes)]

            download_template = (
                " ( "
                # Check if the source already exists
                " ls {src_dir}/{basename} "
                " || ( "
                # If source doesn't exist, download it and check
                # that it downloaded ok
                " wget -c --directory-prefix={src_dir} {source} && "
                " ls {src_dir}/{basename} "
                " ) ) "
            )
            command = " && ".join(
                download_template.format(basename=os.path.basename(source),
                                         source=source,
                                         src_dir=src_dir)
                for source in remote_sources)

            # replace application specific paths in the command
            command = self.replace_paths(command)

            if local_sources:
                self.node.upload(';'.join(local_sources), src_dir,
                                 overwrite=False)

        return command

    def upload_files(self, files=None):
        """Upload `files` (default: 'files' attribute) to the share dir."""
        if not files:
            files = self.get("files")

        if files:
            self.info("Uploading files {} ".format(files))
            self.node.upload(files, self.node.share_dir, overwrite=False)

    def upload_libraries(self, libs=None):
        """Upload `libs` (default: 'libs' attribute) to the lib dir."""
        if not libs:
            libs = self.get("libs")

        if libs:
            self.info("Uploading libraries {} ".format(libs))
            self.node.upload(libs, self.node.lib_dir, overwrite=False)

    def upload_binaries(self, bins=None):
        """Upload `bins` (default: 'bins' attribute) to the bin dir."""
        if not bins:
            bins = self.get("bins")

        if bins:
            self.info("Uploading binaries {} ".format(bins))
            self.node.upload(bins, self.node.bin_dir, overwrite=False)

    def upload_code(self, code=None):
        """Upload `code` (default: 'code' attribute) as <app_home>/code."""
        if not code:
            code = self.get("code")

        if code:
            self.info("Uploading code")

            dst = os.path.join(self.app_home, "code")
            self.node.upload(code, dst, overwrite=False,
                             text=True, executable=True)

    def upload_stdin(self, stdin=None):
        """Upload the standard input of the command.

        When `stdin` (default: 'stdin' attribute) names an existing
        local file, the destination is the share dir; otherwise the
        destination is <app_home>/stdin.

        :returns: the shell command creating the 'stdin' symlink in
            the application directory, or None when there is no stdin
        """
        if not stdin:
            stdin = self.get("stdin")

        if stdin:
            self.info("Uploading stdin")

            # upload stdin file to ${SHARE_DIR} directory
            if os.path.isfile(stdin):
                basename = os.path.basename(stdin)
                dst = os.path.join(self.node.share_dir, basename)
            else:
                dst = os.path.join(self.app_home, "stdin")

            self.node.upload(stdin, dst, overwrite=False, text=True)

            # create "stdin" symlink on ${APP_HOME} directory
            command = "( cd {app_home} ; [ ! -f stdin ] && ln -s {stdin} stdin )"\
                      .format(app_home=self.app_home, stdin=dst)
            return command

    def install_dependencies(self, depends=None):
        """Return the node's command installing `depends`, if any.

        :param depends: packages to install; defaults to the 'depends'
            attribute
        """
        if not depends:
            depends = self.get("depends")

        if depends:
            self.info("Installing dependencies {}".format(depends))
            return self.node.install_packages_command(depends)

    def build(self, build=None):
        """Return the build command with path tags replaced, if any.

        :param build: build command; defaults to the 'build' attribute
        """
        if not build:
            build = self.get("build")

        if build:
            self.info("Building sources ")

            # replace application specific paths in the command
            return self.replace_paths(build)

    def install(self, install=None):
        """Return the install command with path tags replaced, if any.

        :param install: install command; defaults to the 'install'
            attribute
        """
        if not install:
            install = self.get("install")

        if install:
            self.info("Installing sources ")

            # replace application specific paths in the command
            return self.replace_paths(install)

    def do_deploy(self):
        """Discover and provision the application once the node is READY.

        While the node has not reached the READY state, the deploy is
        rescheduled on the EC instead.
        """
        # Wait until node is associated and deployed
        node = self.node
        if not node or node.state < ResourceState.READY:
            self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
            self.ec.schedule(self.reschedule_delay, self.deploy)
        else:
            command = self.get("command") or ""
            self.info("Deploying command '{}' ".format(command))
            self.do_discover()
            self.do_provision()

            super(LinuxApplication, self).do_deploy()

    def do_start(self):
        """Launch the command, in foreground or background as required."""
        command = self.get("command")

        self.info("Starting command '{}'".format(command))

        if not command:
            # If no command was given (i.e. Application was used for dependency
            # installation), then the application is directly marked as STOPPED
            super(LinuxApplication, self).set_stopped()
        else:
            if self.in_foreground:
                self._run_in_foreground()
            else:
                self._run_in_background()

            super(LinuxApplication, self).do_start()

    def _run_in_foreground(self):
        """Launch the command attached to the terminal, without blocking.

        :raises RuntimeError: if the process has already exited with a
            non-zero code right after being launched
        """
        command = self.get("command")
        sudo = self.get("sudo") or False
        x11 = self.get("forwardX11")
        env = self.get("env")

        # Command will be launched in foreground and attached to the
        # terminal using the node 'execute' in non blocking mode.

        # We save the reference to the process in self._proc
        # to be able to kill the process from the stop method.
        # We also set blocking = False, since we don't want the
        # thread to block until the execution finishes.
        (out, err), self._proc = self.execute_command(command,
                                                      env=env,
                                                      sudo=sudo,
                                                      forward_x11=x11,
                                                      blocking=False)

        if self._proc.poll():
            # same message as the one used for background launch failures
            msg = " Failed to start command '{}' ".format(command)
            self.error(msg, out, err)
            raise RuntimeError(msg)

    def _run_in_background(self):
        """Launch the previously uploaded start.sh detached from any terminal.

        Records the pid and ppid reported by the node once available.

        :raises RuntimeError: if the launch fails, or if no pid/ppid
            could be retrieved and the node reports errors
        """
        command = self.get("command")
        # NOTE(review): env is read but not used below - presumably it
        # was already embedded in start.sh at upload time; confirm
        env = self.get("env")
        sudo = self.get("sudo") or False
        stdout = "stdout"
        # low-level spawn tools in both sshfuncs and execfuncs
        # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
        stderr = "stderr" if self.get("splitStderr") else STDOUT
        if self.get("stdin"):
            stdin = os.path.join(self.app_home, "stdin")
        else:
            stdin = None

        # Command will be run as a daemon in background and detached from any
        # terminal.
        # The command to run was previously uploaded to a bash script
        # during deployment, now we launch the remote script using 'run'
        # method from the node.
        cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
        (out, err), proc = self.node.run(cmd, self.run_home,
                                         stdin=stdin,
                                         stdout=stdout,
                                         stderr=stderr,
                                         sudo=sudo)

        # check if execution errors occurred
        msg = " Failed to start command '{}' ".format(command)

        if proc.poll():
            self.error(msg, out, err)
            raise RuntimeError(msg)

        # Wait for pid file to be generated
        pid, ppid = self.node.wait_pid(self.run_home)
        if pid:
            self._pid = int(pid)
        if ppid:
            self._ppid = int(ppid)

        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_errors(self.run_home,
                                                      stderr=stderr)

            # Out is what was written in the stderr file
            if err:
                msg = " Failed to start command '{}' ".format(command)
                self.error(msg, out, err)
                raise RuntimeError(msg)

    def do_stop(self):
        """ Stops application execution
        """
        command = self.get('command') or ''

        if self.state == ResourceState.STARTED:

            self.info("Stopping command '{}' ".format(command))

            # If the command is running in foreground (it was launched using
            # the node 'execute' method), then we use the handler to the Popen
            # process to kill it. Else we send a kill signal using the pid and ppid
            # retrieved after running the command with the node 'run' method
            if self._proc:
                self._proc.kill()
            else:
                # Only try to kill the process if the pid and ppid
                # were retrieved
                if self.pid and self.ppid:
                    (out, err), proc = self.node.kill(self.pid, self.ppid,
                                                      sudo=self._sudo_kill)

                    """
                    # TODO: check if execution errors occurred
                    if (proc and proc.poll()) or err:
                        msg = " Failed to STOP command '{}' ".format(self.get("command"))
                        self.error(msg, out, err)
                    """

            super(LinuxApplication, self).do_stop()

    def do_release(self):
        """Stop the application, run 'tearDown' and release the resource.

        The application directory is removed from the node when the
        'hardRelease' attribute is set.
        """
        self.info("Releasing resource")

        self.do_stop()

        tear_down = self.get("tearDown")
        if tear_down:
            self.node.execute(tear_down)

        hard_release = self.get("hardRelease")
        if hard_release:
            self.node.rmdir(self.app_home)

        super(LinuxApplication, self).do_release()

    @property
    def state(self):
        """ Returns the state of the application
        """
        if self._state == ResourceState.STARTED:
            if self.in_foreground:
                # Check if the process we used to execute the command
                # is still running ...
                retcode = self._proc.poll()

                # retcode == None -> running
                # retcode > 0 -> error
                # retcode == 0 -> finished
                if retcode:
                    out = ""
                    msg = " Failed to execute command '{}'".format(self.get("command"))
                    err = self._proc.stderr.read()
                    self.error(msg, out, err)
                    self.do_fail()

                elif retcode == 0:
                    self.set_stopped()
            else:
                # We need to query the status of the command we launched in
                # background. In order to avoid overwhelming the remote host and
                # the local processor with too many ssh queries, the state is only
                # requested every 'state_check_delay' seconds.
                state_check_delay = 0.5
                if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
                    if self.pid and self.ppid:
                        # Make sure the process is still running in background
                        status = self.node.status(self.pid, self.ppid)

                        if status == ProcStatus.FINISHED:
                            # If the program finished, check if execution
                            # errors occurred
                            (out, err), proc \
                                = self.node.check_errors(self.run_home)

                            if err:
                                # Thierry : there's nothing wrong with a non-empty
                                # stderr, is there ?
                                #msg = "Failed to execute command '{}'"\
                                #      .format(self.get("command"))
                                #self.error(msg, out, err)
                                #self.do_fail()
                                # xxx TODO OTOH it would definitely make sense
                                # to check the exitcode
                                # NOTE(review): as written, a finished
                                # process with a non-empty 'err' is not
                                # marked as stopped here - confirm
                                pass
                            else:
                                self.set_stopped()

                    self._last_state_check = tnow()

        return self._state

    def execute_command(self, command,
                        env=None,
                        sudo=False,
                        tty=False,
                        forward_x11=False,
                        blocking=False):
        """Execute `command` on the node, prefixed with its environment.

        :param command: command to execute; path tags are replaced
        :param env: environment string, formatted inline by the node
            and prepended to the command
        :param sudo: forwarded to the node's execute
        :param tty: forwarded to the node's execute
        :param forward_x11: forwarded to the node's execute
        :param blocking: forwarded to the node's execute
        :returns: whatever the node's execute returns; callers in this
            class unpack it as ((out, err), proc)
        """
        environ = ""
        if env:
            environ = self.node.format_environment(env, inline=True)
        command = environ + command
        command = self.replace_paths(command)

        return self.node.execute(command,
                                 sudo=sudo,
                                 tty=tty,
                                 forward_x11=forward_x11,
                                 blocking=blocking)

    def replace_paths(self, command, node=None, app_home=None, run_home=None):
        """
        Replace all special path tags with the actual paths.

        The substitution is purely textual; no shell escaping is applied.

        :param command: string in which the ${...} tags are replaced
        :param node: node providing the directories; defaults to self.node
        :param app_home: defaults to self.app_home
        :param run_home: defaults to self.run_home
        :returns: `command` with all known tags substituted
        """
        if not node:
            node = self.node

        if not app_home:
            app_home = self.app_home

        if not run_home:
            run_home = self.run_home

        return (
            command
            .replace("${USR}", node.usr_dir)
            .replace("${LIB}", node.lib_dir)
            .replace("${BIN}", node.bin_dir)
            .replace("${SRC}", node.src_dir)
            .replace("${SHARE}", node.share_dir)
            .replace("${EXP}", node.exp_dir)
            .replace("${EXP_HOME}", node.exp_home)
            .replace("${APP_HOME}", app_home)
            .replace("${RUN_HOME}", run_home)
            .replace("${NODE_HOME}", node.node_home)
            .replace("${HOME}", node.home_dir)
            # a shortcut to refer to the file uploaded as 'code = '
            .replace("${CODE}", "{}/code".format(app_home))
        )

    def valid_connection(self, guid):
        # TODO: Validate!
        return True