#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac
from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import (ResourceManager, clsinit_copy,
ResourceState)
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
from nepi.util.sshfuncs import ProcStatus
import collections
import os
import stat
import random
import re
import tempfile
import time
import threading
import traceback
from six import PY3
# TODO: Unify delays!!
# TODO: Validate outcome of uploads!!
class ExitCode:
    """
    Sentinel values used when the exit code of a spawned process
    cannot be read back, plus the regular success value.
    """
    # Regular success value
    OK = 0

    # Negative values never collide with a real process exit status
    FILENOTFOUND = -1
    CORRUPTFILE = -2
    ERROR = -3
class OSType:
    """
    Bit flags identifying the supported flavors of Linux OS.

    Every versioned Fedora flag also carries the generic FEDORA bit, so
    ``value & OSType.FEDORA`` is non-zero for any Fedora release.
    """
    DEBIAN = 0x01
    UBUNTU = 0x02
    FEDORA = 0x04
    FEDORA_8 = FEDORA | 0x08
    FEDORA_12 = FEDORA | 0x10
    FEDORA_14 = FEDORA | 0x20
@clsinit_copy
class LinuxNode(ResourceManager):
    """
    Resource manager for a Linux host (localhost or a host reachable
    through SSH).

    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM
        :type guid: int

    .. note::

        Commands can be executed through the LinuxNode interface in
        several ways: 'execute' (blocking or non blocking), 'run' and
        'run_and_wait'.

        * 'execute' (blocking mode):

            HOW IT WORKS: forks a process and runs the command
            synchronously, attached to the terminal, in foreground.
            The call blocks until the command finishes and its result
            is available in 'out' and 'err'.

            USAGE: short-lived commands that must run attached to a
            terminal and in foreground, and for which it IS necessary
            to wait for completion (e.g. 'ls' or 'cat').

        * 'execute' (NON blocking mode - blocking = False):

            HOW IT WORKS: same as above, except that the call returns
            immediately, even if the command is still running.

            USAGE: long-lived commands that must run attached to a
            terminal and in foreground, but for which it is not
            necessary to wait for completion (e.g. starting an
            application that uses X11 forwarding).

        * 'run':

            HOW IT WORKS: connects to the host (using SSH if remote),
            launches the command in background, detached from any
            terminal (daemonized), and returns. The command keeps
            running on the host. Because it is detached, its stdin,
            stdout and stderr cannot be attached to the console, so
            they are explicitly redirected to files. A pidfile holding
            the pid and ppid of the background process is created as
            part of the launch, which makes it possible to check later
            whether the command is still running.

            USAGE: long-lived commands that can run detached in
            background and for which it is NOT necessary to wait for
            completion (e.g. an application that does not use X11
            forwarding).

        * 'run_and_wait':

            HOW IT WORKS: similar to 'run', but blocks until the
            command has finished. It also detects runtime errors by
            reading the exitcode file, which stores the exit code of
            the command (looking at stderr alone is not reliable since
            many commands write debugging information there; the exit
            code is the only automatic way to know whether an error
            really happened).
            Unlike 'run', the command is not executed directly as a
            bash command line: it is uploaded as a bash script and the
            script is run. The script stays on the host, so it can be
            re-run manually to reproduce and debug an error.

            USAGE: medium-lived commands that can run detached in
            background and for which it IS necessary to wait for
            completion (e.g. package installation, source compilation,
            file download, etc).
    """

    _rtype = "linux::Node"
    _help = ("Controls Linux host machines ( either localhost or a host "
             "that can be accessed using a SSH key)")
    _platform = "linux"
@classmethod
def _register_attributes(cls):
    """ Declares, one by one, the attributes supported by this resource.

    Attributes are registered in the same order as they are listed here.
    """
    register = cls._register_attribute

    # Connection settings
    register(Attribute(
        "hostname",
        "Hostname of the machine",
        flags=Flags.Design))

    register(Attribute(
        "username",
        "Local account username",
        flags=Flags.Credential))

    register(Attribute(
        "port",
        "SSH port",
        flags=Flags.Design))

    register(Attribute(
        "home",
        "Experiment home directory to store all experiment related files",
        flags=Flags.Design))

    register(Attribute(
        "identity",
        "SSH identity file",
        flags=Flags.Credential))

    register(Attribute(
        "serverKey",
        "Server public key",
        flags=Flags.Design))

    # Clean-up policies
    register(Attribute(
        "cleanHome",
        "Remove all nepi files and directories "
        " from node home folder before starting experiment",
        type=Types.Bool,
        default=False,
        flags=Flags.Design))

    register(Attribute(
        "cleanExperiment",
        "Remove all files and directories "
        " from a previous same experiment, before the new experiment starts",
        type=Types.Bool,
        default=False,
        flags=Flags.Design))

    register(Attribute(
        "cleanProcesses",
        "Kill all running processes before starting experiment",
        type=Types.Bool,
        default=False,
        flags=Flags.Design))

    register(Attribute(
        "cleanProcessesAfter",
        "Kill all running processes after starting experiment"
        "NOTE: This might be dangerous when using user root",
        type=Types.Bool,
        default=True,
        flags=Flags.Design))

    register(Attribute(
        "tearDown",
        "Bash script to be executed before releasing the resource",
        flags=Flags.Design))

    # Gateway and addressing
    register(Attribute(
        "gatewayUser",
        "Gateway account username",
        flags=Flags.Design))

    register(Attribute(
        "gateway",
        "Hostname of the gateway machine",
        flags=Flags.Design))

    register(Attribute(
        "ip",
        "Linux host public IP address. "
        "Must not be modified by the user unless hostname is 'localhost'",
        flags=Flags.Design))
def __init__(self, ec, guid):
    """ Initializes the node state.

    :param ec: The Experiment controller
    :param guid: guid of the RM
    """
    super(LinuxNode, self).__init__(ec, guid)

    # Home directory at the Linux host; empty until it is resolved
    self._home_dir = ""

    # Detected OS flavor (an OSType value), resolved lazily
    self._os = None

    # Serializes the commands that different applications run on this
    # node. SSH-ing to the same host from several threads at once can
    # cause concurrency problems, and there are also operational races,
    # e.g. one application testing for a file or folder right before
    # another one creates it.
    self._node_lock = threading.Lock()
def log_message(self, msg):
    """ Returns ``msg`` prefixed with the guid and hostname of this node """
    guid = self.guid
    hostname = self.get("hostname")
    return " guid {} - host {} - {} ".format(guid, hostname, msg)
@property
def home_dir(self):
    """ Experiment home directory.

    An absolute "home" attribute is returned untouched; a relative (or
    unset) one is resolved against the home directory found on the host.
    """
    configured = self.get("home") or ""
    if configured.startswith("/"):
        return configured
    return os.path.join(self._home_dir, configured)
@property
def nepi_home(self):
    """ The ".nepi" directory located under the experiment home """
    base = self.home_dir
    return os.path.join(base, ".nepi")
@property
def usr_dir(self):
    """ The "nepi-usr" directory located under the NEPI home """
    base = self.nepi_home
    return os.path.join(base, "nepi-usr")
@property
def lib_dir(self):
    """ The "lib" directory located under usr_dir """
    base = self.usr_dir
    return os.path.join(base, "lib")
@property
def bin_dir(self):
    """ The "bin" directory located under usr_dir """
    base = self.usr_dir
    return os.path.join(base, "bin")
@property
def src_dir(self):
    """ The "src" directory located under usr_dir """
    base = self.usr_dir
    return os.path.join(base, "src")
@property
def share_dir(self):
    """ The "share" directory located under usr_dir """
    base = self.usr_dir
    return os.path.join(base, "share")
@property
def exp_dir(self):
    """ The "nepi-exp" directory located under the NEPI home """
    base = self.nepi_home
    return os.path.join(base, "nepi-exp")
@property
def exp_home(self):
    """ Directory under exp_dir named after the experiment id """
    experiment_id = self.ec.exp_id
    return os.path.join(self.exp_dir, experiment_id)
@property
def node_home(self):
    """ Directory under exp_home named "node-<guid>" """
    dirname = "node-{}".format(self.guid)
    return os.path.join(self.exp_home, dirname)
@property
def run_home(self):
    """ Directory under node_home named after the run id """
    run_id = self.ec.run_id
    return os.path.join(self.node_home, run_id)
@property
def os(self):
    """ OS flavor of the host, as an OSType value.

    The value is detected from the output of get_os() the first time it
    is needed and cached afterwards.

    :raises RuntimeError: if the host is remote and no username is set,
        or if the detected OS is not a supported one
    """
    if self._os:
        return self._os

    if not (self.localhost or self.get("username")):
        msg = "Can't resolve OS, insufficient data "
        self.error(msg)
        raise RuntimeError(msg)

    issue = self.get_os()

    if issue.startswith("Debian"):
        detected = OSType.DEBIAN
    elif issue.startswith("Ubuntu"):
        detected = OSType.UBUNTU
    elif issue.startswith("Fedora release"):
        # Generic Fedora unless one of the known releases matches
        detected = OSType.FEDORA
        known_releases = (
            ("Fedora release 8", OSType.FEDORA_8),
            ("Fedora release 12", OSType.FEDORA_12),
            ("Fedora release 14", OSType.FEDORA_14),
        )
        for prefix, flavor in known_releases:
            if issue.startswith(prefix):
                detected = flavor
                break
    else:
        msg = "Unsupported OS"
        self.error(msg, issue)
        raise RuntimeError("{} - {} ".format(msg, issue))

    self._os = detected
    return self._os
def get_os(self):
    """ Returns the output of "cat /etc/issue" run on the host.

    If the command cannot be executed the failure is logged through
    self.error() and an empty string is returned.
    """
    # Both are pre-initialised so the error report below can always use
    # them, even when execute() raises before assigning anything.
    out = err = ""
    try:
        (out, err), proc = self.execute("cat /etc/issue",
                with_lock = True,
                blocking = True)
    except Exception:
        trace = traceback.format_exc()
        msg = "Error detecting OS: {} ".format(trace)
        self.error(msg, out, err)

    return out
@property
def use_deb(self):
    """ Non-zero when the host OS carries the DEBIAN or UBUNTU flag """
    deb_mask = OSType.DEBIAN | OSType.UBUNTU
    return self.os & deb_mask
@property
def use_rpm(self):
    """ Non-zero when the host OS carries the FEDORA flag """
    rpm_mask = OSType.FEDORA
    return self.os & rpm_mask
@property
def localhost(self):
    """ True when the "hostname" attribute designates the local machine """
    hostname = self.get("hostname")
    return hostname in ('localhost', '127.0.0.1', '::1')
def do_provision(self):
    """ Prepares the host for the experiment.

    Checks that the host answers, resolves its home directory, applies
    the requested clean-up attributes, creates the shared directory
    structure plus the node home, and tries to fill in the "ip"
    attribute when it is not set.

    :raises RuntimeError: if the host is unresponsive
    """
    # check if host is alive
    if not self.is_alive():
        # No exception is being handled at this point, so there is no
        # traceback worth reporting: is_alive() already logged the cause.
        msg = "Deploy failed. Unresponsive node {}".format(self.get("hostname"))
        self.error(msg)
        raise RuntimeError(msg)

    self.find_home()

    if self.get("cleanProcesses"):
        self.clean_processes()

    if self.get("cleanHome"):
        self.clean_home()

    if self.get("cleanExperiment"):
        self.clean_experiment()

    # Create shared directory structure and node home directory
    paths = [self.lib_dir,
        self.bin_dir,
        self.src_dir,
        self.share_dir,
        self.node_home]

    self.mkdir(paths)

    # Get Public IP address if possible
    if not self.get("ip"):
        try:
            ip = sshfuncs.gethostbyname(self.get("hostname"))
            self.set("ip", ip)
        except Exception:
            # Resolution is best effort; the failure is only reported
            # when no gateway is configured.
            if self.get("gateway") is None:
                msg = "Local DNS can not resolve hostname {}".format(self.get("hostname"))
                self.error(msg)

    super(LinuxNode, self).do_provision()
def do_deploy(self):
    """ Deploys the node.

    A NEW node is discovered and provisioned first. Deployment is then
    re-scheduled for as long as any connected Linux interface is below
    the READY state.
    """
    if self.state == ResourceState.NEW:
        self.info("Deploying node")
        self.do_discover()
        self.do_provision()

    # Node needs to wait until all associated interfaces are
    # ready before it can finalize deployment
    from nepi.resources.linux.interface import LinuxInterface
    interfaces = self.get_connected(LinuxInterface.get_rtype())
    waiting = any(iface.state < ResourceState.READY for iface in interfaces)
    if waiting:
        self.ec.schedule(self.reschedule_delay, self.deploy)
        return

    super(LinuxNode, self).do_deploy()
def do_release(self):
    """ Releases the node.

    The release is re-scheduled while any connected resource is not yet
    RELEASED. Afterwards the "tearDown" script is executed (when set)
    and processes are cleaned if "cleanProcessesAfter" is enabled.
    """
    # Node needs to wait until all associated RMs are released
    # before it can be released
    still_in_use = any(rm.state != ResourceState.RELEASED
            for rm in self.get_connected())
    if still_in_use:
        self.ec.schedule(self.reschedule_delay, self.release)
        return

    script = self.get("tearDown")
    if script:
        self.execute(script)

    if self.get("cleanProcessesAfter"):
        self.clean_processes()

    super(LinuxNode, self).do_release()
def valid_connection(self, guid):
    """ Tells whether the resource with the given guid may be connected.

    Every connection is currently accepted.
    """
    # TODO: Validate!
    accepted = True
    return accepted
def clean_processes(self):
    """ Kills experiment related processes on the host.

    Nothing is done on localhost. For a non root user, tcpdump, the
    processes matching '.nepi' and every process of the user are killed
    through sudo. For root, tcpdump and the '.nepi' processes are
    killed; additionally, once the node is READY, every process that is
    not part of the snapshot stored in /tmp/save.proc is killed, except
    the process trees of interactive root ssh sessions.
    """
    self.info("Cleaning up processes")

    if self.localhost:
        return

    def two_columns(text):
        # Yield the first two whitespace separated fields of each line,
        # skipping lines with fewer than two fields (awk prints an empty
        # second column when the requested field does not exist).
        if len(text) == 0:
            return
        for line in text.strip().split("\n"):
            parts = line.split()
            if len(parts) >= 2:
                yield parts[0], parts[1]

    # Commands common to every root variant
    root_cmd = ("killall tcpdump || /bin/true ; " +
        "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")

    if self.get("username") != 'root':
        cmd = ("sudo -S killall tcpdump || /bin/true ; " +
            "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
            "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
    elif self.state >= ResourceState.READY:
        # Parent/child table of every process on the host
        # (must change for a more intelligent way)
        ppids = []
        pids = []
        (out, err), proc = self.execute("ps axjf | awk '{print $1,$2}'")
        for parent, child in two_columns(out):
            ppids.append(parent)
            pids.append(child)

        # Interactive root sessions (root@pts...) and the "sshd -D" owner
        tree_owner = None
        ssh_pids = []
        (out, err), proc = self.execute("ps aux | grep 'sshd' | awk '{print $2,$12}'")
        for pid, tag in two_columns(out):
            if tag.startswith('root@pts'):
                ssh_pids.append(pid)
            elif tag == "-D":
                tree_owner = pid

        # Protect the whole process tree of EVERY interactive session,
        # so users can keep processes alive through side ssh connections
        avoid_kill = set()
        for ssh_pid in ssh_pids:
            avoid_kill.update(self.search_for_child(ssh_pid, pids, ppids))

        if avoid_kill and tree_owner is not None:
            avoid_kill.add(tree_owner)

        # NOTE(review): the snapshot is read with open() on the machine
        # running this code, not through self.execute(); it is expected
        # to unpickle to a {pid: command} dict. Confirm who writes it,
        # and that the file can be trusted, since pickle.load runs
        # arbitrary code found in the file.
        import pickle
        with open("/tmp/save.proc", "rb") as pickle_file:
            baseline = pickle.load(pickle_file)

        (out, err), proc = self.execute("ps aux | awk '{print $2,$11}'")
        if len(out) != 0:
            current = dict(two_columns(out))

            # (pid, command) pairs that are not part of the snapshot ...
            new_procs = set(current.items()) - set(baseline.items())
            # ... reduced to their pids, minus the protected ones
            kill_pids = set(pid for pid, _ in new_procs) - avoid_kill

            cmd = root_cmd + "kill {} || /bin/true ; ".format(
                    ' '.join(sorted(kill_pids)))
        else:
            cmd = root_cmd
    else:
        cmd = root_cmd

    (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
def search_for_child(self, pid, pids, ppid, family=None):
    """ Collects ``pid`` and all of its descendants.

    ``pids`` and ``ppid`` are parallel lists: ``ppid[i]`` is the parent
    of process ``pids[i]``.

    :param pid: root of the process tree to collect
    :param pids: list of process ids
    :param ppid: list with the parent id of each entry in ``pids``
    :param family: optional list to extend with the collected ids
    :returns: the ``family`` list, holding ``pid`` followed by its
        descendants in depth-first order
    """
    family = family if family is not None else []

    # Explicit stack instead of recursion; ids already collected are
    # skipped so a malformed (cyclic) table cannot loop forever.
    pending = [pid]
    while pending:
        current = pending.pop()
        if current in family:
            continue
        family.append(current)
        children = [child for child, parent in zip(pids, ppid)
                if parent == current]
        # Reversed so children are visited in their original order
        pending.extend(reversed(children))

    return family
def clean_home(self):
    """ Cleans all NEPI related folders in the Linux host.

    Removes the ".nepi" entry found directly under home_dir.

    :returns: whatever execute() returns for the removal command
    """
    self.info("Cleaning up home")

    # The backslash is doubled so the command still contains a literal
    # "\.nepi" without relying on an invalid escape sequence.
    cmd = "cd {} ; find . -maxdepth 1 -name \\.nepi -execdir rm -rf {{}} + "\
            .format(self.home_dir)

    return self.execute(cmd, with_lock = True)
def clean_experiment(self):
    """ Removes the files of this experiment from the Linux host.

    Only the entry of exp_dir named after the experiment id is deleted,
    so NEPI files and folders with a multi experiment scope are kept.
    """
    self.info("Cleaning up experiment files")

    template = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "
    command = template.format(self.exp_dir, self.ec.exp_id)

    return self.execute(command, with_lock = True)
def execute(self, command,
        sudo = False,
        env = None,
        tty = False,
        forward_x11 = False,
        retry = 3,
        connect_timeout = 30,
        strict_host_checking = False,
        persistent = True,
        blocking = True,
        with_lock = False
        ):
    """ Executes ``command`` on the host, locally for localhost and
    through SSH otherwise.

    Notice that this invocation will block until the execution
    finishes. If this is not the desired behavior, use 'run' instead.

    Only ``sudo`` and ``env`` are used on localhost; the remaining
    options are handed to the SSH layer. With ``with_lock`` the remote
    call is made while holding the node lock.

    :returns: ((out, err), proc)
    """
    if self.localhost:
        (out, err), proc = execfuncs.lexec(
            command,
            user = self.get("username"), # still problem with localhost
            sudo = sudo,
            env = env)
        return (out, err), proc

    def remote_exec():
        # One place for the remote invocation, shared by both the
        # locked and the unlocked variants
        return sshfuncs.rexec(
            command,
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            gwuser = self.get("gatewayUser"),
            gw = self.get("gateway"),
            agent = True,
            sudo = sudo,
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            env = env,
            tty = tty,
            forward_x11 = forward_x11,
            retry = retry,
            connect_timeout = connect_timeout,
            persistent = persistent,
            blocking = blocking,
            strict_host_checking = strict_host_checking
            )

    if with_lock:
        # If the execute command is blocking, we don't want to keep
        # the node lock. This lock is used to avoid race conditions
        # when creating the ControlMaster sockets. A more elegant
        # solution is needed.
        with self._node_lock:
            (out, err), proc = remote_exec()
    else:
        (out, err), proc = remote_exec()

    return (out, err), proc
def run(self, command, home,
        create_home = False,
        pidfile = 'pidfile',
        stdin = None,
        stdout = 'stdout',
        stderr = 'stderr',
        sudo = False,
        tty = False,
        strict_host_checking = False):
    """ Spawns ``command`` on the host and returns.

    Any of stdin, stdout or stderr left unset is replaced with
    '/dev/null'. Remote spawns are made while holding the node lock.

    :returns: ((out, err), proc)
    """
    self.debug("Running command '{}'".format(command))

    stdin_path = stdin or '/dev/null'
    stdout_path = stdout or '/dev/null'
    stderr_path = stderr or '/dev/null'

    if self.localhost:
        (out, err), proc = execfuncs.lspawn(
            command, pidfile,
            home = home,
            create_home = create_home,
            stdin = stdin_path,
            stdout = stdout_path,
            stderr = stderr_path,
            sudo = sudo)
        return (out, err), proc

    with self._node_lock:
        (out, err), proc = sshfuncs.rspawn(
            command,
            pidfile = pidfile,
            home = home,
            create_home = create_home,
            stdin = stdin_path,
            stdout = stdout_path,
            stderr = stderr_path,
            sudo = sudo,
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            gwuser = self.get("gatewayUser"),
            gw = self.get("gateway"),
            agent = True,
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            tty = tty,
            strict_host_checking = strict_host_checking
            )

    return (out, err), proc
def getpid(self, home, pidfile = "pidfile"):
    """ Reads the pid information stored in ``home``/``pidfile``.

    :returns: whatever the local or SSH helper returns for that pidfile
    """
    pidpath = os.path.join(home, pidfile)

    if self.localhost:
        return execfuncs.lgetpid(pidpath)

    with self._node_lock:
        pidtuple = sshfuncs.rgetpid(
            pidpath,
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            gwuser = self.get("gatewayUser"),
            gw = self.get("gateway"),
            agent = True,
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            strict_host_checking = False
            )

    return pidtuple
def status(self, pid, ppid):
    """ Queries the status of the process identified by pid and ppid.

    :returns: whatever the local or SSH status helper returns
    """
    if self.localhost:
        return execfuncs.lstatus(pid, ppid)

    with self._node_lock:
        proc_status = sshfuncs.rstatus(
            pid, ppid,
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            gwuser = self.get("gatewayUser"),
            gw = self.get("gateway"),
            agent = True,
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            strict_host_checking = False
            )

    return proc_status
def kill(self, pid, ppid, sudo = False):
    """ Kills the process identified by pid and ppid, if it is running.

    :returns: ((out, err), proc); (("", ""), None) when the process was
        not reported as running and nothing was done
    """
    if self.status(pid, ppid) != sshfuncs.ProcStatus.RUNNING:
        return ("", ""), None

    if self.localhost:
        (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
        return (out, err), proc

    with self._node_lock:
        (out, err), proc = sshfuncs.rkill(
            pid, ppid,
            host = self.get("hostname"),
            user = self.get("username"),
            port = self.get("port"),
            gwuser = self.get("gatewayUser"),
            gw = self.get("gateway"),
            agent = True,
            sudo = sudo,
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            strict_host_checking = False
            )

    return (out, err), proc
def copy(self, src, dst):
    """ Recursively copies ``src`` to ``dst``, with a local copy on
    localhost and through the SSH layer otherwise.

    :returns: ((out, err), proc)
    """
    if self.localhost:
        (out, err), proc = execfuncs.lcopy(
            src, dst,
            recursive = True)
        return (out, err), proc

    with self._node_lock:
        (out, err), proc = sshfuncs.rcopy(
            src, dst,
            port = self.get("port"),
            gwuser = self.get("gatewayUser"),
            gw = self.get("gateway"),
            identity = self.get("identity"),
            server_key = self.get("serverKey"),
            recursive = True,
            strict_host_checking = False)

    return (out, err), proc
def upload(self, src, dst, text = False, overwrite = True,
        raise_on_error = True, executable = False):
    """ Copy content to destination

    src  string with the content to copy. Can be:
        - plain text
        - a string with the path to a local file
        - a string with a semi-colon separated list of local files
        - a string with a local directory

    dst  string with destination path on the remote host (remote is
        always self.host)

    when src is text input, it gets stored into a temp file before
    uploading; in this case, and if executable is True, said temp file
    is made executable, and thus uploaded file will be too

    The temp file is removed on every way out of this method, including
    when the copy raises or when there is nothing left to upload.

    :returns: ((out, err), proc); (("", ""), None) when overwrite is
        False and every file already exists at the destination
    :raises RuntimeError: if the copy reports an error and
        raise_on_error is set
    """
    tmp_path = None
    try:
        if text and not os.path.isfile(src):
            # src is text input that should be uploaded as file
            # create a temporal file with the content to upload
            # in python3 we need to open in binary mode if str is bytes
            mode = 'w' if isinstance(src, str) else 'wb'
            tmp = tempfile.NamedTemporaryFile(mode = mode, delete = False)
            tmp_path = tmp.name
            with tmp:
                tmp.write(src)
            if executable:
                # do something like chmod u+x
                os.chmod(tmp_path, os.stat(tmp_path).st_mode | stat.S_IXUSR)
            src = tmp_path

        if isinstance(src, str):
            src = [s.strip() for s in src.split(";")]

        # If dst files should not be overwritten, check that the files
        # do not exist already
        if overwrite == False:
            src = self.filter_existing_files(src, dst)
            if not src:
                return ("", ""), None

        if not self.localhost:
            # Build destination as <user>@<hostname>:<path>
            dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst)

        ((out, err), proc) = self.copy(src, dst)
    finally:
        # clean up temp file
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)

    if err:
        msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst)
        self.error(msg, out, err)

        msg = "{} out: {} err: {}".format(msg, out, err)
        if raise_on_error:
            raise RuntimeError(msg)

    return ((out, err), proc)
def download(self, src, dst, raise_on_error = True):
    """ Copies ``src`` from the host to the local path ``dst``.

    :returns: ((out, err), proc)
    :raises RuntimeError: if the copy reports an error and
        raise_on_error is set
    """
    if not self.localhost:
        # Build source as <user>@<hostname>:<path>
        src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src)

    ((out, err), proc) = self.copy(src, dst)

    if err:
        # src is usually a single string; joining it directly would put
        # a ';' between each of its characters.
        shown = src if isinstance(src, str) else ";".join(src)
        msg = " Failed to download files - src: {} dst: {}".format(shown, dst)
        self.error(msg, out, err)

        if raise_on_error:
            raise RuntimeError(msg)

    return ((out, err), proc)
def install_packages_command(self, packages):
    """ Builds the command that installs ``packages`` on the host,
    using the rpm or the deb helpers depending on the host OS.

    :raises RuntimeError: if the OS uses neither rpm nor deb packages
    """
    if self.use_rpm:
        return rpmfuncs.install_packages_command(self.os, packages)

    if self.use_deb:
        return debfuncs.install_packages_command(self.os, packages)

    msg = "Error installing packages ( OS not known ) "
    self.error(msg, self.os)
    raise RuntimeError(msg)
def install_packages(self, packages, home,
        run_home = None,
        raise_on_error = True):
    """ Install packages in the Linux host.

    'home' is the directory to upload the package installation script.
    'run_home' is the directory from where to execute the script; it
    defaults to 'home'.

    :returns: ((out, err), proc) as given by run_and_wait
    """
    command = self.install_packages_command(packages)
    workdir = run_home or home
    script = os.path.join(home, "instpkg.sh")

    (out, err), proc = self.run_and_wait(
        command, workdir,
        shfile = script,
        pidfile = "instpkg_pidfile",
        ecodefile = "instpkg_exitcode",
        stdout = "instpkg_stdout",
        stderr = "instpkg_stderr",
        overwrite = False,
        raise_on_error = raise_on_error)

    return (out, err), proc
def remove_packages(self, packages, home, run_home = None,
        raise_on_error = True):
    """ Uninstall packages from the Linux host.

    'home' is the directory to upload the package un-installation script.
    'run_home' is the directory from where to execute the script; it
    defaults to 'home'.

    :returns: ((out, err), proc) as given by run_and_wait
    :raises RuntimeError: if the OS uses neither rpm nor deb packages
    """
    if self.use_rpm:
        command = rpmfuncs.remove_packages_command(self.os, packages)
    elif self.use_deb:
        command = debfuncs.remove_packages_command(self.os, packages)
    else:
        msg = "Error removing packages ( OS not known ) "
        self.error(msg)
        raise RuntimeError(msg)

    workdir = run_home or home
    script = os.path.join(home, "rmpkg.sh")

    (out, err), proc = self.run_and_wait(
        command, workdir,
        shfile = script,
        pidfile = "rmpkg_pidfile",
        ecodefile = "rmpkg_exitcode",
        stdout = "rmpkg_stdout",
        stderr = "rmpkg_stderr",
        overwrite = False,
        raise_on_error = raise_on_error)

    return (out, err), proc
def mkdir(self, paths, clean = False):
    """ Creates directories on the host with "mkdir -p".

    ``paths`` is either a single directory path or a list of them.
    With ``clean`` the paths are removed through rmdir() first.

    :returns: whatever execute() returns for the command
    """
    if clean:
        self.rmdir(paths)

    targets = [paths] if isinstance(paths, str) else paths
    script = " ; ".join(map("mkdir -p {}".format, targets))

    return self.execute(script, with_lock = True)
def rmdir(self, paths):
    """ Deletes directories on the host with "rm -rf".

    ``paths`` is either a single directory path or a list of them.

    :returns: whatever execute() returns for the command
    """
    targets = [paths] if isinstance(paths, str) else paths
    script = " ; ".join(map("rm -rf {}".format, targets))

    return self.execute(script, with_lock = True)
def run_and_wait(self, command, home,
        shfile = "cmd.sh",
        env = None,
        overwrite = True,
        wait_run = True,
        pidfile = "pidfile",
        ecodefile = "exitcode",
        stdin = None,
        stdout = "stdout",
        stderr = "stderr",
        sudo = False,
        tty = False,
        raise_on_error = True):
    """
    Uploads the 'command' to a bash script in the host.
    Then runs the script detached in background in the host, and
    busy-waits until the script finishes executing.

    A relative 'shfile' is placed under 'home'. With 'wait_run' unset
    the method does not wait for the script nor check its exit code.

    :returns: ((out, err), proc) where 'out' is the content of the
        stdout file
    :raises RuntimeError: if the script could not be launched or ended
        with an error, and raise_on_error is set
    """
    script = shfile if shfile.startswith("/") else os.path.join(home, shfile)

    self.upload_command(command,
        shfile = script,
        ecodefile = ecodefile,
        env = env,
        overwrite = overwrite)

    command = "bash {}".format(script)

    # run command in background in remote host
    (out, err), proc = self.run(command, home,
        pidfile = pidfile,
        stdin = stdin,
        stdout = stdout,
        stderr = stderr,
        sudo = sudo,
        tty = tty)

    # check no errors occurred
    if proc.poll():
        msg = " Failed to run command '{}' ".format(command)
        self.error(msg, out, err)
        if raise_on_error:
            raise RuntimeError(msg)

    # Wait for pid file to be generated
    pid, ppid = self.wait_pid(
        home = home,
        pidfile = pidfile,
        raise_on_error = raise_on_error)

    if wait_run:
        # wait until command finishes to execute
        self.wait_run(pid, ppid)

        (eout, err), proc = self.check_errors(home,
            ecodefile = ecodefile,
            stderr = stderr)

        # err is what was written in the stderr file
        if err:
            msg = " Failed to run command '{}' ".format(command)
            self.error(msg, eout, err)
            if raise_on_error:
                raise RuntimeError(msg)

    (out, oerr), proc = self.check_output(home, stdout)

    return (out, err), proc
def exitcode(self, home, ecodefile = "exitcode"):
    """
    Get the exit code of an application.

    The exit code is read from the file ``home``/``ecodefile``.

    :returns: an integer value with the exit code, or one of the
        negative ExitCode values when it cannot be obtained:
        CORRUPTFILE if the file content is not an integer,
        FILENOTFOUND if reading the file ended with status 1, and
        ERROR for any other failure
    """
    (out, err), proc = self.check_output(home, ecodefile)

    # Succeeded to open file, return exit code in the file
    if proc.wait() == 0:
        try:
            return int(out.strip())
        except (ValueError, TypeError, AttributeError):
            # Error in the content of the file!
            return ExitCode.CORRUPTFILE

    # No such file or directory
    if proc.returncode == 1:
        return ExitCode.FILENOTFOUND

    # Other error from 'cat'
    return ExitCode.ERROR
def upload_command(self, command,
        shfile = "cmd.sh",
        ecodefile = "exitcode",
        overwrite = True,
        env = None):
    """ Saves the command as a bash script file in the remote host, and
    forces to save the exit code of the command execution to the ecodefile

    :returns: whatever upload() returns for the generated script
    """
    # The command must be properly terminated before it is wrapped
    if not command.strip().endswith((";", "&")):
        command += ";"

    # The exit code of the command will be stored in ecodefile
    wrapped = " {{ {command} }} ; echo $? > {ecodefile} ;".format(
        command = command, ecodefile = ecodefile)

    # Environment exports go first, then the wrapped command
    script = self.format_environment(env) + wrapped

    return self.upload(script, shfile, text = True, overwrite = overwrite)
def format_environment(self, env, inline = False):
    """ Formats the environment variables for a command to be executed
    either as an inline command
    (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
    as a bash script (i.e. export PYTHONPATH=src/.. \\n export LALA=.. \\n)

    ``env`` is a whitespace separated list of assignments; an empty
    string is returned when it is not set.
    """
    if not env:
        return ""

    terminator = ";" if inline else "\n"

    # Collapse runs of white space so each token is one assignment
    assignments = re.sub(r'\s+', ' ', env.strip()).split(" ")

    return "".join(" export {}{}".format(item, terminator)
            for item in assignments)
def check_errors(self, home,
        ecodefile = "exitcode",
        stderr = "stderr"):
    """ Checks whether errors occurred while running a command.

    It first checks the exit code for the command, and only if the
    exit code is an error one it returns the error output.

    :returns: (("", err), proc) where err is empty when no error was
        detected
    """
    proc = None
    err = ""

    # get exit code saved in the 'exitcode' file
    ecode = self.exitcode(home, ecodefile)

    unreadable = ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]
    if unreadable:
        err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile)
        return ("", err), proc

    missing = ecode == ExitCode.FILENOTFOUND
    if ecode > 0 or missing:
        # The process returned an error code or didn't exist.
        # Check standard error.
        (err, eerr), proc = self.check_output(home, stderr)

        # If the stderr file was not found, assume nothing bad happened,
        # and just ignore the error.
        # (cat returns 1 for error "No such file or directory")
        if missing and proc.poll() == 1:
            err = ""

    return ("", err), proc
def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
    """ Waits until the pid file for the command is generated,
    and returns the pid and ppid of the process.

    The pid file is read at most twice, sleeping after each failed
    read.

    :returns: (pid, ppid), both None when the pid file could not be read
    :raises RuntimeError: on failure, if raise_on_error is set
    """
    pid = ppid = None
    delay = 1.0
    found = False

    for _attempt in range(2):
        pidtuple = self.getpid(home = home, pidfile = pidfile)
        if pidtuple:
            pid, ppid = pidtuple
            found = True
            break

        time.sleep(delay)
        delay = delay * 1.5

    if not found:
        msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile )
        self.error(msg)

        if raise_on_error:
            raise RuntimeError(msg)

    return pid, ppid
def wait_run(self, pid, ppid, trial = 0):
    """ wait for a remote process to finish execution

    The status is polled every 0.5 seconds while the process is
    running. While it is neither running nor finished, an increasing
    delay is used and the wait is abandoned once the delay exceeds 20
    seconds. ``trial`` is not used.
    """
    delay = 1.0

    status = self.status(pid, ppid)
    while status is not ProcStatus.FINISHED:
        if status is ProcStatus.RUNNING:
            # The app is running, just wait...
            time.sleep(0.5)
        else:
            delay = delay * 1.5
            time.sleep(delay)
            # If it takes more than 20 seconds to start, then
            # assume something went wrong
            if delay > 20:
                break

        status = self.status(pid, ppid)
def check_output(self, home, filename):
    """ Retrieves the content of the file ``home``/``filename`` by
    running "cat" on the host.

    :returns: ((out, err), proc)
    """
    target = os.path.join(home, filename)
    command = "cat {}".format(target)

    (out, err), proc = self.execute(command, retry = 1, with_lock = True)
    return (out, err), proc
def is_alive(self):
    """ Checks if host is responsive

    localhost is always considered alive. Otherwise "echo 'ALIVE'" is
    executed once on the host and the answer is checked.

    :returns: True if the host answered as expected; False otherwise,
        after logging the failure through self.error()
    """
    if self.localhost:
        return True

    out = err = ""
    msg = "Unresponsive host. Wrong answer. "

    # The underlying SSH layer will sometimes return an empty
    # output (even if the command was executed without errors), in
    # which case the host is reported as unresponsive.
    try:
        (out, err), proc = self.execute("echo 'ALIVE'",
                blocking = True,
                with_lock = True)

        if out.find("ALIVE") > -1:
            return True
    except Exception:
        # KeyboardInterrupt / SystemExit are deliberately not caught:
        # they must not be reported as an unresponsive host.
        trace = traceback.format_exc()
        msg = "Unresponsive host. Error reaching host: {} ".format(trace)

    self.error(msg, out, err)
    return False
def find_home(self):
    """
    Retrieves host home directory

    Runs "echo ${HOME}" once on the host and stores a non-empty answer
    in self._home_dir.

    :raises RuntimeError: if self._home_dir is still empty afterwards
    """
    # The underlying SSH layer will sometimes return an empty
    # output (even if the command was executed without errors); an
    # empty answer leaves the previously known home directory untouched.
    msg = "Impossible to retrieve HOME directory"
    try:
        (out, err), proc = self.execute("echo ${HOME}",
                blocking = True,
                with_lock = True)

        answer = out.strip()
        if answer != "":
            self._home_dir = answer
    except Exception:
        # KeyboardInterrupt / SystemExit are deliberately not caught
        trace = traceback.format_exc()
        msg = "Impossible to retrieve HOME directory {}".format(trace)

    if not self._home_dir:
        self.error(msg)
        raise RuntimeError(msg)
def filter_existing_files(self, src, dst):
    """ Removes files that already exist in the Linux host from src list

    With several sources, each one is expected at
    ``dst``/<basename of the source>; with a single source, ``dst``
    itself is taken as the destination file.

    :param src: list of local paths
    :param dst: destination path on the host
    :returns: list with the sources whose destination does not exist
    """
    if not src:
        return []

    # construct a dictionary with { dst: src }
    if len(src) > 1:
        dests = { os.path.join(dst, os.path.basename(s)) : s for s in src }
    else:
        dests = { dst: src[0] }

    # Each existing destination is echoed back on a line of its own
    command = ";".join(
        " [ -f {dst} ] && echo '{dst}' ".format(dst = d) for d in dests)

    (out, err), proc = self.execute(command, retry = 1, with_lock = True)

    # Whole lines are compared: a substring search would also drop
    # "/dst/a" when only "/dst/ab" exists on the host.
    existing = set(line.strip() for line in out.splitlines())

    return [s for d, s in dests.items() if d not in existing]