+# 6.1.2 - Apr 7, 2016
+
+* bugfixes in sshfuncs
+
+# 6.1.1 - Mar 16, 2016
+
+* undo dirty hack for python3 and subprocesses considered as producing
+ text outputs without distinction
+* this jeopardizes python3 code for now, but brings more stability to
+ python2 scripts in the short term; needs more work in the long run
+* a linux application that generates non-empty stderr is not considered
+ broken anymore
+
+# 6.1.0 - Mar 14, 2016
+
+* linux::Application receives new attribute `splitStderr`
+* that defaults to False
+* stdout and stderr are now merged by default in the `stdout` trace
+* previous behaviour can be achieved by setting this new attribute to `True`
+
+# 6.0.9 - Mar 11, 2016
+
+* register_resource also supports the `connectedTo` keyword,
+ that allows to call `register_connection` automatically
+* linuxapplication's code, when specified as a string,
+ is uploaded as an executable file (for inline shell scripts)
+* linuxapplication's command field can use ${CODE} to refer
+ to the path of the uploaded code (the one set by code=)
+
+# 6.0.8 - Mar 8, 2016
+
+* register_resource accepts special flag
+ autoDeploy = True
+ this way the subsequent call to deploy() is performed
+ automatically as part of register_resource
+
+# 6.0.7 - Mar 4, 2016
+
+* bugfix for python3 when running commands locally
+
+# 6.0.6 - Feb 3, 2016
+
+* bugfix for undefined variable 'nowait' in util.execfuncs.lkill
+
# 6.0.5 - Feb 2, 2016
* make it possible to install in virtualenv (not using /etc/nepi anymore)
"""
def __init__(self, name, help, type = Types.String,
- flags = None, default = None, allowed = None,
- range = None, set_hook = None):
+ flags = None, default = None, allowed = None,
+ range = None, set_hook = None):
"""
:param name: Name of the Attribute
:type name: str
self._value = value
else:
- raise ValueError("Invalid value %s for attribute %s" %
- (str(value), self.name))
+ raise ValueError("Invalid value {} for attribute {}"
+ .format(value, self.name))
value = property(get_value, set_value)
:return: Guid of the RM
:rtype: int
+ Specifying additional keywords results in the following actions
+ * autoDeploy:
+ boolean: if set, causes the created object to be `deploy`ed
+ before returning from register_resource
+ * connectedTo:
+ resourceObject: if set, causes the `register_connection` method
+ to be called before returning and after auto-deployment if relevant
+ * other keywords are used to call `set` to set attributes
+
+ Example:
+ app = ec.register_resource("linux::Application",
+ command = "systemctl start httpd",
+ autoDeploy = True,
+ connectedTo = node)
+ ### instead of
+ app = ec.register_resource("linux::Application")
+ ec.set(app, "command", "systemctl start httpd")
+ ec.deploy(app)
+ ec.register_connection(app, node)
+
"""
# Get next available guid
# xxx_next_hiccup
# Store RM
self._resources[guid] = rm
- ### so we can do something like
- # node = ec.register_resource("linux::Node",
- # username = user,
- # hostname = host)
- ### instead of
- # node = ec.register_resource("linux::Node")
- # ec.set(node, "username", user)
- # ec.set(node, "hostname", host)
+ ### special keywords
+ specials = []
+
+ # is there a need to call deploy
+ special = 'autoDeploy'
+ specials.append(special)
+ auto_deploy = special in keywords and keywords[special]
+
+ # is there a need to call register_connection, and if so to what
+ special = 'connectedTo'
+ specials.append(special)
+ connected_to = special in keywords and keywords[special]
+ ### now we can do all the calls to 'set'
for name, value in keywords.items():
- self.set(guid, name, value)
+ # specials are handled locally and not propagated to 'set'
+ if name not in specials:
+ self.set(guid, name, value)
+
+ ### deal with specials
+ if auto_deploy:
+ self.deploy(guid)
+ if connected_to:
+ self.register_connection(guid, connected_to)
return guid
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
ResourceState, ResourceAction
-from nepi.util.sshfuncs import ProcStatus
import os
import tempfile
@classmethod
def _register_attributes(cls):
- trace_name = Attribute("traceName",
- "Name of the trace to be collected",
- flags = Flags.Design)
-
- sub_dir = Attribute("subDir",
- "Sub directory to collect traces into",
- flags = Flags.Design)
-
- rename = Attribute("rename",
- "Name to give to the collected trace file",
- flags = Flags.Design)
-
- cls._register_attribute(trace_name)
- cls._register_attribute(sub_dir)
- cls._register_attribute(rename)
+ cls._register_attribute(
+ Attribute("traceName",
+ "Name of the trace to be collected",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("subDir",
+ "Sub directory to collect traces into",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("rename",
+ "Name to give to the collected trace file",
+ flags = Flags.Design))
def __init__(self, ec, guid):
super(Collector, self).__init__(ec, guid)
if subdir:
self._store_path = os.path.join(self.store_path, subdir)
- msg = "Creating local directory at %s to store %s traces " % (
- self.store_path, trace_name)
+ msg = "Creating local directory at {} to store {} traces "\
+ .format(self.store_path, trace_name)
self.info(msg)
try:
trace_name = self.get("traceName")
rename = self.get("rename") or trace_name
- msg = "Collecting '%s' traces to local directory %s" % (
- trace_name, self.store_path)
+ msg = "Collecting '{}' traces to local directory {}"\
+ .format(trace_name, self.store_path)
self.info(msg)
rms = self.get_connected()
for rm in rms:
- fpath = os.path.join(self.store_path, "%d.%s" % (rm.guid,
- rename))
+ fpath = os.path.join(self.store_path, "{}.{}"\
+ .format(rm.guid, rename))
try:
result = self.ec.trace(rm.guid, trace_name)
with open(fpath, "w") as f:
f.write(result)
except:
import traceback
err = traceback.format_exc()
- msg = "Couldn't retrieve trace %s for %d at %s " % (trace_name,
- rm.guid, fpath)
+ msg = "Couldn't retrieve trace {} for {} at {} "\
+ .format(trace_name, rm.guid, fpath)
self.error(msg, out = "", err = err)
continue
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+import os
+import subprocess
+import logging
+
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.trace import Trace, TraceAttr
from nepi.execution.resource import ResourceManager, clsinit_copy, \
ResourceState
from nepi.resources.linux.node import LinuxNode
-from nepi.util.sshfuncs import ProcStatus
+from nepi.util.sshfuncs import ProcStatus, STDOUT
from nepi.util.timefuncs import tnow, tdiffsec
-import os
-import subprocess
+# to debug, just use
+# logging.getLogger('application').setLevel(logging.DEBUG)
+logger = logging.getLogger("application")
# TODO: Resolve wildcards in commands!!
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
@classmethod
def _register_attributes(cls):
- command = Attribute("command", "Command to execute at application start. "
- "Note that commands will be executed in the ${RUN_HOME} directory, "
- "make sure to take this into account when using relative paths. ",
- flags = Flags.Design)
- forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections",
- flags = Flags.Design)
- env = Attribute("env", "Environment variables string for command execution",
- flags = Flags.Design)
- sudo = Attribute("sudo", "Run with root privileges",
- flags = Flags.Design)
- depends = Attribute("depends",
- "Space-separated list of packages required to run the application",
- flags = Flags.Design)
- sources = Attribute("sources",
- "semi-colon separated list of regular files to be uploaded to ${SRC} "
- "directory prior to building. Archives won't be expanded automatically. "
- "Sources are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all sources). ",
- flags = Flags.Design)
- files = Attribute("files",
- "semi-colon separated list of regular miscellaneous files to be uploaded "
- "to ${SHARE} directory. "
- "Files are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all files). ",
- flags = Flags.Design)
- libs = Attribute("libs",
- "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
- "to ${LIB} directory. "
- "Libraries are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all files). ",
- flags = Flags.Design)
- bins = Attribute("bins",
- "semi-colon separated list of binary files to be uploaded "
- "to ${BIN} directory. "
- "Binaries are globally available for all experiments unless "
- "cleanHome is set to True (This will delete all files). ",
- flags = Flags.Design)
- code = Attribute("code",
- "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
- flags = Flags.Design)
- build = Attribute("build",
- "Build commands to execute after deploying the sources. "
- "Sources are uploaded to the ${SRC} directory and code "
- "is uploaded to the ${APP_HOME} directory. \n"
- "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
- "./configure && make && make clean.\n"
- "Make sure to make the build commands return with a nonzero exit "
- "code on error.",
- flags = Flags.Design)
- install = Attribute("install",
- "Commands to transfer built files to their final destinations. "
- "Install commands are executed after build commands. ",
- flags = Flags.Design)
- stdin = Attribute("stdin", "Standard input for the 'command'",
- flags = Flags.Design)
- tear_down = Attribute("tearDown", "Command to be executed just before "
- "releasing the resource",
- flags = Flags.Design)
-
- cls._register_attribute(command)
- cls._register_attribute(forward_x11)
- cls._register_attribute(env)
- cls._register_attribute(sudo)
- cls._register_attribute(depends)
- cls._register_attribute(sources)
- cls._register_attribute(code)
- cls._register_attribute(files)
- cls._register_attribute(bins)
- cls._register_attribute(libs)
- cls._register_attribute(build)
- cls._register_attribute(install)
- cls._register_attribute(stdin)
- cls._register_attribute(tear_down)
+ cls._register_attribute(
+ Attribute("command", "Command to execute at application start. "
+ "Note that commands will be executed in the ${RUN_HOME} directory, "
+ "make sure to take this into account when using relative paths. ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("forwardX11",
+ "Enables X11 forwarding for SSH connections",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("env",
+ "Environment variables string for command execution",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("sudo",
+ "Run with root privileges",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("depends",
+ "Space-separated list of packages required to run the application",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("sources",
+ "semi-colon separated list of regular files to be uploaded to ${SRC} "
+ "directory prior to building. Archives won't be expanded automatically. "
+ "Sources are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all sources). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("files",
+ "semi-colon separated list of regular miscellaneous files to be uploaded "
+ "to ${SHARE} directory. "
+ "Files are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("libs",
+ "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
+ "to ${LIB} directory. "
+ "Libraries are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("bins",
+ "semi-colon separated list of binary files to be uploaded "
+ "to ${BIN} directory. "
+ "Binaries are globally available for all experiments unless "
+ "cleanHome is set to True (This will delete all files). ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("code",
+ "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("build",
+ "Build commands to execute after deploying the sources. "
+ "Sources are uploaded to the ${SRC} directory and code "
+ "is uploaded to the ${APP_HOME} directory. \n"
+ "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
+ "./configure && make && make clean.\n"
+ "Make sure to make the build commands return with a nonzero exit "
+ "code on error.",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("install",
+ "Commands to transfer built files to their final destinations. "
+ "Install commands are executed after build commands. ",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("stdin", "Standard input for the 'command'",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("tearDown",
+ "Command to be executed just before releasing the resource",
+ flags = Flags.Design))
+        cls._register_attribute(
+            Attribute("splitStderr",
+                      "requests stderr to be retrieved separately",
+                      # declare the type so string inputs are coerced the
+                      # same way as the other Bool attributes (cleanHome &co)
+                      type = Types.Bool,
+                      default = False))
@classmethod
def _register_traces(cls):
- stdout = Trace("stdout", "Standard output stream", enabled = True)
- stderr = Trace("stderr", "Standard error stream", enabled = True)
-
- cls._register_trace(stdout)
- cls._register_trace(stderr)
+ cls._register_trace(
+ Trace("stdout", "Standard output stream", enabled = True))
+ cls._register_trace(
+ Trace("stderr", "Standard error stream", enabled = True))
def __init__(self, ec, guid):
super(LinuxApplication, self).__init__(ec, guid)
self._pid = None
self._ppid = None
self._node = None
- self._home = "app-%s" % self.guid
+ self._home = "app-{}".format(self.guid)
# whether the command should run in foreground attached
# to a terminal
self._last_state_check = tnow()
def log_message(self, msg):
- return " guid %d - host %s - %s " % (self.guid,
- self.node.get("hostname"), msg)
+ return " guid {} - host {} - {} "\
+ .format(self.guid, self.node.get("hostname"), msg)
@property
def node(self):
if not self._node:
node = self.get_connected(LinuxNode.get_rtype())
if not node:
- msg = "Application %s guid %d NOT connected to Node" % (
- self._rtype, self.guid)
+ msg = "Application {} guid {} NOT connected to Node"\
+ .format(self._rtype, self.guid)
raise RuntimeError(msg)
self._node = node[0]
@property
def in_foreground(self):
- """ Returns True if the command needs to be executed in foreground.
+ """
+ Returns True if the command needs to be executed in foreground.
This means that command will be executed using 'execute' instead of
'run' ('run' executes a command in background and detached from the
terminal)
return os.path.join(self.run_home, filename)
def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
- self.info("Retrieving '%s' trace %s " % (name, attr))
+ self.info("Retrieving '{}' trace {} ".format(name, attr))
path = self.trace_filepath(name)
+ logger.debug("trace: path= {}".format(path))
- command = "(test -f %s && echo 'success') || echo 'error'" % path
+ command = "(test -f {} && echo 'success') || echo 'error'".format(path)
(out, err), proc = self.node.execute(command)
if (err and proc.poll()) or out.find("error") != -1:
- msg = " Couldn't find trace %s " % name
+ msg = " Couldn't find trace {} ".format(name)
self.error(msg, out, err)
return None
(out, err), proc = self.node.check_output(self.run_home, name)
if proc.poll():
- msg = " Couldn't read trace %s " % name
+ msg = " Couldn't read trace {} ".format(name)
self.error(msg, out, err)
return None
return out
if attr == TraceAttr.STREAM:
-            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
+            cmd = "dd if={} bs={} count=1 skip={}".format(path, block, offset)
elif attr == TraceAttr.SIZE:
-            cmd = "stat -c%%s %s " % path
+            # '%s' here is a stat(1) format directive (file size in bytes),
+            # not a Python placeholder -- it must survive the .format conversion
+            cmd = "stat -c%s {} ".format(path)
(out, err), proc = self.node.execute(cmd)
if proc.poll():
- msg = " Couldn't find trace %s " % name
+ msg = " Couldn't find trace {} ".format(name)
self.error(msg, out, err)
return None
if self.node.get("username") == 'root':
import pickle
procs = dict()
- ps_aux = "ps aux |awk '{print $2,$11}'"
+ ps_aux = "ps aux | awk '{print $2,$11}'"
(out, err), proc = self.node.execute(ps_aux)
if len(out) != 0:
for line in out.strip().split("\n"):
# build
self.build,
# Install
- self.install]
+ self.install,
+ ]
command = []
command = self.get("command")
if command and not self.in_foreground:
-        self.info("Uploading command '%s'" % command)
+        # self.info("Uploading command '{}'".format(command))
# replace application specific paths in the command
command = self.replace_paths(command)
shfile = os.path.join(self.app_home, "start.sh")
self.node.upload_command(command,
- shfile = shfile,
- env = env,
- overwrite = overwrite)
+ shfile = shfile,
+ env = env,
+ overwrite = overwrite)
def execute_deploy_command(self, command, prefix="deploy"):
if command:
# Upload the command to a bash script and run it
# in background ( but wait until the command has
# finished to continue )
-        shfile = os.path.join(self.app_home, "%s.sh" % prefix)
+        shfile = os.path.join(self.app_home, "{}.sh".format(prefix))
+        # low-level spawn tools in both sshfuncs and execfuncs
+        # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
+        stderr = "{}_stderr".format(prefix) \
+            if self.get("splitStderr") \
+            else STDOUT
+        # use the module logger rather than print() so this trace
+        # obeys logging.getLogger('application').setLevel(...)
+        logger.debug("{} : prefix = {}, command={}, stderr={}"
+                     .format(self, prefix, command, stderr))
self.node.run_and_wait(command, self.run_home,
-                               shfile = shfile,
-                               overwrite = False,
-                               pidfile = "%s_pidfile" % prefix,
-                               ecodefile = "%s_exitcode" % prefix,
-                               stdout = "%s_stdout" % prefix,
-                               stderr = "%s_stderr" % prefix)
+                               shfile = shfile,
+                               overwrite = False,
+                               pidfile = "{}_pidfile".format(prefix),
+                               ecodefile = "{}_exitcode".format(prefix),
+                               stdout = "{}_stdout".format(prefix),
+                               stderr = stderr)
def upload_sources(self, sources = None, src_dir = None):
if not sources:
# remove the hhtp source from the sources list
sources.remove(source)
- command.append( " ( "
- # Check if the source already exists
- " ls %(src_dir)s/%(basename)s "
- " || ( "
- # If source doesn't exist, download it and check
- # that it it downloaded ok
- " wget -c --directory-prefix=%(src_dir)s %(source)s && "
- " ls %(src_dir)s/%(basename)s "
- " ) ) " % {
- "basename": os.path.basename(source),
- "source": source,
- "src_dir": src_dir
- })
+        command.append(
+            " ( "
+            # Check if the source already exists
+            " ls {src_dir}/{basename} "
+            " || ( "
+            # If source doesn't exist, download it and check
+            # that it downloaded ok
+            " wget -c --directory-prefix={src_dir} {source} && "
+            " ls {src_dir}/{basename} "
+            " ) ) ".format(
+                basename = os.path.basename(source),
+                source = source,
+                src_dir = src_dir
+            ))
command = " && ".join(command)
files = self.get("files")
if files:
- self.info("Uploading files %s " % files)
+ self.info("Uploading files {} ".format(files))
self.node.upload(files, self.node.share_dir, overwrite = False)
def upload_libraries(self, libs = None):
libs = self.get("libs")
if libs:
- self.info("Uploading libraries %s " % libaries)
+ self.info("Uploading libraries {} ".format(libs))
self.node.upload(libs, self.node.lib_dir, overwrite = False)
def upload_binaries(self, bins = None):
bins = self.get("bins")
if bins:
- self.info("Uploading binaries %s " % binaries)
+ self.info("Uploading binaries {} ".format(bins))
self.node.upload(bins, self.node.bin_dir, overwrite = False)
def upload_code(self, code = None):
if code:
self.info("Uploading code")
-
dst = os.path.join(self.app_home, "code")
- self.node.upload(code, dst, overwrite = False, text = True)
+ self.node.upload(code, dst, overwrite = False, text = True, executable = True)
def upload_stdin(self, stdin = None):
if not stdin:
self.node.upload(stdin, dst, overwrite = False, text = True)
# create "stdin" symlink on ${APP_HOME} directory
- command = "( cd %(app_home)s ; [ ! -f stdin ] && ln -s %(stdin)s stdin )" % ({
- "app_home": self.app_home,
- "stdin": dst })
-
+ command = "( cd {app_home} ; [ ! -f stdin ] && ln -s {stdin} stdin )"\
+ .format(app_home = self.app_home, stdin = dst)
return command
def install_dependencies(self, depends = None):
depends = self.get("depends")
if depends:
- self.info("Installing dependencies %s" % depends)
+ self.info("Installing dependencies {}".format(depends))
return self.node.install_packages_command(depends)
def build(self, build = None):
# Wait until node is associated and deployed
node = self.node
if not node or node.state < ResourceState.READY:
- self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
+ self.debug("---- RESCHEDULING DEPLOY ---- node state {} ".format(self.node.state))
self.ec.schedule(self.reschedule_delay, self.deploy)
else:
command = self.get("command") or ""
- self.info("Deploying command '%s' " % command)
+ self.info("Deploying command '{}' ".format(command))
self.do_discover()
self.do_provision()
def do_start(self):
command = self.get("command")
- self.info("Starting command '%s'" % command)
+ self.info("Starting command '{}'".format(command))
if not command:
# If no command was given (i.e. Application was used for dependency
# We also set blocking = False, since we don't want the
# thread to block until the execution finishes.
(out, err), self._proc = self.execute_command(command,
- env = env,
- sudo = sudo,
- forward_x11 = x11,
- blocking = False)
+ env = env,
+ sudo = sudo,
+ forward_x11 = x11,
+ blocking = False)
if self._proc.poll():
self.error(msg, out, err)
sudo = self.get("sudo") or False
stdout = "stdout"
- stderr = "stderr"
+ # low-level spawn tools in both sshfuncs and execfuncs
+ # expect stderr=sshfuncs.STDOUT to mean std{out,err} are merged
+ stderr = "stderr" \
+ if self.get("splitStderr") \
+ else STDOUT
stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
else None
# The command to run was previously uploaded to a bash script
# during deployment, now we launch the remote script using 'run'
# method from the node.
- cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
+ cmd = "bash {}".format(os.path.join(self.app_home, "start.sh"))
(out, err), proc = self.node.run(cmd, self.run_home,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- sudo = sudo)
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ sudo = sudo)
# check if execution errors occurred
- msg = " Failed to start command '%s' " % command
+ msg = " Failed to start command '{}' ".format(command)
if proc.poll():
self.error(msg, out, err)
# on the remote machine
if not self.pid or not self.ppid:
(out, err), proc = self.node.check_errors(self.run_home,
- stderr = stderr)
+ stderr = stderr)
# Out is what was written in the stderr file
if err:
- msg = " Failed to start command '%s' " % command
+ msg = " Failed to start command '{}' ".format(command)
self.error(msg, out, err)
raise RuntimeError(msg)
if self.state == ResourceState.STARTED:
- self.info("Stopping command '%s' " % command)
+ self.info("Stopping command '{}' ".format(command))
# If the command is running in foreground (it was launched using
# the node 'execute' method), then we use the handler to the Popen
# were retrieved
if self.pid and self.ppid:
(out, err), proc = self.node.kill(self.pid, self.ppid,
- sudo = self._sudo_kill)
+ sudo = self._sudo_kill)
"""
# TODO: check if execution errors occurred
if (proc and proc.poll()) or err:
- msg = " Failed to STOP command '%s' " % self.get("command")
+ msg = " Failed to STOP command '{}' ".format(self.get("command"))
self.error(msg, out, err)
"""
# retcode == 0 -> finished
if retcode:
out = ""
- msg = " Failed to execute command '%s'" % self.get("command")
+ msg = " Failed to execute command '{}'".format(self.get("command"))
err = self._proc.stderr.read()
self.error(msg, out, err)
self.do_fail()
if status == ProcStatus.FINISHED:
# If the program finished, check if execution
# errors occurred
- (out, err), proc = self.node.check_errors(
- self.run_home)
+ (out, err), proc \
+ = self.node.check_errors(self.run_home)
if err:
- msg = "Failed to execute command '%s'" % \
- self.get("command")
- self.error(msg, out, err)
- self.do_fail()
+ # Thierry : there's nothing wrong with a non-empty
+ # stderr, is there ?
+ #msg = "Failed to execute command '{}'"\
+ # .format(self.get("command"))
+ #self.error(msg, out, err)
+ #self.do_fail()
+ # xxx TODO OTOH it would definitely make sense
+ # to check the exitcode
+ pass
else:
self.set_stopped()
return self._state
def execute_command(self, command,
- env=None,
- sudo=False,
- tty=False,
- forward_x11=False,
- blocking=False):
+ env = None,
+ sudo = False,
+ tty = False,
+ forward_x11 = False,
+ blocking = False):
environ = ""
if env:
- environ = self.node.format_environment(env, inline=True)
+ environ = self.node.format_environment(env, inline = True)
command = environ + command
command = self.replace_paths(command)
return self.node.execute(command,
- sudo=sudo,
- tty=tty,
- forward_x11=forward_x11,
- blocking=blocking)
+ sudo = sudo,
+ tty = tty,
+ forward_x11 = forward_x11,
+ blocking = blocking)
- def replace_paths(self, command, node=None, app_home=None, run_home=None):
+ def replace_paths(self, command, node = None, app_home = None, run_home = None):
"""
Replace all special path tags with shell-escaped actual paths.
"""
if not node:
- node=self.node
+ node = self.node
if not app_home:
- app_home=self.app_home
+ app_home = self.app_home
if not run_home:
run_home = self.run_home
return ( command
- .replace("${USR}", node.usr_dir)
- .replace("${LIB}", node.lib_dir)
- .replace("${BIN}", node.bin_dir)
- .replace("${SRC}", node.src_dir)
- .replace("${SHARE}", node.share_dir)
- .replace("${EXP}", node.exp_dir)
- .replace("${EXP_HOME}", node.exp_home)
- .replace("${APP_HOME}", app_home)
- .replace("${RUN_HOME}", run_home)
- .replace("${NODE_HOME}", node.node_home)
- .replace("${HOME}", node.home_dir)
- )
+ .replace("${USR}", node.usr_dir)
+ .replace("${LIB}", node.lib_dir)
+ .replace("${BIN}", node.bin_dir)
+ .replace("${SRC}", node.src_dir)
+ .replace("${SHARE}", node.share_dir)
+ .replace("${EXP}", node.exp_dir)
+ .replace("${EXP_HOME}", node.exp_home)
+ .replace("${APP_HOME}", app_home)
+ .replace("${RUN_HOME}", run_home)
+ .replace("${NODE_HOME}", node.node_home)
+ .replace("${HOME}", node.home_dir)
+ # a shortcut to refer to the file uploaded as 'code = '
+ .replace("${CODE}", "{}/code".format(app_home))
+ )
def valid_connection(self, guid):
# TODO: Validate!
return True
-
import collections
import os
+import stat
import random
import re
import tempfile
@classmethod
def _register_attributes(cls):
- cls._register_attribute(Attribute(
- "hostname", "Hostname of the machine",
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "username", "Local account username",
- flags = Flags.Credential))
-
- cls._register_attribute(Attribute(
- "port", "SSH port",
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "home",
- "Experiment home directory to store all experiment related files",
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "identity", "SSH identity file",
- flags = Flags.Credential))
-
- cls._register_attribute(Attribute(
- "serverKey", "Server public key",
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "cleanHome",
- "Remove all nepi files and directories "
- " from node home folder before starting experiment",
- type = Types.Bool,
- default = False,
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "cleanExperiment", "Remove all files and directories "
- " from a previous same experiment, before the new experiment starts",
- type = Types.Bool,
- default = False,
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "cleanProcesses",
- "Kill all running processes before starting experiment",
- type = Types.Bool,
- default = False,
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "cleanProcessesAfter",
- """Kill all running processes after starting experiment
- This might be dangerous when using user root""",
- type = Types.Bool,
- default = True,
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "tearDown",
- "Bash script to be executed before releasing the resource",
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "gatewayUser",
- "Gateway account username",
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "gateway",
- "Hostname of the gateway machine",
- flags = Flags.Design))
-
- cls._register_attribute(Attribute(
- "ip",
- "Linux host public IP address. "
- "Must not be modified by the user unless hostname is 'localhost'",
+ cls._register_attribute(
+ Attribute("hostname",
+ "Hostname of the machine",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("username",
+ "Local account username",
+ flags = Flags.Credential))
+ cls._register_attribute(
+ Attribute("port",
+ "SSH port",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("home",
+ "Experiment home directory to store all experiment related files",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("identity",
+ "SSH identity file",
+ flags = Flags.Credential))
+ cls._register_attribute(
+ Attribute("serverKey",
+ "Server public key",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("cleanHome",
+ "Remove all nepi files and directories "
+ " from node home folder before starting experiment",
+ type = Types.Bool,
+ default = False,
flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("cleanExperiment",
+ "Remove all files and directories "
+ " from a previous same experiment, before the new experiment starts",
+ type = Types.Bool,
+ default = False,
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("cleanProcesses",
+ "Kill all running processes before starting experiment",
+ type = Types.Bool,
+ default = False,
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("cleanProcessesAfter",
+ "Kill all running processes after starting experiment"
+ "NOTE: This might be dangerous when using user root",
+ type = Types.Bool,
+ default = True,
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("tearDown",
+ "Bash script to be executed before releasing the resource",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("gatewayUser",
+ "Gateway account username",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("gateway",
+ "Hostname of the gateway machine",
+ flags = Flags.Design))
+ cls._register_attribute(
+ Attribute("ip",
+ "Linux host public IP address. "
+ "Must not be modified by the user unless hostname is 'localhost'",
+ flags = Flags.Design))
def __init__(self, ec, guid):
super(LinuxNode, self).__init__(ec, guid)
if out.find("Debian") == 0:
self._os = OSType.DEBIAN
- elif out.find("Ubuntu") ==0:
+ elif out.find("Ubuntu") == 0:
self._os = OSType.UBUNTU
elif out.find("Fedora release") == 0:
self._os = OSType.FEDORA
@property
def use_deb(self):
- return (self.os & (OSType.DEBIAN|OSType.UBUNTU))
+ return (self.os & (OSType.DEBIAN | OSType.UBUNTU))
@property
def use_rpm(self):
(out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
- def search_for_child(self, pid, pids, ppid, family=[]):
+ def search_for_child(self, pid, pids, ppid, family=None):
""" Recursive function to search for child. List A contains the pids and list B the parents (ppid)
"""
+ family = family if family is not None else []
family.append(pid)
for key, value in enumerate(ppid):
if value == pid:
return (out, err), proc
def upload(self, src, dst, text = False, overwrite = True,
- raise_on_error = True):
+ raise_on_error = True, executable = False):
""" Copy content to destination
src string with the content to copy. Can be:
dst string with destination path on the remote host (remote is
always self.host)
- text src is text input, it must be stored into a temp file before
- uploading
+ when src is text input, it gets stored into a temp file before
+ uploading; in this case, and if executable is True, said temp file
+ is made executable, and thus uploaded file will be too
"""
# If source is a string input
f = None
# create a temporal file with the content to upload
# in python3 we need to open in binary mode if str is bytes
mode = 'w' if isinstance(src, str) else 'wb'
- f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
+ f = tempfile.NamedTemporaryFile(mode = mode, delete = False)
f.write(src)
f.close()
+ if executable:
+ # do something like chmod u+x
+ mode = os.stat(f.name).st_mode
+ mode |= stat.S_IXUSR
+ os.chmod(f.name, mode)
+
src = f.name
# If dst files should not be overwritten, check that the files do not
return self.execute(cmd, with_lock = True)
def run_and_wait(self, command, home,
- shfile="cmd.sh",
- env=None,
- overwrite=True,
- wait_run=True,
- pidfile="pidfile",
- ecodefile="exitcode",
- stdin=None,
- stdout="stdout",
- stderr="stderr",
- sudo=False,
- tty=False,
- raise_on_error=True):
+ shfile = "cmd.sh",
+ env = None,
+ overwrite = True,
+ wait_run = True,
+ pidfile = "pidfile",
+ ecodefile = "exitcode",
+ stdin = None,
+ stdout = "stdout",
+ stderr = "stderr",
+ sudo = False,
+ tty = False,
+ raise_on_error = True):
"""
Uploads the 'command' to a bash script in the host.
Then runs the script detached in background in the host, and
return ExitCode.ERROR
def upload_command(self, command,
- shfile="cmd.sh",
- ecodefile="exitcode",
- overwrite=True,
- env=None):
+ shfile = "cmd.sh",
+ ecodefile = "exitcode",
+ overwrite = True,
+ env = None):
""" Saves the command as a bash script file in the remote host, and
forces to save the exit code of the command execution to the ecodefile
"""
# The exit code of the command will be stored in ecodefile
command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
- .format(command=command, ecodefile=ecodefile)
+ .format(command = command, ecodefile = ecodefile)
# Export environment
environ = self.format_environment(env)
# Add environ to command
command = environ + command
- return self.upload(command, shfile, text=True, overwrite=overwrite)
+ return self.upload(command, shfile, text = True, overwrite = overwrite)
- def format_environment(self, env, inline=False):
+ def format_environment(self, env, inline = False):
""" Formats the environment variables for a command to be executed
either as an inline command
(i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
command = []
for d in dests:
- command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) )
+ command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst = d) )
command = ";".join(command)
import shlex
import subprocess
+from six import PY2
+
def lexec(command,
- user = None,
- sudo = False,
- env = None):
+ user = None,
+ sudo = False,
+ env = None):
"""
- Executes a local command, returns ((stdout,stderr),process)
+ Executes a local command, returns ((stdout, stderr), process)
"""
if env:
export = ''
for envkey, envval in env.items():
- export += '%s=%s ' % (envkey, envval)
- command = "%s %s" % (export, command)
+ export += "{}={} ".format(envkey, envval)
+ command = "{} {}".format(export, command)
if sudo:
- command = "sudo %s" % command
+ command = "sudo {}".format(command)
# XXX: Doing 'su user' blocks waiting for a password on stdin
#elif user:
- # command = "su %s ; %s " % (user, command)
+ # command = "su {} ; {} ".format(user, command)
+ extras = {} if PY2 else {'universal_newlines' : True}
proc = subprocess.Popen(command,
- shell = True,
- stdout = subprocess.PIPE,
- stderr = subprocess.PIPE)
+ shell = True,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE,
+ **extras)
out = err = ""
- log_msg = "lexec - command %s " % command
+ log_msg = "lexec - command {} ".format(command)
try:
out, err = proc.communicate()
def lcopy(source, dest, recursive = False):
"""
- Copies from/to localy.
+ Copies from/to locally.
"""
args = ["cp"]
else:
args.append(dest)
+ extras = {} if PY2 else {'universal_newlines' : True}
proc = subprocess.Popen(args,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ **extras)
out = err = ""
command = " ".join(args)
- log_msg = " lcopy - command %s " % command
+ log_msg = " lcopy - command {} ".format(command)
try:
out, err = proc.communicate()
return ((out, err), proc)
def lspawn(command, pidfile,
- stdout = '/dev/null',
- stderr = STDOUT,
- stdin = '/dev/null',
- home = None,
- create_home = False,
- sudo = False,
- user = None):
+ stdout = '/dev/null',
+ stderr = STDOUT,
+ stdin = '/dev/null',
+ home = None,
+ create_home = False,
+ sudo = False,
+ user = None):
"""
Spawn a local command such that it will continue working asynchronously.
else:
stderr = ' ' + stderr
- daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
- 'command' : command,
- 'pidfile' : shell_escape(pidfile),
- 'stdout' : stdout,
- 'stderr' : stderr,
- 'stdin' : stdin,
- }
+ escaped_pidfile = shell_escape(pidfile)
+ daemon_command = '{{ {{ {command} > {stdout} 2>{stderr} < {stdin} & }} ; echo $! 1 > {escaped_pidfile} ; }}'\
+ .format(**locals())
- cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s bash -c %(command)s " % {
- 'command' : shell_escape(daemon_command),
- 'sudo' : 'sudo -S' if sudo else '',
- 'pidfile' : shell_escape(pidfile),
- 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
- 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home else '',
- }
+ cmd = "{create}{gohome} rm -f {pidfile} ; {sudo} bash -c {command} "\
+ .format(command = shell_escape(daemon_command),
+ sudo = 'sudo -S' if sudo else '',
+ pidfile = shell_escape(pidfile),
+ gohome = 'cd {} ; '.format(shell_escape(home)) if home else '',
+ create = 'mkdir -p {} ; '.format(shell_escape(home)) if create_home else '')
- (out,err), proc = lexec(cmd)
+ (out, err), proc = lexec(cmd)
if proc.wait():
- raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
+ raise RuntimeError("Failed to set up local application: {} {}"
+ .format(out, err))
- return ((out,err), proc)
+ return ((out, err), proc)
def lgetpid(pidfile):
"""
or None if the pidfile isn't valid yet (maybe the process is still starting).
"""
- (out,err), proc = lexec("cat %s" % pidfile )
+ (out, err), proc = lexec("cat {}".format(pidfile))
if proc.wait():
return None
One of NOT_STARTED, RUNNING, FINISHED
"""
- (out,err), proc = lexec(
+ (out, err), proc = lexec(
# Check only by pid. pid+ppid does not always work (especially with sudo)
- " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
- 'ppid' : ppid,
- 'pid' : pid,
- })
+ " (( ps --pid {pid} -o pid | grep -c {pid} && echo 'wait') || echo 'done' ) | tail -n 1"
+ .format(ppid = ppid, pid = pid))
if proc.wait():
return ProcStatus.NOT_STARTED
return ProcStatus.RUNNING if status else ProcStatus.FINISHED
-def lkill(pid, ppid, sudo = False):
+def lkill(pid, ppid, sudo = False, nowait = False):
"""
Kill a process spawned with lspawn.
Nothing, should have killed the process
"""
- subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
- cmd = """
-SUBKILL="%(subkill)s" ;
-%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
-%(sudo)s kill %(pid)d $SUBKILL || /bin/true
+ subkill = "$(ps --ppid {pid} -o pid h)".format(pid=pid)
+ cmd_format = """
+SUBKILL="{subkill}" ;
+{sudo} kill -- -{pid} $SUBKILL || /bin/true
+{sudo} kill {pid} $SUBKILL || /bin/true
for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
sleep 0.2
- if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
+ if [ `ps --pid {pid} -o pid | grep -c {pid}` == '0' ]; then
break
else
- %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
- %(sudo)s kill %(pid)d $SUBKILL || /bin/true
+ {sudo} kill -- -{pid} $SUBKILL || /bin/true
+ {sudo} kill {pid} $SUBKILL || /bin/true
fi
sleep 1.8
done
-if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
- %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
- %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
+if [ `ps --pid {pid} -o pid | grep -c {pid}` != '0' ]; then
+ {sudo} kill -9 -- -{pid} $SUBKILL || /bin/true
+ {sudo} kill -9 {pid} $SUBKILL || /bin/true
fi
"""
if nowait:
- cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
+ cmd = "( {} ) >/dev/null 2>/dev/null </dev/null &".format(cmd)
- (out,err),proc = lexec(
- cmd % {
- 'ppid' : ppid,
- 'pid' : pid,
- 'sudo' : 'sudo -S' if sudo else '',
- 'subkill' : subkill,
- })
+ (out, err), proc = lexec(
+ cmd_format.format(
+ ppid = ppid,
+ pid = pid,
+ sudo = 'sudo -S' if sudo else '',
+ subkill = subkill))
_re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*")
+# to debug, just use
+# logging.getLogger('sshfuncs').setLevel(logging.DEBUG)
logger = logging.getLogger("sshfuncs")
def log(msg, level = logging.DEBUG, out = None, err = None):
if out:
- msg += " - OUT: %s " % out
+ msg += " - OUT is {} long".format(len(out))
if err:
- msg += " - ERROR: %s " % err
+ msg += " - ERR is {} long".format(len(err))
logger.log(level, msg)
if hasattr(os, "devnull"):
extras = {} if PY2 else {'universal_newlines' : True}
p = subprocess.Popen(
"ip -o addr list",
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
+ shell = True,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE,
**extras
)
stdout, stderr = p.communicate()
hostbyname = resolve_hostname(host)
hostbyname_cache[host] = hostbyname
- msg = " Added hostbyname %s - %s " % (host, hostbyname)
+ msg = " Added hostbyname {} - {} ".format(host, hostbyname)
log(msg, logging.DEBUG)
return hostbyname
stdin = null,
**extras
)
- out,err = proc.communicate()
+ out, err = proc.communicate()
proc.wait()
vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I)
"""
if port is not None:
- host = '%s:%s' % (host, str(port))
+ host = '{}:{}'.format(host, str(port))
# Create a temporary server key file
tmp_known_hosts = tempfile.NamedTemporaryFile()
hostbyname = gethostbyname(host)
# Add the intended host key
- tmp_known_hosts.write('%s,%s %s\n' % (host, hostbyname, server_key))
+ tmp_known_hosts.write('{},{} {}\n'.format(host, hostbyname, server_key))
# If we're not in strict mode, add user-configured keys
- if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'):
- user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),)
+ if os.environ.get('NEPI_STRICT_AUTH_MODE', "").lower() not in ('1', 'true', 'on'):
+ user_hosts_path = '{}/.ssh/known_hosts'.format(os.environ.get('HOME', ""))
if os.access(user_hosts_path, os.R_OK):
with open(user_hosts_path, "r") as f:
tmp_known_hosts.write(f.read())
ctrl_path = "/tmp/nepi_ssh"
if agent:
- ctrl_path +="_a"
+ ctrl_path += "_a"
if forward_x11:
- ctrl_path +="_x"
+ ctrl_path += "_x"
ctrl_path += "-%r@%h:%p"
else:
# unsafe string - escape
def escape(c):
- if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'):
+ if (32 <= ord(c) < 127 or c in ('\r', '\n', '\t')) and c not in ("'", '"'):
return c
else:
- return "'$'\\x%02x''" % (ord(c),)
+ return "'$'\\x{:02x}''".format(ord(c))
s = ''.join(map(escape, s))
- return "'%s'" % (s,)
+ return "'{}'".format(s)
def eintr_retry(func):
"""Retries a function invocation when a EINTR occurs"""
return rv
def rexec(command, host, user,
- port = None,
- gwuser = None,
- gw = None,
- agent = True,
- sudo = False,
- identity = None,
- server_key = None,
- env = None,
- tty = False,
- connect_timeout = 30,
- retry = 3,
- persistent = True,
- forward_x11 = False,
- blocking = True,
- strict_host_checking = True):
+ port = None,
+ gwuser = None,
+ gw = None,
+ agent = True,
+ sudo = False,
+ identity = None,
+ server_key = None,
+ env = None,
+ tty = False,
+ connect_timeout = 30,
+ retry = 3,
+ persistent = True,
+ forward_x11 = False,
+ blocking = True,
+ strict_host_checking = True):
"""
- Executes a remote command, returns ((stdout,stderr),process)
+ Executes a remote command, returns ((stdout, stderr), process)
"""
tmp_known_hosts = None
args = ['ssh', '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
- '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
+ '-o', 'ConnectTimeout={}'.format(connect_timeout),
'-o', 'ConnectionAttempts=3',
'-o', 'ServerAliveInterval=30',
'-o', 'TCPKeepAlive=yes',
if persistent and openssh_has_persist():
args.extend([
'-o', 'ControlMaster=auto',
- '-o', 'ControlPath=%s' % (make_control_path(agent, forward_x11),),
+ '-o', 'ControlPath={}'.format(make_control_path(agent, forward_x11)),
'-o', 'ControlPersist=60' ])
if not strict_host_checking:
args.append('-A')
if port:
- args.append('-p%d' % port)
+ args.append('-p{}'.format(port))
if identity:
identity = os.path.expanduser(identity)
if server_key:
# Create a temporary server key file
tmp_known_hosts = make_server_key_args(server_key, host, port)
- args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+ args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
if sudo:
command = "sudo " + command
args.append(command)
- log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args)))
+ log_msg = " rexec - host {} - command {} ".format(host, pretty_args(args))
stdout = stderr = stdin = subprocess.PIPE
if forward_x11:
stdout = stderr = stdin = None
return _retry_rexec(args, log_msg,
- stderr = stderr,
- stdin = stdin,
- stdout = stdout,
+ stdin = stdin, stdout = stdout, stderr = stderr,
env = env,
retry = retry,
tmp_known_hosts = tmp_known_hosts,
blocking = blocking)
def rcopy(source, dest,
- port = None,
- gwuser = None,
- gw = None,
- recursive = False,
- identity = None,
- server_key = None,
- retry = 3,
- strict_host_checking = True):
+ port = None,
+ gwuser = None,
+ gw = None,
+ recursive = False,
+ identity = None,
+ server_key = None,
+ retry = 3,
+ strict_host_checking = True):
"""
Copies from/to remote sites.
# Parse destination as <user>@<server>:<path>
if isinstance(dest, str) and ':' in dest:
- remspec, path = dest.split(':',1)
+ remspec, path = dest.split(':', 1)
elif isinstance(source, str) and ':' in source:
- remspec, path = source.split(':',1)
+ remspec, path = source.split(':', 1)
else:
raise ValueError("Both endpoints cannot be local")
- user,host = remspec.rsplit('@',1)
+ user, host = remspec.rsplit('@', 1)
# plain scp
tmp_known_hosts = None
'-o', 'TCPKeepAlive=yes' ]
if port:
- args.append('-P%d' % port)
+ args.append('-P{}'.format(port))
if gw:
proxycommand = _proxy_command(gw, gwuser, identity)
if server_key:
# Create a temporary server key file
tmp_known_hosts = make_server_key_args(server_key, host, port)
- args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+ args.extend(['-o', 'UserKnownHostsFile={}'.format(tmp_known_hosts.name)])
if not strict_host_checking:
# Do not check for Host key. Unsafe.
if openssh_has_persist():
args.extend([
'-o', 'ControlMaster=auto',
- '-o', 'ControlPath=%s' % (make_control_path(False, False),)
+ '-o', 'ControlPath={}'.format(make_control_path(False, False))
])
args.append(source)
else:
args.append(dest)
- log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
+ log_msg = " rcopy - host {} - command {} ".format(host, pretty_args(args))
return _retry_rexec(args, log_msg, env = None, retry = retry,
- tmp_known_hosts = tmp_known_hosts,
- blocking = True)
+ tmp_known_hosts = tmp_known_hosts,
+ blocking = True)
def rspawn(command, pidfile,
stdout = '/dev/null',
else:
stderr = ' ' + stderr
- daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
- 'command' : command,
- 'pidfile' : shell_escape(pidfile),
- 'stdout' : stdout,
- 'stderr' : stderr,
- 'stdin' : stdin,
- }
+ daemon_command = '{{ {{ {command} > {stdout} 2>{stderr} < {stdin} & }} ; echo $! 1 > {pidfile} ; }}'\
+ .format(command = command,
+ pidfile = shell_escape(pidfile),
+ stdout = stdout,
+ stderr = stderr,
+ stdin = stdin)
- cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c %(command)s " % {
- 'command' : shell_escape(daemon_command),
- 'sudo' : 'sudo -S' if sudo else '',
- 'pidfile' : shell_escape(pidfile),
- 'gohome' : 'cd %s ; ' % (shell_escape(home),) if home else '',
- 'create' : 'mkdir -p %s ; ' % (shell_escape(home),) if create_home and home else '',
- }
-
- (out,err),proc = rexec(
+ cmd = "{create}{gohome} rm -f {pidfile} ; {sudo} nohup bash -c {command} "\
+ .format(command = shell_escape(daemon_command),
+ sudo = 'sudo -S' if sudo else '',
+ pidfile = shell_escape(pidfile),
+ gohome = 'cd {} ; '.format(shell_escape(home)) if home else '',
+ create = 'mkdir -p {} ; '.format(shell_escape(home)) if create_home and home else '')
+
+ (out, err), proc = rexec(
cmd,
host = host,
port = port,
server_key = server_key,
tty = tty,
strict_host_checking = strict_host_checking ,
- )
+ )
if proc.wait():
- raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,))
+ raise RuntimeError("Failed to set up application on host {}: {} {}".format(host, out, err))
return ((out, err), proc)
or None if the pidfile isn't valid yet (can happen when process is staring up)
"""
- (out,err),proc = rexec(
- "cat %(pidfile)s" % {
- 'pidfile' : pidfile,
- },
+ (out, err), proc = rexec(
+ "cat {}".format(pidfile),
host = host,
port = port,
user = user,
identity = identity,
server_key = server_key,
strict_host_checking = strict_host_checking
- )
+ )
if proc.wait():
return None
:rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
"""
- (out,err),proc = rexec(
+ (out, err), proc = rexec(
# Check only by pid. pid+ppid does not always work (especially with sudo)
- " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % {
- 'ppid' : ppid,
- 'pid' : pid,
- },
+ " (( ps --pid {pid} -o pid | grep -c {pid} && echo 'wait') || echo 'done' ) | tail -n 1"\
+ .format(**locals()),
host = host,
port = port,
user = user,
identity = identity,
server_key = server_key,
strict_host_checking = strict_host_checking
- )
+ )
if proc.wait():
return ProcStatus.NOT_STARTED
:type sudo: bool
"""
- subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
- cmd = """
-SUBKILL="%(subkill)s" ;
-%(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
-%(sudo)s kill %(pid)d $SUBKILL || /bin/true
+ subkill = "$(ps --ppid {} -o pid h)".format(pid)
+ cmd_format = """
+SUBKILL="{subkill}" ;
+{sudo} kill -- -{pid} $SUBKILL || /bin/true
+{sudo} kill {pid} $SUBKILL || /bin/true
for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do
sleep 0.2
- if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` == '0' ]; then
+ if [ `ps --pid {pid} -o pid | grep -c {pid}` == '0' ]; then
break
else
- %(sudo)s kill -- -%(pid)d $SUBKILL || /bin/true
- %(sudo)s kill %(pid)d $SUBKILL || /bin/true
+ {sudo} kill -- -{pid} $SUBKILL || /bin/true
+ {sudo} kill {pid} $SUBKILL || /bin/true
fi
sleep 1.8
done
-if [ `ps --pid %(pid)d -o pid | grep -c %(pid)d` != '0' ]; then
- %(sudo)s kill -9 -- -%(pid)d $SUBKILL || /bin/true
- %(sudo)s kill -9 %(pid)d $SUBKILL || /bin/true
+if [ `ps --pid {pid} -o pid | grep -c {pid}` != '0' ]; then
+ {sudo} kill -9 -- -{pid} $SUBKILL || /bin/true
+ {sudo} kill -9 {pid} $SUBKILL || /bin/true
fi
"""
if nowait:
- cmd = "( %s ) >/dev/null 2>/dev/null </dev/null &" % (cmd,)
-
- (out,err),proc = rexec(
- cmd % {
- 'ppid' : ppid,
- 'pid' : pid,
- 'sudo' : 'sudo -S' if sudo else '',
- 'subkill' : subkill,
- },
+ cmd_format = "( {} ) >/dev/null 2>/dev/null </dev/null &".format(cmd_format)
+
+ sudo = 'sudo -S' if sudo else ''
+ (out, err), proc = rexec(
+ cmd_format.format(**locals()),
host = host,
port = port,
user = user,
return (out, err), proc
+# add quotes around a shell arg only if it has spaces
+def pretty_arg(shell_arg):
+ return shell_arg if ' ' not in shell_arg else "'{}'".format(shell_arg)
+def pretty_args(shell_args):
+ return " ".join([pretty_arg(shell_arg) for shell_arg in shell_args])
+
def _retry_rexec(args,
log_msg,
stdout = subprocess.PIPE,
for x in range(retry):
# display command actually invoked when debug is turned on
- message = " ".join( [ "'{}'".format(arg) for arg in args ] )
+ message = pretty_args(args)
log("sshfuncs: invoking {}".format(message), logging.DEBUG)
extras = {} if PY2 else {'universal_newlines' : True}
# connects to the remote host and starts a remote connection
if skip:
t = x*2
- msg = "SLEEPING %d ... ATEMPT %d - command %s " % (
- t, x, " ".join(args))
+ msg = "SLEEPING {} ... ATEMPT {} - command {} "\
+ .format(t, x, " ".join(args))
log(msg, logging.DEBUG)
time.sleep(t)
continue
break
except RuntimeError as e:
- msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
+ msg = " rexec EXCEPTION - TIMEOUT -> {} \n {}".format(e.args, log_msg)
log(msg, logging.DEBUG, out, err)
if retry <= 0:
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
bytes_written = os.write(proc.stdin.fileno(),
- buffer(input, input_offset, 512))
+ buffer(input, input_offset, 512))
input_offset += bytes_written
if input_offset >= len(input):
# data = os.read(proc.stdout.fileno(), 1024)
# but this would return bytes, so..
if proc.stdout in rlist:
- data = proc.stdout.read()
+ #data = proc.stdout.read()
+ data = os.read(proc.stdout.fileno(), 1024)
if not data:
proc.stdout.close()
read_set.remove(proc.stdout)
stdout.append(data)
+ log("have read {} bytes from stdout".format(len(stdout)))
# likewise
if proc.stderr in rlist:
- data = proc.stderr.read()
+ #data = proc.stderr.read()
+ data = os.read(proc.stderr.fileno(), 1024)
if not data:
proc.stderr.close()
read_set.remove(proc.stderr)
stderr.append(data)
+ log("have read {} bytes from stderr".format(len(stdout)))
# All data exchanged. Translate lists into strings.
if stdout is not None:
proxycommand = 'ProxyCommand=ssh -q '
if gwidentity:
- proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
+ proxycommand += '-i {} '.format(os.path.expanduser(gwidentity))
if gwuser:
- proxycommand += '%s' % gwuser
+ proxycommand += '{}'.format(gwuser)
else:
proxycommand += '%r'
- proxycommand += '@%s -W %%h:%%p' % gw
+ proxycommand += '@{} -W %h:%p'.format(gw)
return proxycommand