import logging
import os
-DELAY ="1s"
+reschedule_delay = "0.5s"
# TODO: Resolve wildcards in commands!!
stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
- update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
- "re-upload before starting experiment. If not keep the same directory",
- default = True,
- type = Types.Bool,
- flags = Flags.ExecReadOnly)
-
tear_down = Attribute("tearDown", "Bash script to be executed before "
"releasing the resource",
flags = Flags.ReadOnly)
cls._register_attribute(stdin)
cls._register_attribute(stdout)
cls._register_attribute(stderr)
- cls._register_attribute(update_home)
cls._register_attribute(tear_down)
@classmethod
self._ppid = None
self._home = "app-%s" % self.guid
- self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
+ self._logger = logging.getLogger("LinuxApplication")
+
+ def log_message(self, msg):
+ return " guid %d - host %s - %s " % (self.guid,
+ self.node.get("hostname"), msg)
@property
def node(self):
return None
@property
- def home(self):
+ def app_home(self):
return os.path.join(self.node.exp_dir, self._home)
@property
def src_dir(self):
- return os.path.join(self.home, 'src')
+ return os.path.join(self.app_home, 'src')
@property
def build_dir(self):
- return os.path.join(self.home, 'build')
+ return os.path.join(self.app_home, 'build')
@property
def pid(self):
return self._ppid
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
- path = os.path.join(self.home, name)
+ self.info("Retrieving '%s' trace %s " % (name, attr))
+
+ path = os.path.join(self.app_home, name)
cmd = "(test -f %s && echo 'success') || echo 'error'" % path
(out, err), proc = self.node.execute(cmd)
if (err and proc.poll()) or out.find("error") != -1:
- err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
- name, self.node.get("hostname"), err)
- self.logger.error(err_msg)
+ msg = " Couldn't find trace %s " % name
+ self.error(msg, out, err)
return None
if attr == TraceAttr.PATH:
return path
if attr == TraceAttr.ALL:
- (out, err), proc = self.node.check_output(self.home, name)
+ (out, err), proc = self.node.check_output(self.app_home, name)
if err and proc.poll():
- err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
- name, self.node.get("hostname"), err)
- self.logger.error(err_msg)
+ msg = " Couldn't read trace %s " % name
+ self.error(msg, out, err)
return None
return out
(out, err), proc = self.node.execute(cmd)
if err and proc.poll():
- err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
- name, self.node.get("hostname"), err)
- self.logger.error(err_msg)
+ msg = " Couldn't find trace %s " % name
+ self.error(msg, out, err)
return None
if attr == TraceAttr.SIZE:
return out
def provision(self, filters = None):
- # TODO: verify home hash or clean home
-
# create home dir for application
- self.node.mkdir(self.home)
+ self.node.mkdir(self.app_home)
# upload sources
self.upload_sources()
# Install
self.install()
+ command = self.replace_paths(self.get("command"))
+ x11 = self.get("forwardX11") or False
+ if not x11:
+ self.info("Uploading command '%s'" % command)
+
+ # If the command runs asynchronous, pre upload the command
+ # to the app.sh file in the remote host
+ dst = os.path.join(self.app_home, "app.sh")
+ self.node.upload(command, dst, text = True)
+
super(LinuxApplication, self).provision()
def upload_sources(self):
- # check if sources need to be uploaded and upload them
+ # TODO: check if sources need to be uploaded and upload them
sources = self.get("sources")
if sources:
- self.logger.debug(" Uploading sources %s" % sources)
+ self.info(" Uploading sources ")
# create dir for sources
self.node.mkdir(self.src_dir)
- sources = self.sources.split(' ')
+ sources = sources.split(' ')
http_sources = list()
for source in list(sources):
# Download http sources
for source in http_sources:
dst = os.path.join(self.src_dir, source.split("/")[-1])
+ # TODO: Check if the tar.gz is already downloaded using a hash
+ # and don't download twice !!
command = "wget -o %s %s" % (dst, source)
self.node.execute(command)
# create dir for sources
self.node.mkdir(self.src_dir)
- self.logger.debug(" Uploading code '%s'" % code)
+ self.info(" Uploading code ")
dst = os.path.join(self.src_dir, "code")
self.node.upload(sources, dst, text = True)
def install_dependencies(self):
depends = self.get("depends")
if depends:
- self.logger.debug(" Installing dependencies %s" % depends)
- self.node.install_packages(depends, home = self.home)
+ self.info(" Installing dependencies %s" % depends)
+ self.node.install_packages(depends, home = self.app_home)
def build(self):
build = self.get("build")
if build:
- self.logger.debug(" Building sources '%s'" % build)
+ self.info(" Building sources ")
# create dir for build
self.node.mkdir(self.build_dir)
cmd = self.replace_paths(build)
- (out, err), proc = self.run_and_wait(cmd, self.home,
+ (out, err), proc = self.run_and_wait(cmd, self.app_home,
pidfile = "build_pid",
- stdout = "build_log",
+ stdout = "build_out",
stderr = "build_err",
raise_on_error = True)
def install(self):
install = self.get("install")
if install:
- self.logger.debug(" Installing sources '%s'" % install)
+ self.info(" Installing sources ")
cmd = self.replace_paths(install)
- (out, err), proc = self.run_and_wait(cmd, self.home,
+ (out, err), proc = self.run_and_wait(cmd, self.app_home,
pidfile = "install_pid",
- stdout = "install_log",
+ stdout = "install_out",
stderr = "install_err",
raise_on_error = True)
def deploy(self):
+ command = self.replace_paths(self.get("command"))
+
+ self.info(" Deploying command '%s' " % command)
+
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
- self.ec.schedule(DELAY, self.deploy)
+ self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+ self.ec.schedule(reschedule_delay, self.deploy)
else:
- self.discover()
- self.provision()
+ try:
+ self.discover()
+ self.provision()
+ except:
+ self._state = ResourceState.FAILED
+ raise
super(LinuxApplication, self).deploy()
stdin = 'stdin' if self.get("stdin") else None
sudo = self.get('sudo') or False
x11 = self.get("forwardX11") or False
- err_msg = "Failed to run command %s on host %s" % (
- command, self.node.get("hostname"))
failed = False
super(LinuxApplication, self).start()
+ self.info("Starting command '%s'" % command)
+
if x11:
(out, err), proc = self.node.execute(command,
sudo = sudo,
if proc.poll() and err:
failed = True
else:
- (out, err), proc = self.node.run(command, self.home,
+ # Run the command asynchronously
+ command = "bash ./app.sh"
+ (out, err), proc = self.node.run(command, self.app_home,
stdin = stdin,
sudo = sudo)
failed = True
if not failed:
- pid, ppid = self.node.wait_pid(home = self.home)
+ pid, ppid = self.node.wait_pid(home = self.app_home)
if pid: self._pid = int(pid)
if ppid: self._ppid = int(ppid)
if not self.pid or not self.ppid:
failed = True
- (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
+ (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
if failed or out or chkerr:
# check if execution errors occurred
+ msg = " Failed to start command '%s' " % command
+ out = out
if err:
- err_msg = "%s. Proc error: %s" % (err_msg, err)
+ err = err
+ elif chkerr:
+ err = chkerr
- err_msg = "%s. Run error: %s " % (err_msg, out)
+ self.error(msg, out, err)
- if chkerr:
- err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
+ msg2 = " Setting state to Failed"
+ self.debug(msg2)
+ self._state = ResourceState.FAILED
- self.logger.error(err_msg)
- self.state = ResourceState.FAILED
+ raise RuntimeError, msg
def stop(self):
state = self.state
if state == ResourceState.STARTED:
+ self.info("Stopping command %s" % command)
+
(out, err), proc = self.node.kill(self.pid, self.ppid)
if out or err:
# check if execution errors occurred
- err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
- self.get("command"), self.node.get("hostname"), err, out)
- self.logger.error(err_msg)
+ msg = " Failed to STOP command '%s' " % self.get("command")
+ self.error(msg, out, err)
self._state = ResourceState.FAILED
stopped = False
else:
super(LinuxApplication, self).stop()
def release(self):
+ self.info("Releasing resource")
+
tear_down = self.get("tearDown")
if tear_down:
self.node.execute(tear_down)
@property
def state(self):
if self._state == ResourceState.STARTED:
- (out, err), proc = self.node.check_output(self.home, 'stderr')
+ (out, err), proc = self.node.check_output(self.app_home, 'stderr')
if out or err:
+ if err.find("No such file or directory") >= 0 :
+ # The resource is marked as started, but the
+ # command was not yet executed
+ return ResourceState.READY
+
# check if execution errors occurred
- err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
- self.get("command"), self.node.get("hostname"), err, out)
- self.logger.error(err_msg)
+ msg = " Failed to execute command '%s'" % self.get("command")
+ self.error(msg, out, err)
self._state = ResourceState.FAILED
elif self.pid and self.ppid:
return ( command
.replace("${SOURCES}", self.src_dir)
.replace("${BUILD}", self.build_dir)
- .replace("${APPHOME}", self.home)
+ .replace("${APPHOME}", self.app_home)
.replace("${NODEHOME}", self.node.home) )