from nepi.util.timefuncs import strfnow, strfdiff
import os
+import subprocess
# TODO: Resolve wildcards in commands!!
+# TODO: compare_hash for all files that are uploaded!
@clsinit
def _register_attributes(cls):
command = Attribute("command", "Command to execute",
flags = Flags.ExecReadOnly)
- forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections",
+ forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
flags = Flags.ExecReadOnly)
env = Attribute("env", "Environment variables string for command execution",
flags = Flags.ExecReadOnly)
self._pid = None
self._ppid = None
self._home = "app-%s" % self.guid
+ self._in_foreground = False
# keep a reference to the running process handler when
# the command is not executed as remote daemon in background
This means that command will be executed using 'execute' instead of
'run' ('run' executes a command in background and detached from the
terminal)
-
+
When using X11 forwarding option, the command can not run in background
and detached from a terminal, since we need to keep the terminal attached
to interact with it.
"""
- return self.get("forwardX11") or False
+ return self.get("forwardX11") or self._in_foreground
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
self.info("Retrieving '%s' trace %s " % (name, attr))
if stdin:
# create dir for sources
self.info(" Uploading stdin ")
-
+
dst = os.path.join(self.app_home, "stdin")
+
+ # If what we are uploading is a file, check whether
+ # the same file already exists (using md5sum)
+ if self.compare_hash(stdin, dst):
+ return
+
self.node.upload(stdin, dst, text = True)
def install_dependencies(self):
def start(self):
command = self.get("command")
- env = self.get("env")
- stdin = "stdin" if self.get("stdin") else None
- stdout = "stdout" if self.get("stdout") else "stdout"
- stderr = "stderr" if self.get("stderr") else "stderr"
- sudo = self.get("sudo") or False
- failed = False
self.info("Starting command '%s'" % command)
- if self.in_foreground:
- # If command should run in foreground, we invoke 'execute' method
- # of the node
- if not command:
- msg = "No command is defined but X11 forwarding has been set"
- self.error(msg)
- self._state = ResourceState.FAILED
- raise RuntimeError, msg
+ if not command:
+ # If no command was given (i.e. Application was used for dependency
+ # installation), then the application is directly marked as FINISHED
+ self._state = ResourceState.FINISHED
+ else:
- # Export environment
- environ = self.node.format_environment(env, inline = True)
+ if self.in_foreground:
+ self._start_in_foreground()
+ else:
+ self._start_in_background()
- command = environ + command
- command = self.replace_paths(command)
-
- x11 = self.get("forwardX11")
-
- # We save the reference to the process in self._proc
- # to be able to kill the process from the stop method.
- # We also set blocking = False, since we don't want the
- # thread to block until the execution finishes.
- (out, err), self._proc = self.node.execute(command,
- sudo = sudo,
- stdin = stdin,
- forward_x11 = x11,
- blocking = False)
-
- if self._proc.poll():
- out = ""
- err = self._proc.stderr.read()
- self._state = ResourceState.FAILED
- self.error(msg, out, err)
- raise RuntimeError, msg
-
super(LinuxApplication, self).start()
- elif command:
- # If command is set (i.e. application is not used only for dependency
- # installation), and it does not need to run in foreground, then we
- # invoke the 'run' method of the node to launch the application as a
- # daemon in background
-
- # The real command to execute was previously uploaded to a remote bash
- # script during deployment, now launch the remote script using 'run'
- # method from the node
- cmd = "bash ./app.sh"
- (out, err), proc = self.node.run(cmd, self.app_home,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- sudo = sudo)
-
- # check if execution errors occurred
- msg = " Failed to start command '%s' " % command
-
- if proc.poll():
+ def _start_in_foreground(self):
+ command = self.get("command")
+ stdin = "stdin" if self.get("stdin") else None
+ sudo = self.get("sudo") or False
+ x11 = self.get("forwardX11")
+
+ # Command will be launched in foreground and attached to the
+ # terminal using the node 'execute' in non blocking mode.
+
+ # Export environment
+ env = self.get("env")
+ environ = self.node.format_environment(env, inline = True)
+ command = environ + command
+ command = self.replace_paths(command)
+
+ # We save the reference to the process in self._proc
+ # to be able to kill the process from the stop method.
+ # We also set blocking = False, since we don't want the
+ # thread to block until the execution finishes.
+ (out, err), self._proc = self.node.execute(command,
+ sudo = sudo,
+ stdin = stdin,
+ forward_x11 = x11,
+ blocking = False)
+
+ if self._proc.poll():
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ def _start_in_background(self):
+ command = self.get("command")
+ env = self.get("env")
+ stdin = "stdin" if self.get("stdin") else None
+ stdout = "stdout" if self.get("stdout") else "stdout"
+ stderr = "stderr" if self.get("stderr") else "stderr"
+ sudo = self.get("sudo") or False
+
+ # Command will be as a daemon in baground and detached from any terminal.
+ # The real command to run was previously uploaded to a bash script
+ # during deployment, now launch the remote script using 'run'
+ # method from the node
+ cmd = "bash ./app.sh"
+ (out, err), proc = self.node.run(cmd, self.app_home,
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ sudo = sudo)
+
+ # check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+
+ if proc.poll():
+ self._state = ResourceState.FAILED
+ self.error(msg, out, err)
+ raise RuntimeError, msg
+
+ # Wait for pid file to be generated
+ pid, ppid = self.node.wait_pid(self.app_home)
+ if pid: self._pid = int(pid)
+ if ppid: self._ppid = int(ppid)
+
+ # If the process is not running, check for error information
+ # on the remote machine
+ if not self.pid or not self.ppid:
+ (out, err), proc = self.node.check_errors(self.app_home,
+ stderr = stderr)
+
+ # Out is what was written in the stderr file
+ if err:
+ self._state = ResourceState.FAILED
+ msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
- # Wait for pid file to be generated
- pid, ppid = self.node.wait_pid(self.app_home)
- if pid: self._pid = int(pid)
- if ppid: self._ppid = int(ppid)
-
- # If the process is not running, check for error information
- # on the remote machine
- if not self.pid or not self.ppid:
- (out, err), proc = self.check_errors(home, ecodefile, stderr)
-
- # Out is what was written in the stderr file
- if err:
- msg = " Failed to start command '%s' " % command
- self.error(msg, out, err)
- raise RuntimeError, msg
-
- super(LinuxApplication, self).start()
-
- else:
- # If no command was given (i.e. Application was used for dependency
- # installation), then the application is directly marked as FINISHED
- self._state = ResourceState.FINISHED
-
def stop(self):
""" Stops application execution
"""
command = self.get('command') or ''
- state = self.state
- if state == ResourceState.STARTED:
+ if self.state == ResourceState.STARTED:
stopped = True
self.info("Stopping command '%s'" % command)
if self._proc:
self._proc.kill()
else:
- (out, err), proc = self.node.kill(self.pid, self.ppid)
-
- if out or err:
- # check if execution errors occurred
- msg = " Failed to STOP command '%s' " % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
- stopped = False
+ # Only try to kill the process if the pid and ppid
+ # were retrieved
+ if self.pid and self.ppid:
+ (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+ if out or err:
+ # check if execution errors occurred
+ msg = " Failed to STOP command '%s' " % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+ stopped = False
if stopped:
super(LinuxApplication, self).stop()
self.node.execute(tear_down)
self.stop()
+
if self.state == ResourceState.STOPPED:
super(LinuxApplication, self).release()
# Check if the process we used to execute the command
# is still running ...
retcode = self._proc.poll()
-
+
# retcode == None -> running
# retcode > 0 -> error
# retcode == 0 -> finished
.replace("${NODE_HOME}", absolute_dir(self.node.node_home))
.replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
)
-
+
+ def compare_hash(self, local, remote):
+ # getting md5sum from remote file
+ (out, err), proc = self.node.execute("md5sum %s " % remote)
+
+ if proc.poll() == 0: #OK
+ if not os.path.isfile(local):
+ # store to a tmp file
+ f = tempfile.NamedTemporaryFile()
+ f.write(local)
+ f.flush()
+ local = f.name
+
+ lproc = subprocess.Popen(["md5sum", local],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ # getting md5sum from local file
+ (lout, lerr) = lproc.communicate()
+
+ # files are the same, no need to upload
+ lchk = lout.strip().split(" ")[0]
+ rchk = out.strip().split(" ")[0]
+
+ msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
+ local, lchk, remote, rchk)
+ self.debug(msg)
+
+ if lchk == rchk:
+ return True
+
+ return False
+
def valid_connection(self, guid):
# TODO: Validate!
return True