#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+import os
+import subprocess
+import logging
+
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
ResourceState
from nepi.resources.linux.node import LinuxNode
-from nepi.util.sshfuncs import ProcStatus
+from nepi.util.sshfuncs import ProcStatus, STDOUT
from nepi.util.timefuncs import tnow, tdiffsec
-import os
-import subprocess
+# to debug, just use
+# logging.getLogger('application').setLevel(logging.DEBUG)
+logger = logging.getLogger("application")
# TODO: Resolve wildcards in commands!!
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
@classmethod
def _register_attributes(cls):
- command = Attribute("command", "Command to execute at application start. "
- "Note that commands will be executed in the ${RUN_HOME} directory, "
- "make sure to take this into account when using relative paths. ",
- flags = Flags.Design)
- forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
- flags = Flags.Design)
- env = Attribute("env", "Environment variables string for command execution",
- flags = Flags.Design)
- sudo = Attribute("sudo", "Run with root privileges",
- flags = Flags.Design)
- depends = Attribute("depends",
- "Space-separated list of packages required to run the application",
- flags = Flags.Design)
- sources = Attribute("sources",
- "semi-colon separated list of regular files to be uploaded to ${SRC} "
- "directory prior to building. Archives won't be expanded automatically. "
- "Sources are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all sources). ",
- flags = Flags.Design)
- files = Attribute("files",
- "semi-colon separated list of regular miscellaneous files to be uploaded "
- "to ${SHARE} directory. "
- "Files are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all files). ",
- flags = Flags.Design)
- libs = Attribute("libs",
- "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
- "to ${LIB} directory. "
- "Libraries are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all files). ",
- flags = Flags.Design)
- bins = Attribute("bins",
- "semi-colon separated list of binary files to be uploaded "
- "to ${BIN} directory. "
- "Binaries are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all files). ",
- flags = Flags.Design)
- code = Attribute("code",
- "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
- flags = Flags.Design)
- build = Attribute("build",
- "Build commands to execute after deploying the sources. "
- "Sources are uploaded to the ${SRC} directory and code "
- "is uploaded to the ${APP_HOME} directory. \n"
- "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
- "./configure && make && make clean.\n"
- "Make sure to make the build commands return with a nonzero exit "
- "code on error.",
- flags = Flags.Design)
- install = Attribute("install",
- "Commands to transfer built files to their final destinations. "
- "Install commands are executed after build commands. ",
- flags = Flags.Design)
- stdin = Attribute("stdin", "Standard input for the 'command'",
- flags = Flags.Design)
- tear_down = Attribute("tearDown", "Command to be executed just before "
- "releasing the resource",
- flags = Flags.Design)
-
- cls._register_attribute(command)
- cls._register_attribute(forward_x11)
- cls._register_attribute(env)
- cls._register_attribute(sudo)
- cls._register_attribute(depends)
- cls._register_attribute(sources)
- cls._register_attribute(code)
- cls._register_attribute(files)
- cls._register_attribute(bins)
- cls._register_attribute(libs)
- cls._register_attribute(build)
- cls._register_attribute(install)
- cls._register_attribute(stdin)
- cls._register_attribute(tear_down)
+ cls._register_attribute(
+ Attribute("command", "Command to execute at application start. "
+ "Note that commands will be executed in the ${RUN_HOME} directory, "
+ "make sure to take this into account when using relative paths. ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("forwardX11",
+ "Enables X11 forwarding for SSH connections",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("env",
+ "Environment variables string for command execution",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("sudo",
+ "Run with root privileges",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("depends",
+ "Space-separated list of packages required to run the application",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("sources",
+ "semi-colon separated list of regular files to be uploaded to ${SRC} "
+ "directory prior to building. Archives won't be expanded automatically. "
+ "Sources are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all sources). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("files",
+ "semi-colon separated list of regular miscellaneous files to be uploaded "
+ "to ${SHARE} directory. "
+ "Files are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("libs",
+ "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
+ "to ${LIB} directory. "
+ "Libraries are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("bins",
+ "semi-colon separated list of binary files to be uploaded "
+ "to ${BIN} directory. "
+ "Binaries are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("code",
+ "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("build",
+ "Build commands to execute after deploying the sources. "
+ "Sources are uploaded to the ${SRC} directory and code "
+ "is uploaded to the ${APP_HOME} directory. \n"
+ "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
+ "./configure && make && make clean.\n"
+ "Make sure to make the build commands return with a nonzero exit "
+ "code on error.",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("install",
+ "Commands to transfer built files to their final destinations. "
+ "Install commands are executed after build commands. ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("stdin", "Standard input for the 'command'",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("tearDown",
+ "Command to be executed just before releasing the resource",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("splitStderr",
+ "requests stderr to be retrieved separately",
+ default = False))
@classmethod
def _register_traces(cls):
- stdout = Trace("stdout", "Standard output stream", enabled = True)
- stderr = Trace("stderr", "Standard error stream", enabled = True)
-
- cls._register_trace(stdout)
- cls._register_trace(stderr)
+ cls._register_trace(
+ Trace("stdout", "Standard output stream", enabled = True))
+ cls._register_trace(
+ Trace("stderr", "Standard error stream", enabled = True))
def __init__(self, ec, guid):
super(LinuxApplication, self).__init__(ec, guid)
self._pid = None
self._ppid = None
self._node = None
- self._home = "app-%s" % self.guid
+ self._home = "app-{}".format(self.guid)
# whether the command should run in foreground attached
# to a terminal
self._last_state_check = tnow()
def log_message(self, msg):
- return " guid %d - host %s - %s " % (self.guid,
- self.node.get("hostname"), msg)
+ return " guid {} - host {} - {} "\
+ .format(self.guid, self.node.get("hostname"), msg)
@property
def node(self):
if not self._node:
node = self.get_connected(LinuxNode.get_rtype())
if not node:
- msg = "Application %s guid %d NOT connected to Node" % (
- self._rtype, self.guid)
+ msg = "Application {} guid {} NOT connected to Node"\
+ .format(self._rtype, self.guid)
raise RuntimeError(msg)
self._node = node[0]
@property
def in_foreground(self):
- """ Returns True if the command needs to be executed in foreground.
+ """
+ Returns True if the command needs to be executed in foreground.
This means that command will be executed using 'execute' instead of
'run' ('run' executes a command in background and detached from the
terminal)
return os.path.join(self.run_home, filename)
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
- self.info("Retrieving '%s' trace %s " % (name, attr))
+ self.info("Retrieving '{}' trace {} ".format(name, attr))
path = self.trace_filepath(name)
+ logger.debug("trace: path= {}".format(path))
- command = "(test -f %s && echo 'success') || echo 'error'" % path
+ command = "(test -f {} && echo 'success') || echo 'error'".format(path)
(out, err), proc = self.node.execute(command)
if (err and proc.poll()) or out.find("error") != -1:
- msg = " Couldn't find trace %s " % name
+ msg = " Couldn't find trace {} ".format(name)
self.error(msg, out, err)
return None
(out, err), proc = self.node.check_output(self.run_home, name)
if proc.poll():
- msg = " Couldn't read trace %s " % name
+ msg = " Couldn't read trace {} ".format(name)
self.error(msg, out, err)
return None
return out
if attr == TraceAttr.STREAM:
- cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
+ cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
elif attr == TraceAttr.SIZE:
- cmd = "stat -c%%s %s " % path
+ cmd = "stat -c {} ".format(path)
(out, err), proc = self.node.execute(cmd)
if proc.poll():
- msg = " Couldn't find trace %s " % name
+ msg = " Couldn't find trace {} ".format(name)
self.error(msg, out, err)
return None
if self.node.get("username") == 'root':
import pickle
procs = dict()
- ps_aux = "ps aux |awk '{print $2,$11}'"
+ ps_aux = "ps aux | awk '{print $2,$11}'"
(out, err), proc = self.node.execute(ps_aux)
if len(out) != 0:
for line in out.strip().split("\n"):
# build
self.build,
# Install
- self.install]
+ self.install,
+ ]
command = []
command = self.get("command")
if command and not self.in_foreground:
- self.info("Uploading command '%s'" % command)
+# self.info("Uploading command '{}'".format(command))
# replace application specific paths in the command
command = self.replace_paths(command)
shfile = os.path.join(self.app_home, "start.sh")
self.node.upload_command(command,
- shfile = shfile,
- env = env,
- overwrite = overwrite)
+ shfile = shfile,
+ env = env,
+ overwrite = overwrite)
def execute_deploy_command(self, command, prefix="deploy"):
if command:
# Upload the command to a bash script and run it
# in background ( but wait until the command has
# finished to continue )
- shfile = os.path.join(self.app_home, "%s.sh" % prefix)
+ shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
+ # low-level spawn tools in both sshfuncs and execfuncs
+ # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
+ stderr = "{}_stderr".format(prefix) \
+ if self.get("splitStderr") \
+ else STDOUT
+ print("{} : prefix = {}, command={}, stderr={}"
+ .format(self, prefix, command, stderr))
self.node.run_and_wait(command, self.run_home,
- shfile = shfile,
- overwrite = False,
- pidfile = "%s_pidfile" % prefix,
- ecodefile = "%s_exitcode" % prefix,
- stdout = "%s_stdout" % prefix,
- stderr = "%s_stderr" % prefix)
+ shfile = shfile,
+ overwrite = False,
+ pidfile = "{}_pidfile".format(prefix),
+ ecodefile = "{}_exitcode".format(prefix),
+ stdout = "{}_stdout".format(prefix),
+ stderr = stderr)
def upload_sources(self, sources = None, src_dir = None):
if not sources:
# remove the hhtp source from the sources list
sources.remove(source)
- command.append( " ( "
- # Check if the source already exists
- " ls %(src_dir)s/%(basename)s "
- " || ( "
- # If source doesn't exist, download it and check
- # that it it downloaded ok
- " wget -c --directory-prefix=%(src_dir)s %(source)s && "
- " ls %(src_dir)s/%(basename)s "
- " ) ) " % {
- "basename": os.path.basename(source),
- "source": source,
- "src_dir": src_dir
- })
+ command.append(
+ " ( "
+ # Check if the source already exists
+ " ls {src_dir}/{basename} "
+ " || ( "
+ # If source doesn't exist, download it and check
+ # that it it downloaded ok
+ " wget -c --directory-prefix={src_dir} {source} && "
+ " ls {src_dir}/{basename} "
+ " ) ) ".format(
+ basename = os.path.basename(source),
+ source = source,
+ src_dir = src_dir
+ ))
command = " && ".join(command)
files = self.get("files")
if files:
- self.info("Uploading files %s " % files)
+ self.info("Uploading files {} ".format(files))
self.node.upload(files, self.node.share_dir, overwrite = False)
def upload_libraries(self, libs = None):
libs = self.get("libs")
if libs:
- self.info("Uploading libraries %s " % libaries)
+ self.info("Uploading libraries {} ".format(libs))
self.node.upload(libs, self.node.lib_dir, overwrite = False)
def upload_binaries(self, bins = None):
bins = self.get("bins")
if bins:
- self.info("Uploading binaries %s " % binaries)
+ self.info("Uploading binaries {} ".format(bins))
self.node.upload(bins, self.node.bin_dir, overwrite = False)
def upload_code(self, code = None):
if code:
self.info("Uploading code")
-
dst = os.path.join(self.app_home, "code")
- self.node.upload(code, dst, overwrite = False, text = True)
+ self.node.upload(code, dst, overwrite = False, text = True, executable = True)
def upload_stdin(self, stdin = None):
if not stdin:
self.node.upload(stdin, dst, overwrite = False, text = True)
# create "stdin" symlink on ${APP_HOME} directory
- command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
- "app_home": self.app_home,
- "stdin": dst })
-
+ command = "( cd {app_home} ; [ ! -f stdin ] && ln -s {stdin} stdin )"\
+ .format(app_home = self.app_home, stdin = dst)
return command
def install_dependencies(self, depends = None):
depends = self.get("depends")
if depends:
- self.info("Installing dependencies %s" % depends)
+ self.info("Installing dependencies {}".format(depends))
return self.node.install_packages_command(depends)
def build(self, build = None):
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
- self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
+ self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
self.ec.schedule(self.reschedule_delay, self.deploy)
else:
command = self.get("command") or ""
- self.info("Deploying command '%s' " % command)
+ self.info("Deploying command '{}' ".format(command))
self.do_discover()
self.do_provision()
def do_start(self):
command = self.get("command")
- self.info("Starting command '%s'" % command)
+ self.info("Starting command '{}'".format(command))
if not command:
# If no command was given (i.e. Application was used for dependency
# We also set blocking = False, since we don't want the
# thread to block until the execution finishes.
(out, err), self._proc = self.execute_command(command,
- env = env,
- sudo = sudo,
- forward_x11 = x11,
- blocking = False)
+ env = env,
+ sudo = sudo,
+ forward_x11 = x11,
+ blocking = False)
if self._proc.poll():
self.error(msg, out, err)
sudo = self.get("sudo") or False
stdout = "stdout"
- stderr = "stderr"
+ # low-level spawn tools in both sshfuncs and execfuncs
+ # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
+ stderr = "stderr" \
+ if self.get("splitStderr") \
+ else STDOUT
stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
else None
# The command to run was previously uploaded to a bash script
# during deployment, now we launch the remote script using 'run'
# method from the node.
- cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
+ cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
(out, err), proc = self.node.run(cmd, self.run_home,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- sudo = sudo)
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ sudo = sudo)
# check if execution errors occurred
- msg = " Failed to start command '%s' " % command
+ msg = " Failed to start command '{}' ".format(command)
if proc.poll():
self.error(msg, out, err)
# on the remote machine
if not self.pid or not self.ppid:
(out, err), proc = self.node.check_errors(self.run_home,
- stderr = stderr)
+ stderr = stderr)
# Out is what was written in the stderr file
if err:
- msg = " Failed to start command '%s' " % command
+ msg = " Failed to start command '{}' ".format(command)
self.error(msg, out, err)
raise RuntimeError(msg)
if self.state == ResourceState.STARTED:
- self.info("Stopping command '%s' " % command)
+ self.info("Stopping command '{}' ".format(command))
# If the command is running in foreground (it was launched using
# the node 'execute' method), then we use the handler to the Popen
# were retrieved
if self.pid and self.ppid:
(out, err), proc = self.node.kill(self.pid, self.ppid,
- sudo = self._sudo_kill)
+ sudo = self._sudo_kill)
"""
# TODO: check if execution errors occurred
if (proc and proc.poll()) or err:
- msg = " Failed to STOP command '%s' " % self.get("command")
+ msg = " Failed to STOP command '{}' ".format(self.get("command"))
self.error(msg, out, err)
"""
# retcode == 0 -> finished
if retcode:
out = ""
- msg = " Failed to execute command '%s'" % self.get("command")
+ msg = " Failed to execute command '{}'".format(self.get("command"))
err = self._proc.stderr.read()
self.error(msg, out, err)
self.do_fail()
if status == ProcStatus.FINISHED:
# If the program finished, check if execution
# errors occurred
- (out, err), proc = self.node.check_errors(
- self.run_home)
+ (out, err), proc \
+ = self.node.check_errors(self.run_home)
if err:
- msg = "Failed to execute command '%s'" % \
- self.get("command")
- self.error(msg, out, err)
- self.do_fail()
+ # Thierry : there's nothing wrong with a non-empty
+ # stderr, is there ?
+ #msg = "Failed to execute command '{}'"\
+ # .format(self.get("command"))
+ #self.error(msg, out, err)
+ #self.do_fail()
+ # xxx TODO OTOH it would definitely make sense
+ # to check the exitcode
+ pass
else:
self.set_stopped()
return self._state
def execute_command(self, command,
- env=None,
- sudo=False,
- tty=False,
- forward_x11=False,
- blocking=False):
+ env = None,
+ sudo = False,
+ tty = False,
+ forward_x11 = False,
+ blocking = False):
environ = ""
if env:
- environ = self.node.format_environment(env, inline=True)
+ environ = self.node.format_environment(env, inline = True)
command = environ + command
command = self.replace_paths(command)
return self.node.execute(command,
- sudo=sudo,
- tty=tty,
- forward_x11=forward_x11,
- blocking=blocking)
+ sudo = sudo,
+ tty = tty,
+ forward_x11 = forward_x11,
+ blocking = blocking)
- def replace_paths(self, command, node=None, app_home=None, run_home=None):
+ def replace_paths(self, command, node = None, app_home = None, run_home = None):
"""
Replace all special path tags with shell-escaped actual paths.
"""
if not node:
- node=self.node
+ node = self.node
if not app_home:
- app_home=self.app_home
+ app_home = self.app_home
if not run_home:
run_home = self.run_home
return ( command
- .replace("${USR}", node.usr_dir)
- .replace("${LIB}", node.lib_dir)
- .replace("${BIN}", node.bin_dir)
- .replace("${SRC}", node.src_dir)
- .replace("${SHARE}", node.share_dir)
- .replace("${EXP}", node.exp_dir)
- .replace("${EXP_HOME}", node.exp_home)
- .replace("${APP_HOME}", app_home)
- .replace("${RUN_HOME}", run_home)
- .replace("${NODE_HOME}", node.node_home)
- .replace("${HOME}", node.home_dir)
- )
+ .replace("${USR}", node.usr_dir)
+ .replace("${LIB}", node.lib_dir)
+ .replace("${BIN}", node.bin_dir)
+ .replace("${SRC}", node.src_dir)
+ .replace("${SHARE}", node.share_dir)
+ .replace("${EXP}", node.exp_dir)
+ .replace("${EXP_HOME}", node.exp_home)
+ .replace("${APP_HOME}", app_home)
+ .replace("${RUN_HOME}", run_home)
+ .replace("${NODE_HOME}", node.node_home)
+ .replace("${HOME}", node.home_dir)
+ # a shortcut to refer to the file uploaded as 'code = '
+ .replace("${CODE}", "{}/code".format(app_home))
+ )
def valid_connection(self, guid):
# TODO: Validate!
return True
-