import hashlib
import os
import subprocess
import tempfile

from nepi.util.timefuncs import strfnow, strfdiff
# TODO: Resolve wildcards in commands!!
# TODO: compare_hash for all files that are uploaded!
@clsinit
def _register_attributes(cls):
command = Attribute("command", "Command to execute",
flags = Flags.ExecReadOnly)
- forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
+ forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
flags = Flags.ExecReadOnly)
env = Attribute("env", "Environment variables string for command execution",
flags = Flags.ExecReadOnly)
def _register_traces(cls):
stdout = Trace("stdout", "Standard output stream")
stderr = Trace("stderr", "Standard error stream")
- buildlog = Trace("buildlog", "Output of the build process")
cls._register_trace(stdout)
cls._register_trace(stderr)
- cls._register_trace(buildlog)
def __init__(self, ec, guid):
super(LinuxApplication, self).__init__(ec, guid)
self._pid = None
self._ppid = None
self._home = "app-%s" % self.guid
+ self._in_foreground = False
+
+ # keep a reference to the running process handler when
+ # the command is not executed as remote daemon in background
+ self._proc = None
# timestamp of last state check of the application
self._last_state_check = strfnow()
    def ppid(self):
        """ Returns the parent pid of the remote application process, as
        recorded after launching the command in background; None until then.
        """
        return self._ppid
+ @property
+ def in_foreground(self):
+ """ Returns True if the command needs to be executed in foreground.
+ This means that command will be executed using 'execute' instead of
+ 'run' ('run' executes a command in background and detached from the
+ terminal)
+
+ When using X11 forwarding option, the command can not run in background
+ and detached from a terminal, since we need to keep the terminal attached
+ to interact with it.
+ """
+ return self.get("forwardX11") or self._in_foreground
+
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
self.info("Retrieving '%s' trace %s " % (name, attr))
# Install
self.install()
- # Upload command
+ # Upload command to remote bash script
+ # - only if command can be executed in background and detached
command = self.get("command")
- x11 = self.get("forwardX11")
- env = self.get("env")
-
- if command and not x11:
+
+ if command and not self.in_foreground:
self.info("Uploading command '%s'" % command)
# replace application specific paths in the command
command = self.replace_paths(command)
+
+ # replace application specific paths in the environment
+ env = self.get("env")
+ env = env and self.replace_paths(env)
self.node.upload_command(command, self.app_home,
shfile = "app.sh",
env = env)
+ self.info("Provisioning finished")
+
super(LinuxApplication, self).provision()
def upload_sources(self):
- # TODO: check if sources need to be uploaded and upload them
sources = self.get("sources")
if sources:
- self.info(" Uploading sources ")
+ self.info("Uploading sources ")
# create dir for sources
self.node.mkdir(self.src_dir)
# Download http sources remotely
if http_sources:
- command = " wget -c --directory-prefix=${SOURCES} "
- check = ""
+ command = [" wget -c --directory-prefix=${SOURCES} "]
+ check = []
for source in http_sources:
- command += " %s " % (source)
- check += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+ command.append(" %s " % (source))
+ check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
+ command = " ".join(command)
+ check = " ; ".join(check)
+
# Append the command to check that the sources were downloaded
command += " ; %s " % check
# replace application specific paths in the command
command = self.replace_paths(command)
- # Upload the command to a file, and execute asynchronously
+ # Upload the command to a bash script and run it
+ # in background ( but wait until the command has
+ # finished to continue )
self.node.run_and_wait(command, self.app_home,
shfile = "http_sources.sh",
pidfile = "http_sources_pidfile",
# create dir for sources
self.node.mkdir(self.src_dir)
- self.info(" Uploading code ")
+ self.info("Uploading code ")
dst = os.path.join(self.src_dir, "code")
self.node.upload(sources, dst, text = True)
if stdin:
# create dir for sources
self.info(" Uploading stdin ")
-
+
dst = os.path.join(self.app_home, "stdin")
+
+ # If what we are uploading is a file, check whether
+ # the same file already exists (using md5sum)
+ if self.compare_hash(stdin, dst):
+ return
+
self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
depends = self.get("depends")
if depends:
- self.info(" Installing dependencies %s" % depends)
+ self.info("Installing dependencies %s" % depends)
self.node.install_packages(depends, self.app_home)
def build(self):
build = self.get("build")
if build:
- self.info(" Building sources ")
+ self.info("Building sources ")
# create dir for build
self.node.mkdir(self.build_dir)
# replace application specific paths in the command
- command = self.replace_paths(command)
+ command = self.replace_paths(build)
- # Upload the command to a file, and execute asynchronously
+ # Upload the command to a bash script and run it
+ # in background ( but wait until the command has
+ # finished to continue )
self.node.run_and_wait(command, self.app_home,
shfile = "build.sh",
pidfile = "build_pidfile",
def install(self):
install = self.get("install")
if install:
- self.info(" Installing sources ")
+ self.info("Installing sources ")
# replace application specific paths in the command
- command = self.replace_paths(command)
+ command = self.replace_paths(install)
- # Upload the command to a file, and execute asynchronously
+ # Upload the command to a bash script and run it
+ # in background ( but wait until the command has
+ # finished to continue )
self.node.run_and_wait(command, self.app_home,
shfile = "install.sh",
pidfile = "install_pidfile",
super(LinuxApplication, self).deploy()
def start(self):
- command = self.get('command')
- env = self.get('env')
- stdin = 'stdin' if self.get('stdin') else None
- stdout = 'stdout' if self.get('stdout') else 'stdout'
- stderr = 'stderr' if self.get('stderr') else 'stderr'
- sudo = self.get('sudo') or False
- x11 = self.get('forwardX11') or False
- failed = False
+ command = self.get("command")
+
+ self.info("Starting command '%s'" % command)
if not command:
- # If no command was given, then the application
- # is directly marked as FINISHED
+ # If no command was given (i.e. Application was used for dependency
+ # installation), then the application is directly marked as FINISHED
self._state = ResourceState.FINISHED
else:
- super(LinuxApplication, self).start()
-
- self.info("Starting command '%s'" % command)
- if x11:
- # If X11 forwarding was specified, then the application
- # can not run detached, so instead of invoking asynchronous
- # 'run' we invoke synchronous 'execute'.
- if not command:
- msg = "No command is defined but X11 forwarding has been set"
- self.error(msg)
- self._state = ResourceState.FAILED
- raise RuntimeError, msg
+ if self.in_foreground:
+ self._start_in_foreground()
+ else:
+ self._start_in_background()
- if env:
- # Export environment
- environ = ""
- for var in env.split(" "):
- environ += ' %s ' % var
+ super(LinuxApplication, self).start()
- command = "(" + environ + " ; " + command + ")"
- command = self.replace_paths(command)
+ def _start_in_foreground(self):
+ command = self.get("command")
+ stdin = "stdin" if self.get("stdin") else None
+ sudo = self.get("sudo") or False
+ x11 = self.get("forwardX11")
- # If the command requires X11 forwarding, we
- # can't run it asynchronously
- (out, err), proc = self.node.execute(command,
- sudo = sudo,
- stdin = stdin,
- forward_x11 = x11)
+ # Command will be launched in foreground and attached to the
+ # terminal using the node 'execute' in non blocking mode.
- self._state = ResourceState.FINISHED
+ # Export environment
+ env = self.get("env")
+ environ = self.node.format_environment(env, inline = True)
+ command = environ + command
+ command = self.replace_paths(command)
+
+ # We save the reference to the process in self._proc
+ # to be able to kill the process from the stop method.
+ # We also set blocking = False, since we don't want the
+ # thread to block until the execution finishes.
+ (out, err), self._proc = self.node.execute(command,
+ sudo = sudo,
+ stdin = stdin,
+ forward_x11 = x11,
+ blocking = False)
+
+ if self._proc.poll():
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
- if proc.poll() and err:
- failed = True
- else:
- # Command was previously uploaded, now run the remote
- # bash file asynchronously
- cmd = "bash ./app.sh"
- (out, err), proc = self.node.run(cmd, self.app_home,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- sudo = sudo)
-
- # check if execution errors occurred
- msg = " Failed to start command '%s' " % command
-
- if proc.poll() and err:
- self.error(msg, out, err)
- raise RuntimeError, msg
+ def _start_in_background(self):
+ command = self.get("command")
+ env = self.get("env")
+ stdin = "stdin" if self.get("stdin") else None
+ stdout = "stdout" if self.get("stdout") else "stdout"
+ stderr = "stderr" if self.get("stderr") else "stderr"
+ sudo = self.get("sudo") or False
+
+ # Command will be as a daemon in baground and detached from any terminal.
+ # The real command to run was previously uploaded to a bash script
+ # during deployment, now launch the remote script using 'run'
+ # method from the node
+ cmd = "bash ./app.sh"
+ (out, err), proc = self.node.run(cmd, self.app_home,
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ sudo = sudo)
+
+ # check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
- # Check status of process running in background
- pid, ppid = self.node.wait_pid(self.app_home)
- if pid: self._pid = int(pid)
- if ppid: self._ppid = int(ppid)
-
- # If the process is not running, check for error information
- # on the remote machine
- if not self.pid or not self.ppid:
- (out, err), proc = self.node.check_output(self.app_home, 'stderr')
- self.error(msg, out, err)
-
- msg2 = " Setting state to Failed"
- self.debug(msg2)
+ if proc.poll():
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ # Wait for pid file to be generated
+ pid, ppid = self.node.wait_pid(self.app_home)
+ if pid: self._pid = int(pid)
+ if ppid: self._ppid = int(ppid)
+
+ # If the process is not running, check for error information
+ # on the remote machine
+ if not self.pid or not self.ppid:
+ (out, err), proc = self.node.check_errors(self.app_home,
+ stderr = stderr)
+
+ # Out is what was written in the stderr file
+ if err:
self._state = ResourceState.FAILED
-
+ msg = " Failed to start command '%s' " % command
+ self.error(msg, out, err)
raise RuntimeError, msg
-
+
def stop(self):
+ """ Stops application execution
+ """
command = self.get('command') or ''
- state = self.state
-
- if state == ResourceState.STARTED:
- self.info("Stopping command '%s'" % command)
- (out, err), proc = self.node.kill(self.pid, self.ppid)
+ if self.state == ResourceState.STARTED:
+ stopped = True
- if out or err:
- # check if execution errors occurred
- msg = " Failed to STOP command '%s' " % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
- stopped = False
+ self.info("Stopping command '%s'" % command)
+
+ # If the command is running in foreground (it was launched using
+ # the node 'execute' method), then we use the handler to the Popen
+ # process to kill it. Else we send a kill signal using the pid and ppid
+ # retrieved after running the command with the node 'run' method
+
+ if self._proc:
+ self._proc.kill()
else:
+ # Only try to kill the process if the pid and ppid
+ # were retrieved
+ if self.pid and self.ppid:
+ (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+ if out or err:
+ # check if execution errors occurred
+ msg = " Failed to STOP command '%s' " % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+ stopped = False
+
+ if stopped:
super(LinuxApplication, self).stop()
def release(self):
self.node.execute(tear_down)
self.stop()
+
if self.state == ResourceState.STOPPED:
super(LinuxApplication, self).release()
@property
def state(self):
+ """ Returns the state of the application
+ """
if self._state == ResourceState.STARTED:
- # To avoid overwhelming the remote hosts and the local processor
- # with too many ssh queries, the state is only requested
- # every 'state_check_delay' seconds.
- state_check_delay = 0.5
- if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
- # check if execution errors occurred
- (out, err), proc = self.node.check_errors(self.app_home)
-
- if out or err:
- if err.find("No such file or directory") >= 0 :
- # The resource is marked as started, but the
- # command was not yet executed
- return ResourceState.READY
-
+ if self.in_foreground:
+ # Check if the process we used to execute the command
+ # is still running ...
+ retcode = self._proc.poll()
+
+ # retcode == None -> running
+ # retcode > 0 -> error
+ # retcode == 0 -> finished
+ if retcode:
+ out = ""
msg = " Failed to execute command '%s'" % self.get("command")
+ err = self._proc.stderr.read()
self.error(msg, out, err)
self._state = ResourceState.FAILED
+ elif retcode == 0:
+ self._state = ResourceState.FINISHED
- elif self.pid and self.ppid:
- status = self.node.status(self.pid, self.ppid)
-
- if status == ProcStatus.FINISHED:
- self._state = ResourceState.FINISHED
-
-
- self._last_state_check = strfnow()
+ else:
+ # We need to query the status of the command we launched in
+ # background. In oredr to avoid overwhelming the remote host and
+ # the local processor with too many ssh queries, the state is only
+ # requested every 'state_check_delay' seconds.
+ state_check_delay = 0.5
+ if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ # check if execution errors occurred
+ (out, err), proc = self.node.check_errors(self.app_home)
+
+ if err:
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+
+ elif self.pid and self.ppid:
+ # No execution errors occurred. Make sure the background
+ # process with the recorded pid is still running.
+ status = self.node.status(self.pid, self.ppid)
+
+ if status == ProcStatus.FINISHED:
+ self._state = ResourceState.FINISHED
+
+ self._last_state_check = strfnow()
return self._state
.replace("${NODE_HOME}", absolute_dir(self.node.node_home))
.replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
)
-
+
+ def compare_hash(self, local, remote):
+ # getting md5sum from remote file
+ (out, err), proc = self.node.execute("md5sum %s " % remote)
+
+ if proc.poll() == 0: #OK
+ if not os.path.isfile(local):
+ # store to a tmp file
+ f = tempfile.NamedTemporaryFile()
+ f.write(local)
+ f.flush()
+ local = f.name
+
+ lproc = subprocess.Popen(["md5sum", local],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ # getting md5sum from local file
+ (lout, lerr) = lproc.communicate()
+
+ # files are the same, no need to upload
+ lchk = lout.strip().split(" ")[0]
+ rchk = out.strip().split(" ")[0]
+
+ msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
+ local, lchk, remote, rchk)
+ self.debug(msg)
+
+ if lchk == rchk:
+ return True
+
+ return False
+
def valid_connection(self, guid):
# TODO: Validate!
return True
- # XXX: What if it is connected to more than one node?
- resources = self.find_resources(exact_tags = [tags.NODE])
- self._node = resources[0] if len(resources) == 1 else None
- return self._node