#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit, ResourceState
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import strfnow, strfdiff

import os
import subprocess
import tempfile

# TODO: Resolve wildcards in commands!!
# TODO: compare_hash for all files that are uploaded!

@clsinit
class LinuxApplication(ResourceManager):
    _rtype = "LinuxApplication"

    @classmethod
    def _register_attributes(cls):
        command = Attribute("command", "Command to execute",
                flags = Flags.ExecReadOnly)
        forward_x11 = Attribute("forwardX11",
                "Enables X11 forwarding for SSH connections",
                flags = Flags.ExecReadOnly)
        env = Attribute("env",
                "Environment variables string for command execution",
                flags = Flags.ExecReadOnly)
        sudo = Attribute("sudo", "Run with root privileges",
                flags = Flags.ExecReadOnly)
        depends = Attribute("depends",
                "Space-separated list of packages required to run the application",
                flags = Flags.ExecReadOnly)
        sources = Attribute("sources",
                "Space-separated list of regular files to be deployed in the working "
                "path prior to building. Archives won't be expanded automatically.",
                flags = Flags.ExecReadOnly)
        code = Attribute("code",
                "Plain text source code to be uploaded to the server. It will be stored "
                "under ${SOURCES}/code",
                flags = Flags.ExecReadOnly)
        build = Attribute("build",
                "Build commands to execute after deploying the sources. "
                "Sources will be in the ${SOURCES} folder. "
                "Example: tar xzf ${SOURCES}/my-app.tgz && cd my-app && ./configure && make && make clean.\n"
                "Try to make the commands return with a nonzero exit code on error.\n"
                "Also, do not install any programs here, use the 'install' attribute. This will "
                "help keep the built files constrained to the build folder (which may "
                "not be the home folder), and will result in faster deployment. Also, "
                "make sure to clean up temporary files, to reduce bandwidth usage between "
                "nodes when transferring built packages.",
                flags = Flags.ReadOnly)
        install = Attribute("install",
                "Commands to transfer built files to their final destinations. "
                "Sources will be in the initial working folder, and a special "
                "tag ${SOURCES} can be used to reference the experiment's "
                "home folder (where the application commands will run).\n"
                "ALL sources and targets needed for execution must be copied there, "
                "if building has been enabled.\n"
                "That is, 'slave' nodes will not automatically get any source files. "
" "'slave' nodes don't get build dependencies either, so if you need " "make and other tools to install, be sure to provide them as " "actual dependencies instead.", flags = Flags.ReadOnly) stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly) stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly) stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly) tear_down = Attribute("tearDown", "Bash script to be executed before " "releasing the resource", flags = Flags.ReadOnly) cls._register_attribute(command) cls._register_attribute(forward_x11) cls._register_attribute(env) cls._register_attribute(sudo) cls._register_attribute(depends) cls._register_attribute(sources) cls._register_attribute(code) cls._register_attribute(build) cls._register_attribute(install) cls._register_attribute(stdin) cls._register_attribute(stdout) cls._register_attribute(stderr) cls._register_attribute(tear_down) @classmethod def _register_traces(cls): stdout = Trace("stdout", "Standard output stream") stderr = Trace("stderr", "Standard error stream") cls._register_trace(stdout) cls._register_trace(stderr) def __init__(self, ec, guid): super(LinuxApplication, self).__init__(ec, guid) self._pid = None self._ppid = None self._home = "app-%s" % self.guid self._in_foreground = False # keep a reference to the running process handler when # the command is not executed as remote daemon in background self._proc = None # timestamp of last state check of the application self._last_state_check = strfnow() def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.node.get("hostname"), msg) @property def node(self): node = self.get_connected(LinuxNode.rtype()) if node: return node[0] return None @property def app_home(self): return os.path.join(self.node.exp_home, self._home) @property def src_dir(self): return os.path.join(self.app_home, 'src') @property def build_dir(self): return os.path.join(self.app_home, 'build') @property def pid(self): return self._pid @property def ppid(self): return self._ppid @property def in_foreground(self): """ Returns True if the command needs to be executed in foreground. This means that command will be executed using 'execute' instead of 'run' ('run' executes a command in background and detached from the terminal) When using X11 forwarding option, the command can not run in background and detached from a terminal, since we need to keep the terminal attached to interact with it. 
""" return self.get("forwardX11") or self._in_foreground def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) path = os.path.join(self.app_home, name) command = "(test -f %s && echo 'success') || echo 'error'" % path (out, err), proc = self.node.execute(command) if (err and proc.poll()) or out.find("error") != -1: msg = " Couldn't find trace %s " % name self.error(msg, out, err) return None if attr == TraceAttr.PATH: return path if attr == TraceAttr.ALL: (out, err), proc = self.node.check_output(self.app_home, name) if err and proc.poll(): msg = " Couldn't read trace %s " % name self.error(msg, out, err) return None return out if attr == TraceAttr.STREAM: cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset) elif attr == TraceAttr.SIZE: cmd = "stat -c%%s %s " % path (out, err), proc = self.node.execute(cmd) if err and proc.poll(): msg = " Couldn't find trace %s " % name self.error(msg, out, err) return None if attr == TraceAttr.SIZE: out = int(out.strip()) return out def provision(self): # create home dir for application self.node.mkdir(self.app_home) # upload sources self.upload_sources() # upload code self.upload_code() # upload stdin self.upload_stdin() # install dependencies self.install_dependencies() # build self.build() # Install self.install() # Upload command to remote bash script # - only if command can be executed in background and detached command = self.get("command") if command and not self.in_foreground: self.info("Uploading command '%s'" % command) # replace application specific paths in the command command = self.replace_paths(command) # replace application specific paths in the environment env = self.get("env") env = env and self.replace_paths(env) self.node.upload_command(command, self.app_home, shfile = "app.sh", env = env) self.info("Provisioning finished") super(LinuxApplication, self).provision() def upload_sources(self): sources = self.get("sources") if sources: self.info("Uploading sources ") # create dir for sources self.node.mkdir(self.src_dir) sources = sources.split(' ') http_sources = list() for source in list(sources): if source.startswith("http") or source.startswith("https"): http_sources.append(source) sources.remove(source) # Download http sources remotely if http_sources: command = [" wget -c --directory-prefix=${SOURCES} "] check = [] for source in http_sources: command.append(" %s " % (source)) check.append(" ls ${SOURCES}/%s " % os.path.basename(source)) command = " ".join(command) check = " ; ".join(check) # Append the command to check that the sources were downloaded command += " ; %s " % check # replace application specific paths in the command command = self.replace_paths(command) # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) self.node.run_and_wait(command, self.app_home, shfile = "http_sources.sh", pidfile = "http_sources_pidfile", ecodefile = "http_sources_exitcode", stdout = "http_sources_stdout", stderr = "http_sources_stderr") if sources: self.node.upload(sources, self.src_dir) def upload_code(self): code = self.get("code") if code: # create dir for sources self.node.mkdir(self.src_dir) self.info("Uploading code ") dst = os.path.join(self.src_dir, "code") self.node.upload(sources, dst, text = True) def upload_stdin(self): stdin = self.get("stdin") if stdin: # create dir for sources self.info(" Uploading stdin ") dst = os.path.join(self.app_home, "stdin") # If what we are uploading is a 
            # If what we are uploading is a file, check whether
            # the same file already exists (using md5sum)
            if self.compare_hash(stdin, dst):
                return

            self.node.upload(stdin, dst, text = True)

    def install_dependencies(self):
        depends = self.get("depends")
        if depends:
            self.info("Installing dependencies %s" % depends)
            self.node.install_packages(depends, self.app_home)

    def build(self):
        build = self.get("build")
        if build:
            self.info("Building sources ")

            # create dir for build
            self.node.mkdir(self.build_dir)

            # replace application specific paths in the command
            command = self.replace_paths(build)

            # Upload the command to a bash script and run it
            # in background ( but wait until the command has
            # finished to continue )
            self.node.run_and_wait(command, self.app_home,
                    shfile = "build.sh",
                    pidfile = "build_pidfile",
                    ecodefile = "build_exitcode",
                    stdout = "build_stdout",
                    stderr = "build_stderr")

    def install(self):
        install = self.get("install")
        if install:
            self.info("Installing sources ")

            # replace application specific paths in the command
            command = self.replace_paths(install)

            # Upload the command to a bash script and run it
            # in background ( but wait until the command has
            # finished to continue )
            self.node.run_and_wait(command, self.app_home,
                    shfile = "install.sh",
                    pidfile = "install_pidfile",
                    ecodefile = "install_exitcode",
                    stdout = "install_stdout",
                    stderr = "install_stderr")

    def deploy(self):
        # Wait until node is associated and deployed
        node = self.node
        if not node or node.state < ResourceState.READY:
            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
            reschedule_delay = "0.5s"
            self.ec.schedule(reschedule_delay, self.deploy)
        else:
            try:
                command = self.get("command") or ""
                self.info("Deploying command '%s' " % command)

                self.discover()
                self.provision()
            except:
                self._state = ResourceState.FAILED
                raise

            super(LinuxApplication, self).deploy()

    def start(self):
        command = self.get("command")

        self.info("Starting command '%s'" % command)

        if not command:
            # If no command was given (i.e. the Application was used for
            # dependency installation), then the application is directly
            # marked as FINISHED
            self._state = ResourceState.FINISHED
        else:
            if self.in_foreground:
                self._start_in_foreground()
            else:
                self._start_in_background()

            super(LinuxApplication, self).start()

    def _start_in_foreground(self):
        command = self.get("command")
        stdin = "stdin" if self.get("stdin") else None
        sudo = self.get("sudo") or False
        x11 = self.get("forwardX11")

        # Command will be launched in foreground and attached to the
        # terminal using the node 'execute' method in non-blocking mode.

        # Export environment
        env = self.get("env")
        environ = self.node.format_environment(env, inline = True)
        command = environ + command
        command = self.replace_paths(command)

        # We save the reference to the process in self._proc
        # to be able to kill the process from the stop method.
        # We also set blocking = False, since we don't want the
        # thread to block until the execution finishes.
        (out, err), self._proc = self.node.execute(command,
                sudo = sudo,
                stdin = stdin,
                forward_x11 = x11,
                blocking = False)

        if self._proc.poll():
            self._state = ResourceState.FAILED
            msg = " Failed to execute command '%s'" % command
            self.error(msg, out, err)
            raise RuntimeError, msg

    def _start_in_background(self):
        command = self.get("command")
        env = self.get("env")
        stdin = "stdin" if self.get("stdin") else None
        stdout = "stdout" if self.get("stdout") else "stdout"
        stderr = "stderr" if self.get("stderr") else "stderr"
        sudo = self.get("sudo") or False

        # Command will run as a daemon in background, detached from any terminal.
        # The real command to run was previously uploaded to a bash script
        # during deployment; now launch the remote script using the node's
        # 'run' method.
        cmd = "bash ./app.sh"
        (out, err), proc = self.node.run(cmd, self.app_home,
                stdin = stdin,
                stdout = stdout,
                stderr = stderr,
                sudo = sudo)

        # check if execution errors occurred
        msg = " Failed to start command '%s' " % command

        if proc.poll():
            self._state = ResourceState.FAILED
            self.error(msg, out, err)
            raise RuntimeError, msg

        # Wait for pid file to be generated
        pid, ppid = self.node.wait_pid(self.app_home)
        if pid: self._pid = int(pid)
        if ppid: self._ppid = int(ppid)

        # If the process is not running, check for error information
        # on the remote machine
        if not self.pid or not self.ppid:
            (out, err), proc = self.node.check_errors(self.app_home,
                    stderr = stderr)

            # Out is what was written in the stderr file
            if err:
                self._state = ResourceState.FAILED
                msg = " Failed to start command '%s' " % command
                self.error(msg, out, err)
                raise RuntimeError, msg

    def stop(self):
        """ Stops application execution """
        command = self.get('command') or ''

        if self.state == ResourceState.STARTED:
            stopped = True

            self.info("Stopping command '%s'" % command)

            # If the command is running in foreground (it was launched using
            # the node 'execute' method), then we use the handler to the Popen
            # process to kill it. Else we send a kill signal using the pid and ppid
            # retrieved after running the command with the node 'run' method
            if self._proc:
                self._proc.kill()
            else:
                # Only try to kill the process if the pid and ppid
                # were retrieved
                if self.pid and self.ppid:
                    (out, err), proc = self.node.kill(self.pid, self.ppid)

                    if out or err:
                        # check if execution errors occurred
                        msg = " Failed to STOP command '%s' " % self.get("command")
                        self.error(msg, out, err)
                        self._state = ResourceState.FAILED
                        stopped = False

            if stopped:
                super(LinuxApplication, self).stop()

    def release(self):
        self.info("Releasing resource")

        tear_down = self.get("tearDown")
        if tear_down:
            self.node.execute(tear_down)

        self.stop()

        if self.state == ResourceState.STOPPED:
            super(LinuxApplication, self).release()

    @property
    def state(self):
        """ Returns the state of the application """
        if self._state == ResourceState.STARTED:
            if self.in_foreground:
                # Check if the process we used to execute the command
                # is still running ...
                retcode = self._proc.poll()

                # retcode == None -> running
                # retcode > 0 -> error
                # retcode == 0 -> finished
                if retcode:
                    out = ""
                    msg = " Failed to execute command '%s'" % self.get("command")
                    err = self._proc.stderr.read()
                    self.error(msg, out, err)
                    self._state = ResourceState.FAILED
                elif retcode == 0:
                    self._state = ResourceState.FINISHED
            else:
                # We need to query the status of the command we launched in
                # background. In order to avoid overwhelming the remote host and
                # the local processor with too many ssh queries, the state is only
                # requested every 'state_check_delay' seconds.
                state_check_delay = 0.5
                if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                    # check if execution errors occurred
                    (out, err), proc = self.node.check_errors(self.app_home)

                    if err:
                        msg = " Failed to execute command '%s'" % self.get("command")
                        self.error(msg, out, err)
                        self._state = ResourceState.FAILED
                    elif self.pid and self.ppid:
                        # No execution errors occurred. Make sure the background
                        # process with the recorded pid is still running.
                        status = self.node.status(self.pid, self.ppid)

                        if status == ProcStatus.FINISHED:
                            self._state = ResourceState.FINISHED

                    self._last_state_check = strfnow()

        return self._state

    def replace_paths(self, command):
        """
        Replace all special path tags with shell-escaped actual paths.
        """
        def absolute_dir(d):
            return d if d.startswith("/") else os.path.join("${HOME}", d)

        return ( command
            .replace("${SOURCES}", absolute_dir(self.src_dir))
            .replace("${BUILD}", absolute_dir(self.build_dir))
            .replace("${APP_HOME}", absolute_dir(self.app_home))
            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home)) )

    def compare_hash(self, local, remote):
        # getting md5sum from remote file
        (out, err), proc = self.node.execute("md5sum %s " % remote)

        if proc.poll() == 0: # OK
            if not os.path.isfile(local):
                # store to a tmp file
                f = tempfile.NamedTemporaryFile()
                f.write(local)
                f.flush()
                local = f.name

            lproc = subprocess.Popen(["md5sum", local],
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)

            # getting md5sum from local file
            (lout, lerr) = lproc.communicate()

            lchk = lout.strip().split(" ")[0]
            rchk = out.strip().split(" ")[0]

            msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
                    local, lchk, remote, rchk)
            self.debug(msg)

            # files are the same, no need to upload
            if lchk == rchk:
                return True

        return False

    def valid_connection(self, guid):
        # TODO: Validate!
        return True
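
# ---------------------------------------------------------------------------
# Minimal usage sketch. It shows how a LinuxApplication is typically driven
# through an ExperimentController: register the RM, set the attributes
# declared above, connect it to a LinuxNode and deploy. The EC calls used
# here (register_resource, set, register_connection, deploy, wait_finished,
# trace, shutdown) are assumed from the NEPI 3.x examples and may differ in
# this development snapshot; "example.host", "user" and the 'depends'
# package name are placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from nepi.execution.ec import ExperimentController

    ec = ExperimentController(exp_id = "linux-app-demo")

    # A LinuxNode provides the SSH access the application runs on
    node = ec.register_resource("LinuxNode")
    ec.set(node, "hostname", "example.host")   # placeholder host
    ec.set(node, "username", "user")           # placeholder user

    # The application: command and dependencies are plain attributes
    app = ec.register_resource("LinuxApplication")
    ec.set(app, "command", "ping -c3 localhost")
    ec.set(app, "depends", "iputils-ping")     # illustrative package name
    ec.register_connection(app, node)

    # deploy() triggers discover/provision on each registered RM; the
    # application is then started and its stdout trace collected
    ec.deploy()
    ec.wait_finished(app)
    print ec.trace(app, "stdout")

    ec.shutdown()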