# # NEPI, a framework to manage network experiments # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ reschedule_delay from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec import os import subprocess # TODO: Resolve wildcards in commands!! # TODO: compare_hash for all files that are uploaded! @clsinit class LinuxApplication(ResourceManager): _rtype = "LinuxApplication" @classmethod def _register_attributes(cls): command = Attribute("command", "Command to execute", flags = Flags.ExecReadOnly) forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", flags = Flags.ExecReadOnly) env = Attribute("env", "Environment variables string for command execution", flags = Flags.ExecReadOnly) sudo = Attribute("sudo", "Run with root privileges", flags = Flags.ExecReadOnly) depends = Attribute("depends", "Space-separated list of packages required to run the application", flags = Flags.ExecReadOnly) sources = Attribute("sources", "Space-separated list of regular files to be deployed in the working " "path prior to building. Archives won't be expanded automatically.", flags = Flags.ExecReadOnly) code = Attribute("code", "Plain text source code to be uploaded to the server. It will be stored " "under ${SOURCES}/code", flags = Flags.ExecReadOnly) build = Attribute("build", "Build commands to execute after deploying the sources. " "Sources will be in the ${SOURCES} folder. " "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n" "Try to make the commands return with a nonzero exit code on error.\n" "Also, do not install any programs here, use the 'install' attribute. This will " "help keep the built files constrained to the build folder (which may " "not be the home folder), and will result in faster deployment. Also, " "make sure to clean up temporary files, to reduce bandwidth usage between " "nodes when transferring built packages.", flags = Flags.ReadOnly) install = Attribute("install", "Commands to transfer built files to their final destinations. " "Sources will be in the initial working folder, and a special " "tag ${SOURCES} can be used to reference the experiment's " "home folder (where the application commands will run).\n" "ALL sources and targets needed for execution must be copied there, " "if building has been enabled.\n" "That is, 'slave' nodes will not automatically get any source files. " "'slave' nodes don't get build dependencies either, so if you need " "make and other tools to install, be sure to provide them as " "actual dependencies instead.", flags = Flags.ReadOnly) stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly) stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly) stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly) tear_down = Attribute("tearDown", "Bash script to be executed before " "releasing the resource", flags = Flags.ReadOnly) cls._register_attribute(command) cls._register_attribute(forward_x11) cls._register_attribute(env) cls._register_attribute(sudo) cls._register_attribute(depends) cls._register_attribute(sources) cls._register_attribute(code) cls._register_attribute(build) cls._register_attribute(install) cls._register_attribute(stdin) cls._register_attribute(stdout) cls._register_attribute(stderr) cls._register_attribute(tear_down) @classmethod def _register_traces(cls): stdout = Trace("stdout", "Standard output stream") stderr = Trace("stderr", "Standard error stream") cls._register_trace(stdout) cls._register_trace(stderr) def __init__(self, ec, guid): super(LinuxApplication, self).__init__(ec, guid) self._pid = None self._ppid = None self._home = "app-%s" % self.guid self._in_foreground = False # keep a reference to the running process handler when # the command is not executed as remote daemon in background self._proc = None # timestamp of last state check of the application self._last_state_check = tnow() def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.node.get("hostname"), msg) @property def node(self): node = self.get_connected(LinuxNode.rtype()) if node: return node[0] return None @property def app_home(self): return os.path.join(self.node.exp_home, self._home) @property def src_dir(self): return os.path.join(self.app_home, 'src') @property def build_dir(self): return os.path.join(self.app_home, 'build') @property def pid(self): return self._pid @property def ppid(self): return self._ppid @property def in_foreground(self): """ Returns True if the command needs to be executed in foreground. This means that command will be executed using 'execute' instead of 'run' ('run' executes a command in background and detached from the terminal) When using X11 forwarding option, the command can not run in background and detached from a terminal, since we need to keep the terminal attached to interact with it. """ return self.get("forwardX11") or self._in_foreground def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) path = os.path.join(self.app_home, name) command = "(test -f %s && echo 'success') || echo 'error'" % path (out, err), proc = self.node.execute(command) if (err and proc.poll()) or out.find("error") != -1: msg = " Couldn't find trace %s " % name self.error(msg, out, err) return None if attr == TraceAttr.PATH: return path if attr == TraceAttr.ALL: (out, err), proc = self.node.check_output(self.app_home, name) if err and proc.poll(): msg = " Couldn't read trace %s " % name self.error(msg, out, err) return None return out if attr == TraceAttr.STREAM: cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset) elif attr == TraceAttr.SIZE: cmd = "stat -c%%s %s " % path (out, err), proc = self.node.execute(cmd) if err and proc.poll(): msg = " Couldn't find trace %s " % name self.error(msg, out, err) return None if attr == TraceAttr.SIZE: out = int(out.strip()) return out def provision(self): # create home dir for application self.node.mkdir(self.app_home) # upload sources self.upload_sources() # upload code self.upload_code() # upload stdin self.upload_stdin() # install dependencies self.install_dependencies() # build self.build() # Install self.install() # Upload command to remote bash script # - only if command can be executed in background and detached command = self.get("command") if command and not self.in_foreground: self.info("Uploading command '%s'" % command) # replace application specific paths in the command command = self.replace_paths(command) # replace application specific paths in the environment env = self.get("env") env = env and self.replace_paths(env) self.node.upload_command(command, self.app_home, shfile = "app.sh", env = env) self.info("Provisioning finished") super(LinuxApplication, self).provision() def upload_sources(self): sources = self.get("sources") if sources: self.info("Uploading sources ") # create dir for sources self.node.mkdir(self.src_dir) sources = sources.split(' ') http_sources = list() for source in list(sources): if source.startswith("http") or source.startswith("https"): http_sources.append(source) sources.remove(source) # Download http sources remotely if http_sources: command = [" wget -c --directory-prefix=${SOURCES} "] check = [] for source in http_sources: command.append(" %s " % (source)) check.append(" ls ${SOURCES}/%s " % os.path.basename(source)) command = " ".join(command) check = " ; ".join(check) # Append the command to check that the sources were downloaded command += " ; %s " % check # replace application specific paths in the command command = self.replace_paths(command) # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) self.node.run_and_wait(command, self.app_home, shfile = "http_sources.sh", pidfile = "http_sources_pidfile", ecodefile = "http_sources_exitcode", stdout = "http_sources_stdout", stderr = "http_sources_stderr") if sources: self.node.upload(sources, self.src_dir) def upload_code(self): code = self.get("code") if code: # create dir for sources self.node.mkdir(self.src_dir) self.info("Uploading code ") dst = os.path.join(self.src_dir, "code") self.node.upload(sources, dst, text = True) def upload_stdin(self): stdin = self.get("stdin") if stdin: # create dir for sources self.info(" Uploading stdin ") dst = os.path.join(self.app_home, "stdin") # If what we are uploading is a file, check whether # the same file already exists (using md5sum) if self.compare_hash(stdin, dst): return self.node.upload(stdin, dst, text = True) def install_dependencies(self): depends = self.get("depends") if depends: self.info("Installing dependencies %s" % depends) self.node.install_packages(depends, self.app_home) def build(self): build = self.get("build") if build: self.info("Building sources ") # create dir for build self.node.mkdir(self.build_dir) # replace application specific paths in the command command = self.replace_paths(build) # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) self.node.run_and_wait(command, self.app_home, shfile = "build.sh", pidfile = "build_pidfile", ecodefile = "build_exitcode", stdout = "build_stdout", stderr = "build_stderr") def install(self): install = self.get("install") if install: self.info("Installing sources ") # replace application specific paths in the command command = self.replace_paths(install) # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) self.node.run_and_wait(command, self.app_home, shfile = "install.sh", pidfile = "install_pidfile", ecodefile = "install_exitcode", stdout = "install_stdout", stderr = "install_stderr") def deploy(self): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) self.ec.schedule(reschedule_delay, self.deploy) else: try: command = self.get("command") or "" self.info("Deploying command '%s' " % command) self.discover() self.provision() except: self._state = ResourceState.FAILED raise super(LinuxApplication, self).deploy() def start(self): command = self.get("command") self.info("Starting command '%s'" % command) if not command: # If no command was given (i.e. Application was used for dependency # installation), then the application is directly marked as FINISHED self._state = ResourceState.FINISHED else: if self.in_foreground: self._start_in_foreground() else: self._start_in_background() super(LinuxApplication, self).start() def _start_in_foreground(self): command = self.get("command") stdin = "stdin" if self.get("stdin") else None sudo = self.get("sudo") or False x11 = self.get("forwardX11") # Command will be launched in foreground and attached to the # terminal using the node 'execute' in non blocking mode. # Export environment env = self.get("env") environ = self.node.format_environment(env, inline = True) command = environ + command command = self.replace_paths(command) # We save the reference to the process in self._proc # to be able to kill the process from the stop method. # We also set blocking = False, since we don't want the # thread to block until the execution finishes. (out, err), self._proc = self.node.execute(command, sudo = sudo, stdin = stdin, forward_x11 = x11, blocking = False) if self._proc.poll(): self._state = ResourceState.FAILED self.error(msg, out, err) raise RuntimeError, msg def _start_in_background(self): command = self.get("command") env = self.get("env") stdin = "stdin" if self.get("stdin") else None stdout = "stdout" if self.get("stdout") else "stdout" stderr = "stderr" if self.get("stderr") else "stderr" sudo = self.get("sudo") or False # Command will be as a daemon in baground and detached from any terminal. # The real command to run was previously uploaded to a bash script # during deployment, now launch the remote script using 'run' # method from the node cmd = "bash ./app.sh" (out, err), proc = self.node.run(cmd, self.app_home, stdin = stdin, stdout = stdout, stderr = stderr, sudo = sudo) # check if execution errors occurred msg = " Failed to start command '%s' " % command if proc.poll(): self._state = ResourceState.FAILED self.error(msg, out, err) raise RuntimeError, msg # Wait for pid file to be generated pid, ppid = self.node.wait_pid(self.app_home) if pid: self._pid = int(pid) if ppid: self._ppid = int(ppid) # If the process is not running, check for error information # on the remote machine if not self.pid or not self.ppid: (out, err), proc = self.node.check_errors(self.app_home, stderr = stderr) # Out is what was written in the stderr file if err: self._state = ResourceState.FAILED msg = " Failed to start command '%s' " % command self.error(msg, out, err) raise RuntimeError, msg def stop(self): """ Stops application execution """ command = self.get('command') or '' if self.state == ResourceState.STARTED: stopped = True self.info("Stopping command '%s'" % command) # If the command is running in foreground (it was launched using # the node 'execute' method), then we use the handler to the Popen # process to kill it. Else we send a kill signal using the pid and ppid # retrieved after running the command with the node 'run' method if self._proc: self._proc.kill() else: # Only try to kill the process if the pid and ppid # were retrieved if self.pid and self.ppid: (out, err), proc = self.node.kill(self.pid, self.ppid) if out or err: # check if execution errors occurred msg = " Failed to STOP command '%s' " % self.get("command") self.error(msg, out, err) self._state = ResourceState.FAILED stopped = False if stopped: super(LinuxApplication, self).stop() def release(self): self.info("Releasing resource") tear_down = self.get("tearDown") if tear_down: self.node.execute(tear_down) self.stop() if self.state == ResourceState.STOPPED: super(LinuxApplication, self).release() @property def state(self): """ Returns the state of the application """ if self._state == ResourceState.STARTED: if self.in_foreground: # Check if the process we used to execute the command # is still running ... retcode = self._proc.poll() # retcode == None -> running # retcode > 0 -> error # retcode == 0 -> finished if retcode: out = "" msg = " Failed to execute command '%s'" % self.get("command") err = self._proc.stderr.read() self.error(msg, out, err) self._state = ResourceState.FAILED elif retcode == 0: self._state = ResourceState.FINISHED else: # We need to query the status of the command we launched in # background. In oredr to avoid overwhelming the remote host and # the local processor with too many ssh queries, the state is only # requested every 'state_check_delay' seconds. state_check_delay = 0.5 if tdiffsec(tnow(), self._last_state_check) > state_check_delay: # check if execution errors occurred (out, err), proc = self.node.check_errors(self.app_home) if err: msg = " Failed to execute command '%s'" % self.get("command") self.error(msg, out, err) self._state = ResourceState.FAILED elif self.pid and self.ppid: # No execution errors occurred. Make sure the background # process with the recorded pid is still running. status = self.node.status(self.pid, self.ppid) if status == ProcStatus.FINISHED: self._state = ResourceState.FINISHED self._last_state_check = tnow() return self._state def replace_paths(self, command): """ Replace all special path tags with shell-escaped actual paths. """ def absolute_dir(d): return d if d.startswith("/") else os.path.join("${HOME}", d) return ( command .replace("${SOURCES}", absolute_dir(self.src_dir)) .replace("${BUILD}", absolute_dir(self.build_dir)) .replace("${APP_HOME}", absolute_dir(self.app_home)) .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) ) ) def compare_hash(self, local, remote): # getting md5sum from remote file (out, err), proc = self.node.execute("md5sum %s " % remote) if proc.poll() == 0: #OK if not os.path.isfile(local): # store to a tmp file f = tempfile.NamedTemporaryFile() f.write(local) f.flush() local = f.name lproc = subprocess.Popen(["md5sum", local], stdout = subprocess.PIPE, stderr = subprocess.PIPE) # getting md5sum from local file (lout, lerr) = lproc.communicate() # files are the same, no need to upload lchk = lout.strip().split(" ")[0] rchk = out.strip().split(" ")[0] msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % ( local, lchk, remote, rchk) self.debug(msg) if lchk == rchk: return True return False def valid_connection(self, guid): # TODO: Validate! return True