from neco.execution.resource import ResourceManager, clsinit, ResourceState
from neco.resources.linux.node import LinuxNode
from neco.util import sshfuncs
+from neco.util.timefuncs import strfnow, strfdiff
import logging
import os
-DELAY ="1s"
+reschedule_delay = "0.5s"
+state_check_delay = 1
# TODO: Resolve wildcards in commands!!
stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
- update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
- "re-upload before starting experiment. If not keep the same directory",
- default = True,
- type = Types.Bool,
- flags = Flags.ExecReadOnly)
-
tear_down = Attribute("tearDown", "Bash script to be executed before "
"releasing the resource",
flags = Flags.ReadOnly)
cls._register_attribute(stdin)
cls._register_attribute(stdout)
cls._register_attribute(stderr)
- cls._register_attribute(update_home)
cls._register_attribute(tear_down)
@classmethod
self._ppid = None
self._home = "app-%s" % self.guid
+ # timestamp of last state check of the application
+ self._last_state_check = strfnow()
+
self._logger = logging.getLogger("LinuxApplication")
def log_message(self, msg):
return None
@property
- def home(self):
- return os.path.join(self.node.exp_dir, self._home)
+ def app_home(self):
+ return os.path.join(self.node.exp_home, self._home)
@property
def src_dir(self):
- return os.path.join(self.home, 'src')
+ return os.path.join(self.app_home, 'src')
@property
def build_dir(self):
- return os.path.join(self.home, 'build')
+ return os.path.join(self.app_home, 'build')
@property
def pid(self):
return self._ppid
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
- path = os.path.join(self.home, name)
+ self.info("Retrieving '%s' trace %s " % (name, attr))
+
+ path = os.path.join(self.app_home, name)
- cmd = "(test -f %s && echo 'success') || echo 'error'" % path
- (out, err), proc = self.node.execute(cmd)
+ command = "(test -f %s && echo 'success') || echo 'error'" % path
+ (out, err), proc = self.node.execute(command)
if (err and proc.poll()) or out.find("error") != -1:
msg = " Couldn't find trace %s " % name
return path
if attr == TraceAttr.ALL:
- (out, err), proc = self.node.check_output(self.home, name)
+ (out, err), proc = self.node.check_output(self.app_home, name)
if err and proc.poll():
msg = " Couldn't read trace %s " % name
return out
def provision(self, filters = None):
- # TODO: verify home hash or clean home
-
# create home dir for application
- self.node.mkdir(self.home)
+ self.node.mkdir(self.app_home)
# upload sources
self.upload_sources()
# upload code
self.upload_code()
+ # upload stdin
+ self.upload_stdin()
+
# install dependencies
self.install_dependencies()
# Install
self.install()
+ command = self.get("command")
+ x11 = self.get("forwardX11")
+ if not x11 and command:
+ self.info("Uploading command '%s'" % command)
+
+ # Export environment
+ environ = ""
+ env = self.get("env") or ""
+ for var in env.split(" "):
+ environ += 'export %s\n' % var
+
+ command = environ + command
+
+ # If the command runs asynchronous, pre upload the command
+ # to the app.sh file in the remote host
+ dst = os.path.join(self.app_home, "app.sh")
+ command = self.replace_paths(command)
+ self.node.upload(command, dst, text = True)
+
super(LinuxApplication, self).provision()
def upload_sources(self):
- # check if sources need to be uploaded and upload them
+ # TODO: check if sources need to be uploaded and upload them
sources = self.get("sources")
if sources:
self.info(" Uploading sources ")
# create dir for sources
self.node.mkdir(self.src_dir)
- sources = self.sources.split(' ')
+ sources = sources.split(' ')
http_sources = list()
for source in list(sources):
sources.remove(source)
# Download http sources
- for source in http_sources:
- dst = os.path.join(self.src_dir, source.split("/")[-1])
- command = "wget -o %s %s" % (dst, source)
- self.node.execute(command)
-
- self.node.upload(sources, self.src_dir)
+ if http_sources:
+ cmd = " wget -c --directory-prefix=${SOURCES} "
+ verif = ""
+
+ for source in http_sources:
+ cmd += " %s " % (source)
+ verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+
+ # Wget output goes to stderr :S
+ cmd += " 2> /dev/null ; "
+
+ # Add verification
+ cmd += " %s " % verif
+
+ # Upload the command to a file, and execute asynchronously
+ self.upload_and_run(cmd,
+ "http_sources.sh", "http_sources_pid",
+ "http_sources_out", "http_sources_err")
+ if sources:
+ self.node.upload(sources, self.src_dir)
def upload_code(self):
code = self.get("code")
dst = os.path.join(self.src_dir, "code")
self.node.upload(sources, dst, text = True)
+ def upload_stdin(self):
+ stdin = self.get("stdin")
+ if stdin:
+ # create dir for sources
+ self.info(" Uploading stdin ")
+
+ dst = os.path.join(self.app_home, "stdin")
+ self.node.upload(stdin, dst, text = True)
+
def install_dependencies(self):
depends = self.get("depends")
if depends:
self.info(" Installing dependencies %s" % depends)
- self.node.install_packages(depends, home = self.home)
+ self.node.install_packages(depends, home = self.app_home)
def build(self):
build = self.get("build")
# create dir for build
self.node.mkdir(self.build_dir)
- cmd = self.replace_paths(build)
-
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "build_pid",
- stdout = "build_log",
- stderr = "build_err",
- raise_on_error = True)
+ # Upload the command to a file, and execute asynchronously
+ self.upload_and_run(build,
+ "build.sh", "build_pid",
+ "build_out", "build_err")
def install(self):
install = self.get("install")
if install:
self.info(" Installing sources ")
- cmd = self.replace_paths(install)
-
- (out, err), proc = self.run_and_wait(cmd, self.home,
- pidfile = "install_pid",
- stdout = "install_log",
- stderr = "install_err",
- raise_on_error = True)
+ # Upload the command to a file, and execute asynchronously
+ self.upload_and_run(install,
+ "install.sh", "install_pid",
+ "install_out", "install_err")
def deploy(self):
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
- self.ec.schedule(DELAY, self.deploy)
+ self.ec.schedule(reschedule_delay, self.deploy)
else:
try:
+ command = self.get("command") or ""
+ self.info(" Deploying command '%s' " % command)
self.discover()
self.provision()
except:
super(LinuxApplication, self).deploy()
def start(self):
- command = self.replace_paths(self.get("command"))
- env = self.get("env")
- stdin = 'stdin' if self.get("stdin") else None
+ command = self.get('command')
+ env = self.get('env')
+ stdin = 'stdin' if self.get('stdin') else None
+ stdout = 'stdout' if self.get('stdout') else 'stdout'
+ stderr = 'stderr' if self.get('stderr') else 'stderr'
sudo = self.get('sudo') or False
- x11 = self.get("forwardX11") or False
+ x11 = self.get('forwardX11') or False
failed = False
super(LinuxApplication, self).start()
- self.info("Starting command %s" % command)
+ if not command:
+ self.info("No command to start ")
+ self._state = ResourceState.FINISHED
+ return
+
+ self.info("Starting command '%s'" % command)
if x11:
+ if env:
+ # Export environment
+ environ = ""
+ for var in env.split(" "):
+ environ += ' %s ' % var
+
+ command = "(" + environ + " ; " + command + ")"
+ command = self.replace_paths(command)
+
+ # If the command requires X11 forwarding, we
+ # can't run it asynchronously
(out, err), proc = self.node.execute(command,
sudo = sudo,
stdin = stdin,
- stdout = 'stdout',
- stderr = 'stderr',
- env = env,
forward_x11 = x11)
+ self._state = ResourceState.FINISHED
+
if proc.poll() and err:
failed = True
else:
- (out, err), proc = self.node.run(command, self.home,
+ # Command was previously uploaded, now run the remote
+ # bash file asynchronously
+ cmd = "bash ./app.sh"
+ (out, err), proc = self.node.run(cmd, self.app_home,
stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
sudo = sudo)
if proc.poll() and err:
failed = True
if not failed:
- pid, ppid = self.node.wait_pid(home = self.home)
+ pid, ppid = self.node.wait_pid(home = self.app_home)
if pid: self._pid = int(pid)
if ppid: self._ppid = int(ppid)
if not self.pid or not self.ppid:
failed = True
- (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
+ (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
- if failed or out or chkerr:
- # check if execution errors occurred
- msg = " Failed to start command '%s' " % command
- out = out
- if err:
- err = err
- elif chkerr:
- err = chkerr
+ if failed or out or chkerr:
+ # check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+ out = out
+ if err:
+ err = err
+ elif chkerr:
+ err = chkerr
- self.error(msg, out, err)
+ self.error(msg, out, err)
- msg2 = " Setting state to Failed"
- self.debug(msg2)
- self._state = ResourceState.FAILED
+ msg2 = " Setting state to Failed"
+ self.debug(msg2)
+ self._state = ResourceState.FAILED
- raise RuntimeError, msg
+ raise RuntimeError, msg
def stop(self):
+ command = self.get('command') or ''
state = self.state
+
if state == ResourceState.STARTED:
- self.info("Stopping command %s" % command)
+ self.info("Stopping command '%s'" % command)
(out, err), proc = self.node.kill(self.pid, self.ppid)
@property
def state(self):
if self._state == ResourceState.STARTED:
- (out, err), proc = self.node.check_output(self.home, 'stderr')
+ # To avoid overwhelming the remote hosts and the local processor
+ # with too many ssh queries, the state is only requested
+ # every 'state_check_delay' .
+ if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+ # check if execution errors occurred
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
- if out or err:
- if err.find("No such file or directory") >= 0 :
- # The resource is marked as started, but the
- # command was not yet executed
- return ResourceState.READY
+ if out or err:
+ if err.find("No such file or directory") >= 0 :
+ # The resource is marked as started, but the
+ # command was not yet executed
+ return ResourceState.READY
- # check if execution errors occurred
- msg = " Failed to execute command '%s'" % self.get("command")
- self.error(msg, out, err)
- self._state = ResourceState.FAILED
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
+ self._state = ResourceState.FAILED
- elif self.pid and self.ppid:
- status = self.node.status(self.pid, self.ppid)
+ elif self.pid and self.ppid:
+ status = self.node.status(self.pid, self.ppid)
- if status == sshfuncs.FINISHED:
- self._state = ResourceState.FINISHED
+ if status == sshfuncs.FINISHED:
+ self._state = ResourceState.FINISHED
- return self._state
- def valid_connection(self, guid):
- # TODO: Validate!
- return True
- # XXX: What if it is connected to more than one node?
- resources = self.find_resources(exact_tags = [tags.NODE])
- self._node = resources[0] if len(resources) == 1 else None
- return self._node
+ self._last_state_check = strfnow()
- def hash_app(self):
- """ Generates a hash representing univokely the application.
- Is used to determine whether the home directory should be cleaned
- or not.
+ return self._state
- """
- command = self.get("command")
- forwards_x11 = self.get("forwardX11")
- env = self.get("env")
- sudo = self.get("sudo")
- depends = self.get("depends")
- sources = self.get("sources")
- cls._register_attribute(sources)
- cls._register_attribute(build)
- cls._register_attribute(install)
- cls._register_attribute(stdin)
- cls._register_attribute(stdout)
- cls._register_attribute(stderr)
- cls._register_attribute(tear_down)
- skey = "".join(map(str, args))
- return hashlib.md5(skey).hexdigest()
+ def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
+ dst = os.path.join(self.app_home, fname)
+ cmd = self.replace_paths(cmd)
+ self.node.upload(cmd, dst, text = True)
+
+ cmd = "bash ./%s" % fname
+ (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
+ pidfile = pidfile,
+ stdout = outfile,
+ stderr = errfile,
+ raise_on_error = True)
def replace_paths(self, command):
"""
Replace all special path tags with shell-escaped actual paths.
"""
- return ( command
- .replace("${SOURCES}", self.src_dir)
- .replace("${BUILD}", self.build_dir)
- .replace("${APPHOME}", self.home)
- .replace("${NODEHOME}", self.node.home) )
+ def absolute_dir(d):
+ return d if d.startswith("/") else os.path.join("${HOME}", d)
+ return ( command
+ .replace("${SOURCES}", absolute_dir(self.src_dir))
+ .replace("${BUILD}", absolute_dir(self.build_dir))
+ .replace("${APP_HOME}", absolute_dir(self.app_home))
+ .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
+ .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
+ )
+
+ def valid_connection(self, guid):
+ # TODO: Validate!
+ return True
+ # XXX: What if it is connected to more than one node?
+ resources = self.find_resources(exact_tags = [tags.NODE])
+ self._node = resources[0] if len(resources) == 1 else None
+ return self._node