from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sshfuncs import ProcStatus
from nepi.util.timefuncs import tnow, tdiffsec
import subprocess
# TODO: Resolve wildcards in commands!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
-@clsinit
+@clsinit_copy
class LinuxApplication(ResourceManager):
"""
.. class:: Class Args :
"""
_rtype = "LinuxApplication"
+ _help = "Runs an application on a Linux host with a BASH command "
+ _backend_type = "linux"
@classmethod
def _register_attributes(cls):
@property
def node(self):
- node = self.get_connected(LinuxNode)
+ node = self.get_connected(LinuxNode.rtype())
if node: return node[0]
return None
out = int(out.strip())
return out
-
- def provision(self):
+
+ def do_provision(self):
# create run dir for application
self.node.mkdir(self.run_home)
# Since provisioning takes a long time, before
# each step we check that the EC is still
for step in steps:
- if self.ec.finished:
- raise RuntimeError, "EC finished"
+ if self.ec.abort:
+ self.debug("Interrupting provisioning. EC says 'ABORT")
+ return
ret = step()
if ret:
self.info("Provisioning finished")
- super(LinuxApplication, self).provision()
+ super(LinuxApplication, self).do_provision()
def upload_start_command(self):
# Upload command to remote bash script
self.node.upload_command(command,
shfile = shfile,
- env = env)
+ env = env,
+ overwrite = False)
def execute_deploy_command(self, command):
if command:
self.node.upload(stdin, dst, overwrite = False, text = True)
# create "stdin" symlink on ${APP_HOME} directory
- command = "( cd %s ; ln -s %s stdin )" % ( self.app_home, dst)
+ command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
+ "app_home": self.app_home,
+ "stdin": dst })
return command
depends = self.get("depends")
if depends:
self.info("Installing dependencies %s" % depends)
- self.node.install_packages(depends, self.app_home, self.run_home)
+ return self.node.install_packages_command(depends)
def build(self):
build = self.get("build")
# replace application specific paths in the command
return self.replace_paths(install)
- def deploy(self):
+ def do_deploy(self):
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
self.ec.schedule(reschedule_delay, self.deploy)
else:
- try:
- command = self.get("command") or ""
- self.info("Deploying command '%s' " % command)
- self.discover()
- self.provision()
- except:
- self.fail()
- raise
-
- super(LinuxApplication, self).deploy()
-
- def start(self):
+ command = self.get("command") or ""
+ self.info("Deploying command '%s' " % command)
+ self.do_discover()
+ self.do_provision()
+
+ super(LinuxApplication, self).do_deploy()
+
+ def do_start(self):
command = self.get("command")
self.info("Starting command '%s'" % command)
if not command:
# If no command was given (i.e. Application was used for dependency
# installation), then the application is directly marked as FINISHED
- self._state = ResourceState.FINISHED
+ super(LinuxApplication, self).do_finish()
else:
-
if self.in_foreground:
self._run_in_foreground()
else:
self._run_in_background()
- super(LinuxApplication, self).start()
+ super(LinuxApplication, self).do_start()
def _run_in_foreground(self):
command = self.get("command")
sudo = self.get("sudo") or False
x11 = self.get("forwardX11")
+ env = self.get("env")
# For a command being executed in foreground, if there is stdin,
# it is expected to be text string not a file or pipe
# to be able to kill the process from the stop method.
# We also set blocking = False, since we don't want the
# thread to block until the execution finishes.
- (out, err), self._proc = self.execute_command(self, command,
+ (out, err), self._proc = self.execute_command(command,
env = env,
sudo = sudo,
stdin = stdin,
blocking = False)
if self._proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
msg = " Failed to start command '%s' " % command
if proc.poll():
- self.fail()
self.error(msg, out, err)
raise RuntimeError, msg
# Out is what was written in the stderr file
if err:
- self.fail()
msg = " Failed to start command '%s' " % command
self.error(msg, out, err)
raise RuntimeError, msg
-
- def stop(self):
+
+ def do_stop(self):
""" Stops application execution
"""
command = self.get('command') or ''
if self.state == ResourceState.STARTED:
- stopped = True
-
- self.info("Stopping command '%s'" % command)
+ self.info("Stopping command '%s' " % command)
# If the command is running in foreground (it was launched using
# the node 'execute' method), then we use the handler to the Popen
# process to kill it. Else we send a kill signal using the pid and ppid
# retrieved after running the command with the node 'run' method
-
if self._proc:
self._proc.kill()
else:
# Only try to kill the process if the pid and ppid
# were retrieved
if self.pid and self.ppid:
- (out, err), proc = self.node.kill(self.pid, self.ppid, sudo =
- self._sudo_kill)
+ (out, err), proc = self.node.kill(self.pid, self.ppid,
+ sudo = self._sudo_kill)
- if out or err:
- # check if execution errors occurred
+ # TODO: check if execution errors occurred
+ if proc.poll() or err:
msg = " Failed to STOP command '%s' " % self.get("command")
self.error(msg, out, err)
- self.fail()
- stopped = False
-
- if stopped:
- super(LinuxApplication, self).stop()
+
+ super(LinuxApplication, self).do_stop()
- def release(self):
+ def do_release(self):
self.info("Releasing resource")
tear_down = self.get("tearDown")
if tear_down:
self.node.execute(tear_down)
- self.stop()
+ self.do_stop()
- if self.state == ResourceState.STOPPED:
- super(LinuxApplication, self).release()
-
+ super(LinuxApplication, self).do_release()
+
@property
def state(self):
""" Returns the state of the application
msg = " Failed to execute command '%s'" % self.get("command")
err = self._proc.stderr.read()
self.error(msg, out, err)
- self.fail()
- elif retcode == 0:
- self._state = ResourceState.FINISHED
+ self.do_fail()
+ elif retcode == 0:
+ self.do_finish()
else:
# We need to query the status of the command we launched in
- # background. In oredr to avoid overwhelming the remote host and
+ # background. In order to avoid overwhelming the remote host and
# the local processor with too many ssh queries, the state is only
# requested every 'state_check_delay' seconds.
state_check_delay = 0.5
if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
- # check if execution errors occurred
- (out, err), proc = self.node.check_errors(self.run_home)
-
- if err:
- msg = " Failed to execute command '%s'" % self.get("command")
- self.error(msg, out, err)
- self.fail()
-
- elif self.pid and self.ppid:
- # No execution errors occurred. Make sure the background
- # process with the recorded pid is still running.
+ if self.pid and self.ppid:
+ # Make sure the process is still running in background
status = self.node.status(self.pid, self.ppid)
if status == ProcStatus.FINISHED:
- self._state = ResourceState.FINISHED
+ # If the program finished, check if execution
+ # errors occurred
+ (out, err), proc = self.node.check_errors(
+ self.run_home)
+
+ if err:
+ msg = "Failed to execute command '%s'" % \
+ self.get("command")
+ self.error(msg, out, err)
+ self.do_fail()
+ else:
+ self.do_finish()
self._last_state_check = tnow()