from neco.execution.resource import ResourceManager, clsinit, ResourceState
from neco.resources.linux.node import LinuxNode
from neco.util import sshfuncs
+from neco.util.timefuncs import strfnow, strfdiff
import logging
import os
reschedule_delay = "0.5s"
+state_check_delay = 1
# TODO: Resolve wildcards in commands!!
self._ppid = None
self._home = "app-%s" % self.guid
+ # timestamp of last state check of the application
+ self._last_state_check = strfnow()
+
self._logger = logging.getLogger("LinuxApplication")
def log_message(self, msg):
# upload code
self.upload_code()
+ # upload stdin
+ self.upload_stdin()
+
# install dependencies
self.install_dependencies()
x11 = self.get("forwardX11")
if not x11 and command:
self.info("Uploading command '%s'" % command)
-
- # TODO: missing set PATH and PYTHONPATH!!
+
+ # Export environment
+ environ = ""
+ env = self.get("env") or ""
+ for var in env.split(" "):
+ environ += 'export %s\n' % var
+
+ command = environ + command
# If the command runs asynchronous, pre upload the command
# to the app.sh file in the remote host
dst = os.path.join(self.src_dir, "code")
self.node.upload(sources, dst, text = True)
+ def upload_stdin(self):
+ stdin = self.get("stdin")
+ if stdin:
+ # create dir for sources
+ self.info(" Uploading stdin ")
+
+ dst = os.path.join(self.app_home, "stdin")
+ self.node.upload(stdin, dst, text = True)
+
def install_dependencies(self):
depends = self.get("depends")
if depends:
super(LinuxApplication, self).deploy()
def start(self):
- command = self.get("command")
- env = self.get("env")
- stdin = 'stdin' if self.get("stdin") else None
- stdout = 'stdout' if self.get("stdout") else 'stdout'
- stderr = 'stderr' if self.get("stderr") else 'stderr'
+ command = self.get('command')
+ env = self.get('env')
+ stdin = 'stdin' if self.get('stdin') else None
+ stdout = 'stdout' if self.get('stdout') else 'stdout'
+ stderr = 'stderr' if self.get('stderr') else 'stderr'
sudo = self.get('sudo') or False
- x11 = self.get("forwardX11") or False
+ x11 = self.get('forwardX11') or False
failed = False
super(LinuxApplication, self).start()
self.info("Starting command '%s'" % command)
if x11:
+ if env:
+ # Export environment
+ environ = ""
+ for var in env.split(" "):
+ environ += ' %s ' % var
+
+ command = "(" + environ + " ; " + command + ")"
+ command = self.replace_paths(command)
+
# If the command requires X11 forwarding, we
# can't run it asynchronously
(out, err), proc = self.node.execute(command,
sudo = sudo,
stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- env = env,
forward_x11 = x11)
+ self._state = ResourceState.FINISHED
+
if proc.poll() and err:
failed = True
else:
# Command was previously uploaded, now run the remote
# bash file asynchronously
- if env:
- env = self.replace_paths(env)
-
cmd = "bash ./app.sh"
(out, err), proc = self.node.run(cmd, self.app_home,
stdin = stdin,
raise RuntimeError, msg
def stop(self):
+ command = self.get('command') or ''
state = self.state
+
if state == ResourceState.STARTED:
- self.info("Stopping command %s" % command)
+ self.info("Stopping command '%s'" % command)
(out, err), proc = self.node.kill(self.pid, self.ppid)
@property
def state(self):
if self._state == ResourceState.STARTED:
- (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+ # To avoid overwhelming the remote hosts and the local processor
+ # with too many ssh queries, the state is only requested
+ # every 'state_check_delay' .
+ if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ # check if execution errors occurred
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
- if out or err:
- if err.find("No such file or directory") >= 0 :
- # The resource is marked as started, but the
- # command was not yet executed
- return ResourceState.READY
+ if out or err:
+ if err.find("No such file or directory") >= 0 :
+ # The resource is marked as started, but the
+ # command was not yet executed
+ return ResourceState.READY
- # check if execution errors occurred
- msg = " Failed to execute command '%s'" % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
+
+ elif self.pid and self.ppid:
+ status = self.node.status(self.pid, self.ppid)
+
+ if status == sshfuncs.FINISHED:
+ self._state = ResourceState.FINISHED
- elif self.pid and self.ppid:
- status = self.node.status(self.pid, self.ppid)
- if status == sshfuncs.FINISHED:
- self._state = ResourceState.FINISHED
+ self._last_state_check = strfnow()
return self._state
.replace("${BUILD}", absolute_dir(self.build_dir))
.replace("${APP_HOME}", absolute_dir(self.app_home))
.replace("${NODE_HOME}", absolute_dir(self.node.node_home))
- .replace("${EXP_HOME}", self.node.exp_home) )
+ .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
+ )
def valid_connection(self, guid):
# TODO: Validate!
self._node = resources[0] if len(resources) == 1 else None
return self._node
- def hash_app(self):
- """ Generates a hash representing univokely the application.
- Is used to determine whether the home directory should be cleaned
- or not.
-
- """
- command = self.get("command")
- forwards_x11 = self.get("forwardX11")
- env = self.get("env")
- sudo = self.get("sudo")
- depends = self.get("depends")
- sources = self.get("sources")
- cls._register_attribute(sources)
- cls._register_attribute(build)
- cls._register_attribute(install)
- cls._register_attribute(stdin)
- cls._register_attribute(stdout)
- cls._register_attribute(stderr)
- cls._register_attribute(tear_down)
- skey = "".join(map(str, args))
- return hashlib.md5(skey).hexdigest()
-