X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fapplication.py;h=89a9cfb1cefde070fd0c6e231222535db031e07f;hb=ed43f6aedfff63aaa7c7e8e58064993e7c11fc60;hp=506285ab02c8602b46f2002befb9168d2dbe0779;hpb=9bc3d34df11fdaf2138df236ff7ea842cda06215;p=nepi.git diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 506285ab..89a9cfb1 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -19,8 +19,8 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr -from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ - reschedule_delay +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState from nepi.resources.linux.node import LinuxNode from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -29,14 +29,9 @@ import os import subprocess # TODO: Resolve wildcards in commands!! -# TODO: During provisioning, everything that is not scp could be -# uploaded to a same script, http_sources download, etc... -# and like that require performing less ssh connections!!! -# TODO: Make stdin be a symlink to the original file in ${SHARE} -# - later use md5sum to check wether the file needs to be re-upload +# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!! - -@clsinit +@clsinit_copy class LinuxApplication(ResourceManager): """ .. class:: Class Args : @@ -48,34 +43,34 @@ class LinuxApplication(ResourceManager): .. note:: - A LinuxApplication RM represents a process that can be executed in - a remote Linux host using SSH. + A LinuxApplication RM represents a process that can be executed in + a remote Linux host using SSH. - The LinuxApplication RM takes care of uploadin sources and any files - needed to run the experiment, to the remote host. - It also allows to provide source compilation (build) and installation - instructions, and takes care of automating the sources build and - installation tasks for the user. + The LinuxApplication RM takes care of uploadin sources and any files + needed to run the experiment, to the remote host. + It also allows to provide source compilation (build) and installation + instructions, and takes care of automating the sources build and + installation tasks for the user. - It is important to note that files uploaded to the remote host have - two possible scopes: single-experiment or multi-experiment. - Single experiment files are those that will not be re-used by other - experiments. Multi-experiment files are those that will. - Sources and shared files are always made available to all experiments. + It is important to note that files uploaded to the remote host have + two possible scopes: single-experiment or multi-experiment. + Single experiment files are those that will not be re-used by other + experiments. Multi-experiment files are those that will. + Sources and shared files are always made available to all experiments. - Directory structure: + Directory structure: - The directory structure used by LinuxApplication RM at the Linux - host is the following: + The directory structure used by LinuxApplication RM at the Linux + host is the following: - ${HOME}/nepi-usr --> Base directory for multi-experiment files + ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files | ${LIB} |- /lib --> Base directory for libraries ${BIN} |- /bin --> Base directory for binary files ${SRC} |- /src --> Base directory for sources ${SHARE} |- /share --> Base directory for other files - ${HOME}/nepi-exp --> Base directory for single-experiment files + ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files | ${EXP_HOME} |- / --> Base directory for experiment exp-id | @@ -86,50 +81,52 @@ class LinuxApplication(ResourceManager): """ - _rtype = "LinuxApplication" + _rtype = "linux::Application" + _help = "Runs an application on a Linux host with a BASH command " + _platform = "linux" @classmethod def _register_attributes(cls): command = Attribute("command", "Command to execute at application start. " "Note that commands will be executed in the ${RUN_HOME} directory, " "make sure to take this into account when using relative paths. ", - flags = Flags.ExecReadOnly) + flags = Flags.Design) forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", - flags = Flags.ExecReadOnly) + flags = Flags.Design) env = Attribute("env", "Environment variables string for command execution", - flags = Flags.ExecReadOnly) + flags = Flags.Design) sudo = Attribute("sudo", "Run with root privileges", - flags = Flags.ExecReadOnly) + flags = Flags.Design) depends = Attribute("depends", "Space-separated list of packages required to run the application", - flags = Flags.ExecReadOnly) + flags = Flags.Design) sources = Attribute("sources", - "Space-separated list of regular files to be uploaded to ${SRC} " + "semi-colon separated list of regular files to be uploaded to ${SRC} " "directory prior to building. Archives won't be expanded automatically. " "Sources are globally available for all experiments unless " "cleanHome is set to True (This will delete all sources). ", - flags = Flags.ExecReadOnly) + flags = Flags.Design) files = Attribute("files", - "Space-separated list of regular miscellaneous files to be uploaded " + "semi-colon separated list of regular miscellaneous files to be uploaded " "to ${SHARE} directory. " "Files are globally available for all experiments unless " "cleanHome is set to True (This will delete all files). ", - flags = Flags.ExecReadOnly) + flags = Flags.Design) libs = Attribute("libs", - "Space-separated list of libraries (e.g. .so files) to be uploaded " + "semi-colon separated list of libraries (e.g. .so files) to be uploaded " "to ${LIB} directory. " "Libraries are globally available for all experiments unless " "cleanHome is set to True (This will delete all files). ", - flags = Flags.ExecReadOnly) + flags = Flags.Design) bins = Attribute("bins", - "Space-separated list of binary files to be uploaded " + "semi-colon separated list of binary files to be uploaded " "to ${BIN} directory. " "Binaries are globally available for all experiments unless " "cleanHome is set to True (This will delete all files). ", - flags = Flags.ExecReadOnly) + flags = Flags.Design) code = Attribute("code", "Plain text source code to be uploaded to the ${APP_HOME} directory. ", - flags = Flags.ExecReadOnly) + flags = Flags.Design) build = Attribute("build", "Build commands to execute after deploying the sources. " "Sources are uploaded to the ${SRC} directory and code " @@ -138,16 +135,16 @@ class LinuxApplication(ResourceManager): "./configure && make && make clean.\n" "Make sure to make the build commands return with a nonzero exit " "code on error.", - flags = Flags.ReadOnly) + flags = Flags.Design) install = Attribute("install", "Commands to transfer built files to their final destinations. " "Install commands are executed after build commands. ", - flags = Flags.ReadOnly) + flags = Flags.Design) stdin = Attribute("stdin", "Standard input for the 'command'", - flags = Flags.ExecReadOnly) + flags = Flags.Design) tear_down = Attribute("tearDown", "Command to be executed just before " "releasing the resource", - flags = Flags.ReadOnly) + flags = Flags.Design) cls._register_attribute(command) cls._register_attribute(forward_x11) @@ -166,8 +163,8 @@ class LinuxApplication(ResourceManager): @classmethod def _register_traces(cls): - stdout = Trace("stdout", "Standard output stream") - stderr = Trace("stderr", "Standard error stream") + stdout = Trace("stdout", "Standard output stream", enabled = True) + stderr = Trace("stderr", "Standard error stream", enabled = True) cls._register_trace(stdout) cls._register_trace(stderr) @@ -176,25 +173,39 @@ class LinuxApplication(ResourceManager): super(LinuxApplication, self).__init__(ec, guid) self._pid = None self._ppid = None + self._node = None self._home = "app-%s" % self.guid + + # whether the command should run in foreground attached + # to a terminal self._in_foreground = False + # whether to use sudo to kill the application process + self._sudo_kill = False + # keep a reference to the running process handler when # the command is not executed as remote daemon in background self._proc = None # timestamp of last state check of the application self._last_state_check = tnow() - + def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.node.get("hostname"), msg) @property def node(self): - node = self.get_connected(LinuxNode.rtype()) - if node: return node[0] - return None + if not self._node: + node = self.get_connected(LinuxNode.get_rtype()) + if not node: + msg = "Application %s guid %d NOT connected to Node" % ( + self._rtype, self.guid) + raise RuntimeError, msg + + self._node = node[0] + + return self._node @property def app_home(self): @@ -225,10 +236,13 @@ class LinuxApplication(ResourceManager): """ return self.get("forwardX11") or self._in_foreground + def trace_filepath(self, filename): + return os.path.join(self.run_home, filename) + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) - path = os.path.join(self.run_home, name) + path = self.trace_filepath(name) command = "(test -f %s && echo 'success') || echo 'error'" % path (out, err), proc = self.node.execute(command) @@ -244,7 +258,7 @@ class LinuxApplication(ResourceManager): if attr == TraceAttr.ALL: (out, err), proc = self.node.check_output(self.run_home, name) - if err and proc.poll(): + if proc.poll(): msg = " Couldn't read trace %s " % name self.error(msg, out, err) return None @@ -258,7 +272,7 @@ class LinuxApplication(ResourceManager): (out, err), proc = self.node.execute(cmd) - if err and proc.poll(): + if proc.poll(): msg = " Couldn't find trace %s " % name self.error(msg, out, err) return None @@ -267,8 +281,22 @@ class LinuxApplication(ResourceManager): out = int(out.strip()) return out + + def do_provision(self): + # take a snapshot of the system if user is root + # to ensure that cleanProcess will not kill + # pre-existent processes + if self.node.get("username") == 'root': + import pickle + procs = dict() + ps_aux = "ps aux |awk '{print $2,$11}'" + (out, err), proc = self.node.execute(ps_aux) + if len(out) != 0: + for line in out.strip().split("\n"): + parts = line.strip().split(" ") + procs[parts[0]] = parts[1] + pickle.dump(procs, open("/tmp/save.proc", "wb")) - def provision(self): # create run dir for application self.node.mkdir(self.run_home) @@ -298,8 +326,9 @@ class LinuxApplication(ResourceManager): # Since provisioning takes a long time, before # each step we check that the EC is still for step in steps: - if self.ec.finished: - raise RuntimeError, "EC finished" + if self.ec.abort: + self.debug("Interrupting provisioning. EC says 'ABORT") + return ret = step() if ret: @@ -314,9 +343,9 @@ class LinuxApplication(ResourceManager): self.info("Provisioning finished") - super(LinuxApplication, self).provision() + super(LinuxApplication, self).do_provision() - def upload_start_command(self): + def upload_start_command(self, overwrite = False): # Upload command to remote bash script # - only if command can be executed in background and detached command = self.get("command") @@ -335,31 +364,43 @@ class LinuxApplication(ResourceManager): self.node.upload_command(command, shfile = shfile, - env = env) + env = env, + overwrite = overwrite) - def execute_deploy_command(self, command): + def execute_deploy_command(self, command, prefix="deploy"): if command: + # replace application specific paths in the command + command = self.replace_paths(command) + + # replace application specific paths in the environment + env = self.get("env") + env = env and self.replace_paths(env) + # Upload the command to a bash script and run it # in background ( but wait until the command has # finished to continue ) - shfile = os.path.join(self.app_home, "deploy.sh") + shfile = os.path.join(self.app_home, "%s.sh" % prefix) self.node.run_and_wait(command, self.run_home, shfile = shfile, overwrite = False, - pidfile = "deploy_pidfile", - ecodefile = "deploy_exitcode", - stdout = "deploy_stdout", - stderr = "deploy_stderr") - - def upload_sources(self): - sources = self.get("sources") + pidfile = "%s_pidfile" % prefix, + ecodefile = "%s_exitcode" % prefix, + stdout = "%s_stdout" % prefix, + stderr = "%s_stderr" % prefix) + + def upload_sources(self, sources = None, src_dir = None): + if not sources: + sources = self.get("sources") command = "" + if not src_dir: + src_dir = self.node.src_dir + if sources: self.info("Uploading sources ") - sources = sources.split(' ') + sources = map(str.strip, sources.split(";")) # Separate sources that should be downloaded from # the web, from sources that should be uploaded from @@ -372,15 +413,16 @@ class LinuxApplication(ResourceManager): command.append( " ( " # Check if the source already exists - " ls ${SRC}/%(basename)s " + " ls %(src_dir)s/%(basename)s " " || ( " # If source doesn't exist, download it and check # that it it downloaded ok - " wget -c --directory-prefix=${SRC} %(source)s && " - " ls ${SRC}/%(basename)s " + " wget -c --directory-prefix=%(src_dir)s %(source)s && " + " ls %(src_dir)s/%(basename)s " " ) ) " % { "basename": os.path.basename(source), - "source": source + "source": source, + "src_dir": src_dir }) command = " && ".join(command) @@ -389,34 +431,38 @@ class LinuxApplication(ResourceManager): command = self.replace_paths(command) if sources: - sources = ' '.join(sources) - self.node.upload(sources, self.node.src_dir, overwrite = False) + sources = ';'.join(sources) + self.node.upload(sources, src_dir, overwrite = False) return command - def upload_files(self): - files = self.get("files") + def upload_files(self, files = None): + if not files: + files = self.get("files") if files: self.info("Uploading files %s " % files) self.node.upload(files, self.node.share_dir, overwrite = False) - def upload_libraries(self): - libs = self.get("libs") + def upload_libraries(self, libs = None): + if not libs: + libs = self.get("libs") if libs: self.info("Uploading libraries %s " % libaries) self.node.upload(libs, self.node.lib_dir, overwrite = False) - def upload_binaries(self): - bins = self.get("bins") + def upload_binaries(self, bins = None): + if not bins: + bins = self.get("bins") if bins: self.info("Uploading binaries %s " % binaries) self.node.upload(bins, self.node.bin_dir, overwrite = False) - def upload_code(self): - code = self.get("code") + def upload_code(self, code = None): + if not code: + code = self.get("code") if code: self.info("Uploading code") @@ -424,23 +470,41 @@ class LinuxApplication(ResourceManager): dst = os.path.join(self.app_home, "code") self.node.upload(code, dst, overwrite = False, text = True) - def upload_stdin(self): - stdin = self.get("stdin") + def upload_stdin(self, stdin = None): + if not stdin: + stdin = self.get("stdin") + if stdin: # create dir for sources self.info("Uploading stdin") - dst = os.path.join(self.app_home, "stdin") + # upload stdin file to ${SHARE_DIR} directory + if os.path.isfile(stdin): + basename = os.path.basename(stdin) + dst = os.path.join(self.node.share_dir, basename) + else: + dst = os.path.join(self.app_home, "stdin") + self.node.upload(stdin, dst, overwrite = False, text = True) - def install_dependencies(self): - depends = self.get("depends") + # create "stdin" symlink on ${APP_HOME} directory + command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({ + "app_home": self.app_home, + "stdin": dst }) + + return command + + def install_dependencies(self, depends = None): + if not depends: + depends = self.get("depends") + if depends: self.info("Installing dependencies %s" % depends) - self.node.install_packages(depends, self.app_home, self.run_home) + return self.node.install_packages_command(depends) - def build(self): - build = self.get("build") + def build(self, build = None): + if not build: + build = self.get("build") if build: self.info("Building sources ") @@ -448,8 +512,9 @@ class LinuxApplication(ResourceManager): # replace application specific paths in the command return self.replace_paths(build) - def install(self): - install = self.get("install") + def install(self, install = None): + if not install: + install = self.get("install") if install: self.info("Installing sources ") @@ -457,76 +522,61 @@ class LinuxApplication(ResourceManager): # replace application specific paths in the command return self.replace_paths(install) - def deploy(self): + def do_deploy(self): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) - self.ec.schedule(reschedule_delay, self.deploy) + self.ec.schedule(self.reschedule_delay, self.deploy) else: - try: - command = self.get("command") or "" - self.info("Deploying command '%s' " % command) - self.discover() - self.provision() - except: - self.fail() - raise - - super(LinuxApplication, self).deploy() - - def start(self): + command = self.get("command") or "" + self.info("Deploying command '%s' " % command) + self.do_discover() + self.do_provision() + + super(LinuxApplication, self).do_deploy() + + def do_start(self): command = self.get("command") self.info("Starting command '%s'" % command) if not command: # If no command was given (i.e. Application was used for dependency - # installation), then the application is directly marked as FINISHED - self._state = ResourceState.FINISHED + # installation), then the application is directly marked as STOPPED + super(LinuxApplication, self).set_stopped() else: - if self.in_foreground: - self._start_in_foreground() + self._run_in_foreground() else: - self._start_in_background() + self._run_in_background() - super(LinuxApplication, self).start() + super(LinuxApplication, self).do_start() - def _start_in_foreground(self): + def _run_in_foreground(self): command = self.get("command") sudo = self.get("sudo") or False x11 = self.get("forwardX11") - - # For a command being executed in foreground, if there is stdin, - # it is expected to be text string not a file or pipe - stdin = self.get("stdin") or None + env = self.get("env") # Command will be launched in foreground and attached to the # terminal using the node 'execute' in non blocking mode. - # Export environment - env = self.get("env") - environ = self.node.format_environment(env, inline = True) - command = environ + command - command = self.replace_paths(command) - # We save the reference to the process in self._proc # to be able to kill the process from the stop method. # We also set blocking = False, since we don't want the # thread to block until the execution finishes. - (out, err), self._proc = self.node.execute(command, + (out, err), self._proc = self.execute_command(command, + env = env, sudo = sudo, - stdin = stdin, forward_x11 = x11, blocking = False) if self._proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg - def _start_in_background(self): + def _run_in_background(self): command = self.get("command") env = self.get("env") sudo = self.get("sudo") or False @@ -552,7 +602,6 @@ class LinuxApplication(ResourceManager): msg = " Failed to start command '%s' " % command if proc.poll(): - self.fail() self.error(msg, out, err) raise RuntimeError, msg @@ -569,56 +618,56 @@ class LinuxApplication(ResourceManager): # Out is what was written in the stderr file if err: - self.fail() msg = " Failed to start command '%s' " % command self.error(msg, out, err) raise RuntimeError, msg - - def stop(self): + + def do_stop(self): """ Stops application execution """ command = self.get('command') or '' if self.state == ResourceState.STARTED: - stopped = True - - self.info("Stopping command '%s'" % command) + + self.info("Stopping command '%s' " % command) # If the command is running in foreground (it was launched using # the node 'execute' method), then we use the handler to the Popen # process to kill it. Else we send a kill signal using the pid and ppid # retrieved after running the command with the node 'run' method - if self._proc: self._proc.kill() else: # Only try to kill the process if the pid and ppid # were retrieved if self.pid and self.ppid: - (out, err), proc = self.node.kill(self.pid, self.ppid) + (out, err), proc = self.node.kill(self.pid, self.ppid, + sudo = self._sudo_kill) - if out or err: - # check if execution errors occurred + """ + # TODO: check if execution errors occurred + if (proc and proc.poll()) or err: msg = " Failed to STOP command '%s' " % self.get("command") self.error(msg, out, err) - self.fail() - stopped = False + """ - if stopped: - super(LinuxApplication, self).stop() + super(LinuxApplication, self).do_stop() - def release(self): + def do_release(self): self.info("Releasing resource") + self.do_stop() + tear_down = self.get("tearDown") if tear_down: self.node.execute(tear_down) - self.stop() + hard_release = self.get("hardRelease") + if hard_release: + self.node.rmdir(self.app_home) - if self.state == ResourceState.STOPPED: - super(LinuxApplication, self).release() - + super(LinuxApplication, self).do_release() + @property def state(self): """ Returns the state of the application @@ -637,53 +686,83 @@ class LinuxApplication(ResourceManager): msg = " Failed to execute command '%s'" % self.get("command") err = self._proc.stderr.read() self.error(msg, out, err) - self.fail() - elif retcode == 0: - self._state = ResourceState.FINISHED + self.do_fail() + elif retcode == 0: + self.set_stopped() else: # We need to query the status of the command we launched in - # background. In oredr to avoid overwhelming the remote host and + # background. In order to avoid overwhelming the remote host and # the local processor with too many ssh queries, the state is only # requested every 'state_check_delay' seconds. state_check_delay = 0.5 if tdiffsec(tnow(), self._last_state_check) > state_check_delay: - # check if execution errors occurred - (out, err), proc = self.node.check_errors(self.run_home) - - if err: - msg = " Failed to execute command '%s'" % self.get("command") - self.error(msg, out, err) - self.fail() - - elif self.pid and self.ppid: - # No execution errors occurred. Make sure the background - # process with the recorded pid is still running. + if self.pid and self.ppid: + # Make sure the process is still running in background status = self.node.status(self.pid, self.ppid) if status == ProcStatus.FINISHED: - self._state = ResourceState.FINISHED + # If the program finished, check if execution + # errors occurred + (out, err), proc = self.node.check_errors( + self.run_home) + + if err: + msg = "Failed to execute command '%s'" % \ + self.get("command") + self.error(msg, out, err) + self.do_fail() + else: + self.set_stopped() self._last_state_check = tnow() return self._state - def replace_paths(self, command): + def execute_command(self, command, + env=None, + sudo=False, + tty=False, + forward_x11=False, + blocking=False): + + environ = "" + if env: + environ = self.node.format_environment(env, inline=True) + command = environ + command + command = self.replace_paths(command) + + return self.node.execute(command, + sudo=sudo, + tty=tty, + forward_x11=forward_x11, + blocking=blocking) + + def replace_paths(self, command, node=None, app_home=None, run_home=None): """ Replace all special path tags with shell-escaped actual paths. """ + if not node: + node=self.node + + if not app_home: + app_home=self.app_home + + if not run_home: + run_home = self.run_home + return ( command - .replace("${USR}", self.node.usr_dir) - .replace("${LIB}", self.node.lib_dir) - .replace("${BIN}", self.node.bin_dir) - .replace("${SRC}", self.node.src_dir) - .replace("${SHARE}", self.node.share_dir) - .replace("${EXP}", self.node.exp_dir) - .replace("${EXP_HOME}", self.node.exp_home) - .replace("${APP_HOME}", self.app_home) - .replace("${RUN_HOME}", self.run_home) - .replace("${NODE_HOME}", self.node.node_home) - .replace("${HOME}", self.node.home_dir) + .replace("${USR}", node.usr_dir) + .replace("${LIB}", node.lib_dir) + .replace("${BIN}", node.bin_dir) + .replace("${SRC}", node.src_dir) + .replace("${SHARE}", node.share_dir) + .replace("${EXP}", node.exp_dir) + .replace("${EXP_HOME}", node.exp_home) + .replace("${APP_HOME}", app_home) + .replace("${RUN_HOME}", run_home) + .replace("${NODE_HOME}", node.node_home) + .replace("${HOME}", node.home_dir) ) def valid_connection(self, guid):