X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fapplication.py;h=ea4b0a4b7792e10682816e83a153f04719942cfe;hb=9ff58aeb6f61e082787e8df4531ae7c5e546e88e;hp=080c3a402ceed83eb1731ca7b8d90c819b6997cf;hpb=62a8f5c90afe9033f532206c811cff8ea76b2c09;p=nepi.git diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index 080c3a40..ea4b0a4b 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -21,16 +21,15 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.trace import Trace, TraceAttr from nepi.execution.resource import ResourceManager, clsinit, ResourceState from nepi.resources.linux.node import LinuxNode -from nepi.util import sshfuncs +from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import strfnow, strfdiff import os - -reschedule_delay = "0.5s" -state_check_delay = 1 +import subprocess # TODO: Resolve wildcards in commands!! -# TODO: If command is not set give a warning but do not generate an error! +# TODO: compare_hash for all files that are uploaded! + @clsinit class LinuxApplication(ResourceManager): @@ -40,7 +39,7 @@ class LinuxApplication(ResourceManager): def _register_attributes(cls): command = Attribute("command", "Command to execute", flags = Flags.ExecReadOnly) - forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", + forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", flags = Flags.ExecReadOnly) env = Attribute("env", "Environment variables string for command execution", flags = Flags.ExecReadOnly) @@ -105,17 +104,20 @@ class LinuxApplication(ResourceManager): def _register_traces(cls): stdout = Trace("stdout", "Standard output stream") stderr = Trace("stderr", "Standard error stream") - buildlog = Trace("buildlog", "Output of the build process") cls._register_trace(stdout) cls._register_trace(stderr) - cls._register_trace(buildlog) def __init__(self, ec, guid): super(LinuxApplication, self).__init__(ec, guid) self._pid = None self._ppid = None self._home = "app-%s" % self.guid + self._in_foreground = False + + # keep a reference to the running process handler when + # the command is not executed as remote daemon in background + self._proc = None # timestamp of last state check of the application self._last_state_check = strfnow() @@ -150,6 +152,19 @@ class LinuxApplication(ResourceManager): def ppid(self): return self._ppid + @property + def in_foreground(self): + """ Returns True if the command needs to be executed in foreground. + This means that command will be executed using 'execute' instead of + 'run' ('run' executes a command in background and detached from the + terminal) + + When using X11 forwarding option, the command can not run in background + and detached from a terminal, since we need to keep the terminal attached + to interact with it. + """ + return self.get("forwardX11") or self._in_foreground + def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0): self.info("Retrieving '%s' trace %s " % (name, attr)) @@ -215,32 +230,32 @@ class LinuxApplication(ResourceManager): # Install self.install() + # Upload command to remote bash script + # - only if command can be executed in background and detached command = self.get("command") - x11 = self.get("forwardX11") - if not x11 and command: - self.info("Uploading command '%s'" % command) - - # Export environment - environ = "" - if self.get("env"): - for var in self.get("env").split(" "): - environ += 'export %s\n' % var - command = environ + command + if command and not self.in_foreground: + self.info("Uploading command '%s'" % command) - # If the command runs asynchronous, pre upload the command - # to the app.sh file in the remote host - dst = os.path.join(self.app_home, "app.sh") + # replace application specific paths in the command command = self.replace_paths(command) - self.node.upload(command, dst, text = True) + + # replace application specific paths in the environment + env = self.get("env") + env = env and self.replace_paths(env) + + self.node.upload_command(command, self.app_home, + shfile = "app.sh", + env = env) + + self.info("Provisioning finished") super(LinuxApplication, self).provision() def upload_sources(self): - # TODO: check if sources need to be uploaded and upload them sources = self.get("sources") if sources: - self.info(" Uploading sources ") + self.info("Uploading sources ") # create dir for sources self.node.mkdir(self.src_dir) @@ -253,25 +268,34 @@ class LinuxApplication(ResourceManager): http_sources.append(source) sources.remove(source) - # Download http sources + # Download http sources remotely if http_sources: - cmd = " wget -c --directory-prefix=${SOURCES} " - verif = "" + command = [" wget -c --directory-prefix=${SOURCES} "] + check = [] for source in http_sources: - cmd += " %s " % (source) - verif += " ls ${SOURCES}/%s ;" % os.path.basename(source) + command.append(" %s " % (source)) + check.append(" ls ${SOURCES}/%s " % os.path.basename(source)) - # Wget output goes to stderr :S - cmd += " 2> /dev/null ; " + command = " ".join(command) + check = " ; ".join(check) - # Add verification - cmd += " %s " % verif + # Append the command to check that the sources were downloaded + command += " ; %s " % check + + # replace application specific paths in the command + command = self.replace_paths(command) + + # Upload the command to a bash script and run it + # in background ( but wait until the command has + # finished to continue ) + self.node.run_and_wait(command, self.app_home, + shfile = "http_sources.sh", + pidfile = "http_sources_pidfile", + ecodefile = "http_sources_exitcode", + stdout = "http_sources_stdout", + stderr = "http_sources_stderr") - # Upload the command to a file, and execute asynchronously - self.upload_and_run(cmd, - "http_sources.sh", "http_sources_pid", - "http_sources_out", "http_sources_err") if sources: self.node.upload(sources, self.src_dir) @@ -281,7 +305,7 @@ class LinuxApplication(ResourceManager): # create dir for sources self.node.mkdir(self.src_dir) - self.info(" Uploading code ") + self.info("Uploading code ") dst = os.path.join(self.src_dir, "code") self.node.upload(sources, dst, text = True) @@ -291,44 +315,68 @@ class LinuxApplication(ResourceManager): if stdin: # create dir for sources self.info(" Uploading stdin ") - + dst = os.path.join(self.app_home, "stdin") + + # If what we are uploading is a file, check whether + # the same file already exists (using md5sum) + if self.compare_hash(stdin, dst): + return + self.node.upload(stdin, dst, text = True) def install_dependencies(self): depends = self.get("depends") if depends: - self.info(" Installing dependencies %s" % depends) - self.node.install_packages(depends, home = self.app_home) + self.info("Installing dependencies %s" % depends) + self.node.install_packages(depends, self.app_home) def build(self): build = self.get("build") if build: - self.info(" Building sources ") + self.info("Building sources ") # create dir for build self.node.mkdir(self.build_dir) - # Upload the command to a file, and execute asynchronously - self.upload_and_run(build, - "build.sh", "build_pid", - "build_out", "build_err") + # replace application specific paths in the command + command = self.replace_paths(build) + + # Upload the command to a bash script and run it + # in background ( but wait until the command has + # finished to continue ) + self.node.run_and_wait(command, self.app_home, + shfile = "build.sh", + pidfile = "build_pidfile", + ecodefile = "build_exitcode", + stdout = "build_stdout", + stderr = "build_stderr") def install(self): install = self.get("install") if install: - self.info(" Installing sources ") + self.info("Installing sources ") + + # replace application specific paths in the command + command = self.replace_paths(install) - # Upload the command to a file, and execute asynchronously - self.upload_and_run(install, - "install.sh", "install_pid", - "install_out", "install_err") + # Upload the command to a bash script and run it + # in background ( but wait until the command has + # finished to continue ) + self.node.run_and_wait(command, self.app_home, + shfile = "install.sh", + pidfile = "install_pidfile", + ecodefile = "install_exitcode", + stdout = "install_stdout", + stderr = "install_stderr") def deploy(self): # Wait until node is associated and deployed node = self.node if not node or node.state < ResourceState.READY: self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state ) + + reschedule_delay = "0.5s" self.ec.schedule(reschedule_delay, self.deploy) else: try: @@ -343,101 +391,129 @@ class LinuxApplication(ResourceManager): super(LinuxApplication, self).deploy() def start(self): - command = self.get('command') - env = self.get('env') - stdin = 'stdin' if self.get('stdin') else None - stdout = 'stdout' if self.get('stdout') else 'stdout' - stderr = 'stderr' if self.get('stderr') else 'stderr' - sudo = self.get('sudo') or False - x11 = self.get('forwardX11') or False - failed = False + command = self.get("command") - super(LinuxApplication, self).start() + self.info("Starting command '%s'" % command) if not command: - self.info("No command to start ") + # If no command was given (i.e. Application was used for dependency + # installation), then the application is directly marked as FINISHED self._state = ResourceState.FINISHED - return - - self.info("Starting command '%s'" % command) + else: - if x11: - if env: - # Export environment - environ = "" - for var in env.split(" "): - environ += ' %s ' % var + if self.in_foreground: + self._start_in_foreground() + else: + self._start_in_background() - command = "(" + environ + " ; " + command + ")" - command = self.replace_paths(command) + super(LinuxApplication, self).start() - # If the command requires X11 forwarding, we - # can't run it asynchronously - (out, err), proc = self.node.execute(command, - sudo = sudo, - stdin = stdin, - forward_x11 = x11) + def _start_in_foreground(self): + command = self.get("command") + stdin = "stdin" if self.get("stdin") else None + sudo = self.get("sudo") or False + x11 = self.get("forwardX11") - self._state = ResourceState.FINISHED + # Command will be launched in foreground and attached to the + # terminal using the node 'execute' in non blocking mode. + + # Export environment + env = self.get("env") + environ = self.node.format_environment(env, inline = True) + command = environ + command + command = self.replace_paths(command) + + # We save the reference to the process in self._proc + # to be able to kill the process from the stop method. + # We also set blocking = False, since we don't want the + # thread to block until the execution finishes. + (out, err), self._proc = self.node.execute(command, + sudo = sudo, + stdin = stdin, + forward_x11 = x11, + blocking = False) + + if self._proc.poll(): + self._state = ResourceState.FAILED + self.error(msg, out, err) + raise RuntimeError, msg - if proc.poll() and err: - failed = True - else: - # Command was previously uploaded, now run the remote - # bash file asynchronously - cmd = "bash ./app.sh" - (out, err), proc = self.node.run(cmd, self.app_home, - stdin = stdin, - stdout = stdout, - stderr = stderr, - sudo = sudo) - - if proc.poll() and err: - failed = True + def _start_in_background(self): + command = self.get("command") + env = self.get("env") + stdin = "stdin" if self.get("stdin") else None + stdout = "stdout" if self.get("stdout") else "stdout" + stderr = "stderr" if self.get("stderr") else "stderr" + sudo = self.get("sudo") or False + + # Command will be as a daemon in baground and detached from any terminal. + # The real command to run was previously uploaded to a bash script + # during deployment, now launch the remote script using 'run' + # method from the node + cmd = "bash ./app.sh" + (out, err), proc = self.node.run(cmd, self.app_home, + stdin = stdin, + stdout = stdout, + stderr = stderr, + sudo = sudo) + + # check if execution errors occurred + msg = " Failed to start command '%s' " % command - if not failed: - pid, ppid = self.node.wait_pid(home = self.app_home) - if pid: self._pid = int(pid) - if ppid: self._ppid = int(ppid) - - if not self.pid or not self.ppid: - failed = True - - (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr') - - if failed or out or chkerr: - # check if execution errors occurred + if proc.poll(): + self._state = ResourceState.FAILED + self.error(msg, out, err) + raise RuntimeError, msg + + # Wait for pid file to be generated + pid, ppid = self.node.wait_pid(self.app_home) + if pid: self._pid = int(pid) + if ppid: self._ppid = int(ppid) + + # If the process is not running, check for error information + # on the remote machine + if not self.pid or not self.ppid: + (out, err), proc = self.node.check_errors(self.app_home, + stderr = stderr) + + # Out is what was written in the stderr file + if err: + self._state = ResourceState.FAILED msg = " Failed to start command '%s' " % command - out = out - if err: - err = err - elif chkerr: - err = chkerr - self.error(msg, out, err) - - msg2 = " Setting state to Failed" - self.debug(msg2) - self._state = ResourceState.FAILED - raise RuntimeError, msg - + def stop(self): + """ Stops application execution + """ command = self.get('command') or '' - state = self.state - - if state == ResourceState.STARTED: - self.info("Stopping command '%s'" % command) - (out, err), proc = self.node.kill(self.pid, self.ppid) + if self.state == ResourceState.STARTED: + stopped = True - if out or err: - # check if execution errors occurred - msg = " Failed to STOP command '%s' " % self.get("command") - self.error(msg, out, err) - self._state = ResourceState.FAILED - stopped = False + self.info("Stopping command '%s'" % command) + + # If the command is running in foreground (it was launched using + # the node 'execute' method), then we use the handler to the Popen + # process to kill it. Else we send a kill signal using the pid and ppid + # retrieved after running the command with the node 'run' method + + if self._proc: + self._proc.kill() else: + # Only try to kill the process if the pid and ppid + # were retrieved + if self.pid and self.ppid: + (out, err), proc = self.node.kill(self.pid, self.ppid) + + if out or err: + # check if execution errors occurred + msg = " Failed to STOP command '%s' " % self.get("command") + self.error(msg, out, err) + self._state = ResourceState.FAILED + stopped = False + + if stopped: super(LinuxApplication, self).stop() def release(self): @@ -448,52 +524,59 @@ class LinuxApplication(ResourceManager): self.node.execute(tear_down) self.stop() + if self.state == ResourceState.STOPPED: super(LinuxApplication, self).release() @property def state(self): + """ Returns the state of the application + """ if self._state == ResourceState.STARTED: - # To avoid overwhelming the remote hosts and the local processor - # with too many ssh queries, the state is only requested - # every 'state_check_delay' . - if strfdiff(strfnow(), self._last_state_check) > state_check_delay: - # check if execution errors occurred - (out, err), proc = self.node.check_output(self.app_home, 'stderr') - - if out or err: - if err.find("No such file or directory") >= 0 : - # The resource is marked as started, but the - # command was not yet executed - return ResourceState.READY - + if self.in_foreground: + # Check if the process we used to execute the command + # is still running ... + retcode = self._proc.poll() + + # retcode == None -> running + # retcode > 0 -> error + # retcode == 0 -> finished + if retcode: + out = "" msg = " Failed to execute command '%s'" % self.get("command") + err = self._proc.stderr.read() self.error(msg, out, err) self._state = ResourceState.FAILED + elif retcode == 0: + self._state = ResourceState.FINISHED - elif self.pid and self.ppid: - status = self.node.status(self.pid, self.ppid) - - if status == sshfuncs.FINISHED: - self._state = ResourceState.FINISHED - - - self._last_state_check = strfnow() + else: + # We need to query the status of the command we launched in + # background. In oredr to avoid overwhelming the remote host and + # the local processor with too many ssh queries, the state is only + # requested every 'state_check_delay' seconds. + state_check_delay = 0.5 + if strfdiff(strfnow(), self._last_state_check) > state_check_delay: + # check if execution errors occurred + (out, err), proc = self.node.check_errors(self.app_home) + + if err: + msg = " Failed to execute command '%s'" % self.get("command") + self.error(msg, out, err) + self._state = ResourceState.FAILED + + elif self.pid and self.ppid: + # No execution errors occurred. Make sure the background + # process with the recorded pid is still running. + status = self.node.status(self.pid, self.ppid) + + if status == ProcStatus.FINISHED: + self._state = ResourceState.FINISHED + + self._last_state_check = strfnow() return self._state - def upload_and_run(self, cmd, fname, pidfile, outfile, errfile): - dst = os.path.join(self.app_home, fname) - cmd = self.replace_paths(cmd) - self.node.upload(cmd, dst, text = True) - - cmd = "bash ./%s" % fname - (out, err), proc = self.node.run_and_wait(cmd, self.app_home, - pidfile = pidfile, - stdout = outfile, - stderr = errfile, - raise_on_error = True) - def replace_paths(self, command): """ Replace all special path tags with shell-escaped actual paths. @@ -508,12 +591,40 @@ class LinuxApplication(ResourceManager): .replace("${NODE_HOME}", absolute_dir(self.node.node_home)) .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) ) ) - + + def compare_hash(self, local, remote): + # getting md5sum from remote file + (out, err), proc = self.node.execute("md5sum %s " % remote) + + if proc.poll() == 0: #OK + if not os.path.isfile(local): + # store to a tmp file + f = tempfile.NamedTemporaryFile() + f.write(local) + f.flush() + local = f.name + + lproc = subprocess.Popen(["md5sum", local], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + + # getting md5sum from local file + (lout, lerr) = lproc.communicate() + + # files are the same, no need to upload + lchk = lout.strip().split(" ")[0] + rchk = out.strip().split(" ")[0] + + msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % ( + local, lchk, remote, rchk) + self.debug(msg) + + if lchk == rchk: + return True + + return False + def valid_connection(self, guid): # TODO: Validate! return True - # XXX: What if it is connected to more than one node? - resources = self.find_resources(exact_tags = [tags.NODE]) - self._node = resources[0] if len(resources) == 1 else None - return self._node