#
# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Author: Alina Quereilhac

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import (ResourceManager, clsinit_copy,
        ResourceState)
from nepi.resources.linux import rpmfuncs, debfuncs
from nepi.util import sshfuncs, execfuncs
from nepi.util.sshfuncs import ProcStatus

import collections
import os
import stat
import random
import re
import tempfile
import time
import threading
import traceback

from six import PY3

# TODO: Unify delays!!
# TODO: Validate outcome of uploads!!


class ExitCode:
    """
    Error codes that the rexitcode function can return if unable to
    check the exit code of a spawned process
    """
    FILENOTFOUND = -1
    CORRUPTFILE = -2
    ERROR = -3
    OK = 0


class OSType:
    """
    Supported flavors of Linux OS
    """
    # Bitmask values: the FEDORA_* variants include the FEDORA bit so that
    # 'os & OSType.FEDORA' matches any Fedora release.
    DEBIAN = 1
    UBUNTU = 1 << 1
    FEDORA = 1 << 2
    FEDORA_8 = 1 << 3 | FEDORA
    FEDORA_12 = 1 << 4 | FEDORA
    FEDORA_14 = 1 << 5 | FEDORA


@clsinit_copy
class LinuxNode(ResourceManager):
    """
    .. class:: Class Args :

        :param ec: The Experiment controller
        :type ec: ExperimentController
        :param guid: guid of the RM
        :type guid: int

    .. note::

        There are different ways in which commands can be executed using the
        LinuxNode interface (i.e. 'execute' - blocking and non blocking,
        'run', 'run_and_wait'). Brief explanation:

        * 'execute' (blocking mode) :

            HOW IT WORKS: 'execute', forks a process and run the command,
            synchronously, attached to the terminal, in foreground.
            The execute method will block until the command returns
            the result on 'out', 'err' (so until it finishes executing).

            USAGE: short-lived commands that must be executed attached to a
            terminal and in foreground, for which it IS necessary to block
            until the command has finished (e.g. if you want to run 'ls' or
            'cat').

        * 'execute' (NON blocking mode - blocking = False) :

            HOW IT WORKS: Same as before, except that execute method will
            return immediately (even if command still running).

            USAGE: long-lived commands that must be executed attached to a
            terminal and in foreground, but for which it is not necessary to
            block until the command has finished. (e.g. start an application
            using X11 forwarding)

        * 'run' :

            HOW IT WORKS: Connects to the host ( using SSH if remote)
            and launches the command in background, detached from any
            terminal (daemonized), and returns. The command continues to
            run remotely, but since it is detached from the terminal,
            its pipes (stdin, stdout, stderr) can't be redirected to the
            console (as normal non detached processes would), and so they
            are explicitly redirected to files. The pidfile is created as
            part of the process of launching the command. The pidfile
            holds the pid and ppid of the process forked in background,
            so later on it is possible to check whether the command is still
            running.

            USAGE: long-lived commands that can run detached in background,
            for which it is NOT necessary to block (wait) until the command
            has finished. (e.g. start an application that is not using X11
            forwarding. It can run detached and remotely in background)

        * 'run_and_wait' :

            HOW IT WORKS: Similar to 'run' except that it 'blocks' until
            the command has finished execution. It also checks whether
            errors occurred during runtime by reading the exitcode file,
            which contains the exit code of the command that was run
            (checking stderr only is not always reliable since many
            commands throw debugging info to stderr and the only way to
            automatically know whether an error really happened is to
            check the process exit code).

            Another difference with respect to 'run', is that instead of
            directly executing the command as a bash command line, it
            uploads the command to a bash script and runs the script. This
            allows to use the bash script to debug errors, since it remains
            at the remote host and can be run manually to reproduce the
            error.

            USAGE: medium-lived commands that can run detached in
            background, for which it IS necessary to block (wait) until the
            command has finished. (e.g. Package installation, source
            compilation, file download, etc)
    """
    _rtype = "linux::Node"
    _help = "Controls Linux host machines ( either localhost or a host " \
            "that can be accessed using a SSH key)"
    _platform = "linux"

    @classmethod
    def _register_attributes(cls):
        """ Declares all user-settable attributes of the Linux node RM. """
        cls._register_attribute(
            Attribute("hostname", "Hostname of the machine",
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("username", "Local account username",
                      flags = Flags.Credential))

        cls._register_attribute(
            Attribute("port", "SSH port",
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("home",
                      "Experiment home directory to store all experiment related files",
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("identity", "SSH identity file",
                      flags = Flags.Credential))

        cls._register_attribute(
            Attribute("serverKey", "Server public key",
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("cleanHome",
                      "Remove all nepi files and directories "
                      " from node home folder before starting experiment",
                      type = Types.Bool,
                      default = False,
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("cleanExperiment",
                      "Remove all files and directories "
                      " from a previous same experiment, before the new experiment starts",
                      type = Types.Bool,
                      default = False,
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("cleanProcesses",
                      "Kill all running processes before starting experiment",
                      type = Types.Bool,
                      default = False,
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("cleanProcessesAfter",
                      "Kill all running processes after starting experiment"
                      "NOTE: This might be dangerous when using user root",
                      type = Types.Bool,
                      default = True,
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("tearDown",
                      "Bash script to be executed before releasing the resource",
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("gatewayUser",
                      "Gateway account username",
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("gateway",
                      "Hostname of the gateway machine",
                      flags = Flags.Design))

        cls._register_attribute(
            Attribute("ip",
                      "Linux host public IP address. "
                      "Must not be modified by the user unless hostname is 'localhost'",
                      flags = Flags.Design))

    def __init__(self, ec, guid):
        super(LinuxNode, self).__init__(ec, guid)
        self._os = None
        # home directory at Linux host
        self._home_dir = ""

        # lock to prevent concurrent applications on the same node,
        # to execute commands at the same time. There are potential
        # concurrency issues when using SSH to a same host from
        # multiple threads. There are also possible operational
        # issues, e.g. an application querying the existence
        # of a file or folder prior to its creation, and another
        # application creating the same file or folder in between.
        self._node_lock = threading.Lock()

    def log_message(self, msg):
        """ Prefixes log messages with the RM guid and hostname. """
        return " guid {} - host {} - {} "\
            .format(self.guid, self.get("hostname"), msg)

    @property
    def home_dir(self):
        # 'home' may be absolute; otherwise it is relative to the
        # host account's HOME directory discovered by find_home().
        home = self.get("home") or ""
        if not home.startswith("/"):
            home = os.path.join(self._home_dir, home)
        return home

    @property
    def nepi_home(self):
        return os.path.join(self.home_dir, ".nepi")

    @property
    def usr_dir(self):
        return os.path.join(self.nepi_home, "nepi-usr")

    @property
    def lib_dir(self):
        return os.path.join(self.usr_dir, "lib")

    @property
    def bin_dir(self):
        return os.path.join(self.usr_dir, "bin")

    @property
    def src_dir(self):
        return os.path.join(self.usr_dir, "src")

    @property
    def share_dir(self):
        return os.path.join(self.usr_dir, "share")

    @property
    def exp_dir(self):
        return os.path.join(self.nepi_home, "nepi-exp")

    @property
    def exp_home(self):
        return os.path.join(self.exp_dir, self.ec.exp_id)

    @property
    def node_home(self):
        return os.path.join(self.exp_home, "node-{}".format(self.guid))

    @property
    def run_home(self):
        return os.path.join(self.node_home, self.ec.run_id)

    @property
    def os(self):
        """ Lazily detects and caches the host OS flavor (an OSType value).

        Raises RuntimeError when the OS can not be determined or is not
        one of the supported flavors.
        """
        if self._os:
            return self._os

        if not self.localhost and not self.get("username"):
            msg = "Can't resolve OS, insufficient data "
            self.error(msg)
            raise RuntimeError(msg)

        out = self.get_os()

        if out.find("Debian") == 0:
            self._os = OSType.DEBIAN
        elif out.find("Ubuntu") == 0:
            self._os = OSType.UBUNTU
        elif out.find("Fedora release") == 0:
            self._os = OSType.FEDORA
            if out.find("Fedora release 8") == 0:
                self._os = OSType.FEDORA_8
            elif out.find("Fedora release 12") == 0:
                self._os = OSType.FEDORA_12
            elif out.find("Fedora release 14") == 0:
                self._os = OSType.FEDORA_14
        else:
            msg = "Unsupported OS"
            self.error(msg, out)
            raise RuntimeError("{} - {} ".format(msg, out))

        return self._os

    def get_os(self):
        """ Reads /etc/issue on the host and returns its content
        (empty string on failure). """
        # The underlying SSH layer will sometimes return an empty
        # output (even if the command was executed without errors).
        # To work arround this, repeat the operation N times or
        # until the result is not empty string
        out = ""
        # BUGFIX: 'err' must be initialized; if execute() raises before
        # assignment the error handler below would hit a NameError.
        err = ""
        try:
            (out, err), proc = self.execute("cat /etc/issue",
                                            with_lock = True,
                                            blocking = True)
        except:
            trace = traceback.format_exc()
            msg = "Error detecting OS: {} ".format(trace)
            self.error(msg, out, err)

        return out

    @property
    def use_deb(self):
        # non-zero (truthy) when the OS is Debian or Ubuntu
        return (self.os & (OSType.DEBIAN | OSType.UBUNTU))

    @property
    def use_rpm(self):
        # non-zero (truthy) for any Fedora release (see OSType bitmask)
        return (self.os & OSType.FEDORA)

    @property
    def localhost(self):
        return self.get("hostname") in ['localhost', '127.0.0.1', '::1']

    def do_provision(self):
        """ Verifies the host is reachable, optionally cleans it up, and
        creates the NEPI directory structure on it. """
        # check if host is alive
        if not self.is_alive():
            trace = traceback.format_exc()
            msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(
                self.get("hostname"), trace)
            self.error(msg)
            raise RuntimeError(msg)

        self.find_home()

        if self.get("cleanProcesses"):
            self.clean_processes()

        if self.get("cleanHome"):
            self.clean_home()

        if self.get("cleanExperiment"):
            self.clean_experiment()

        # Create shared directory structure and node home directory
        paths = [self.lib_dir,
                 self.bin_dir,
                 self.src_dir,
                 self.share_dir,
                 self.node_home]

        self.mkdir(paths)

        # Get Public IP address if possible
        if not self.get("ip"):
            try:
                ip = sshfuncs.gethostbyname(self.get("hostname"))
                self.set("ip", ip)
            except:
                # When a gateway is in use, local DNS resolution is
                # expected to fail; only log when there is no gateway.
                if self.get("gateway") is None:
                    msg = "Local DNS can not resolve hostname {}".format(
                        self.get("hostname"))
                    self.error(msg)

        super(LinuxNode, self).do_provision()

    def do_deploy(self):
        if self.state == ResourceState.NEW:
            self.info("Deploying node")
            self.do_discover()
            self.do_provision()

        # Node needs to wait until all associated interfaces are
        # ready before it can finalize deployment
        from nepi.resources.linux.interface import LinuxInterface
        ifaces = self.get_connected(LinuxInterface.get_rtype())
        for iface in ifaces:
            if iface.state < ResourceState.READY:
                self.ec.schedule(self.reschedule_delay, self.deploy)
                return

        super(LinuxNode, self).do_deploy()

    def do_release(self):
        rms = self.get_connected()
        for rm in rms:
            # Node needs to wait until all associated RMs are released
            # before it can be released
            if rm.state != ResourceState.RELEASED:
                self.ec.schedule(self.reschedule_delay, self.release)
                return

        tear_down = self.get("tearDown")
        if tear_down:
            self.execute(tear_down)

        if self.get("cleanProcessesAfter"):
            self.clean_processes()

        super(LinuxNode, self).do_release()

    def valid_connection(self, guid):
        # TODO: Validate!
        return True

    def clean_processes(self):
        """ Kills stray processes on the host.

        For non-root users this simply kills everything owned by the user.
        For root, a best-effort computation is made to avoid killing the
        SSH session tree that NEPI itself (or a concurrent user) is using.
        """
        self.info("Cleaning up processes")

        if self.localhost:
            return

        if self.get("username") != 'root':
            cmd = ("sudo -S killall tcpdump || /bin/true ; " +
                   "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
                   "sudo -S killall -u {} || /bin/true ; ".format(self.get("username")))
        else:
            if self.state >= ResourceState.READY:
                ########################
                # Collect all process (must change for a more intelligent way)
                ppid = []
                pids = []
                avoid_pids = "ps axjf | awk '{print $1,$2}'"
                (out, err), proc = self.execute(avoid_pids)

                if len(out) != 0:
                    for line in out.strip().split("\n"):
                        parts = line.strip().split(" ")
                        ppid.append(parts[0])
                        pids.append(parts[1])

                # Collect all process below ssh -D
                tree_owner = 0
                ssh_pids = []
                sshs = "ps aux | grep 'sshd' | awk '{print $2,$12}'"
                (out, err), proc = self.execute(sshs)

                if len(out) != 0:
                    for line in out.strip().split("\n"):
                        parts = line.strip().split(" ")
                        if parts[1].startswith('root@pts'):
                            ssh_pids.append(parts[0])
                        elif parts[1] == "-D":
                            tree_owner = parts[0]

                avoid_kill = []
                # Search for the child process of the pid's collected at the
                # first block.
                # BUGFIX: accumulate the families of ALL ssh sessions
                # (previously each iteration overwrote avoid_kill).
                for process in ssh_pids:
                    family = self.search_for_child(process, pids, ppid)
                    avoid_kill = list(set(avoid_kill) | set(family))

                if len(avoid_kill) > 0:
                    avoid_kill.append(tree_owner)
                ########################

                # NOTE(review): pickle.load on a shared /tmp file is unsafe
                # if the file can be written by other users -- consider a
                # safer serialization/location.
                import pickle
                with open("/tmp/save.proc", "rb") as pickle_file:
                    pids = pickle.load(pickle_file)

                pids_temp = dict()
                ps_aux = "ps aux | awk '{print $2,$11}'"
                (out, err), proc = self.execute(ps_aux)

                if len(out) != 0:
                    for line in out.strip().split("\n"):
                        parts = line.strip().split(" ")
                        pids_temp[parts[0]] = parts[1]

                    # creates the difference between the machine pids freezed
                    # (pickle) and the actual, adding the avoided pids filtered
                    # above (avoid_kill) to allow users keep process alive when
                    # using besides ssh connections
                    # BUGFIX: the set difference yields (pid, command) tuples;
                    # joining them directly raised TypeError. Keep only pids.
                    new_procs = set(pids_temp.items()) - set(pids.items())
                    kill_pids = [pid for pid, _cmd in new_procs]

                    # removing pids from beside connections and its process
                    kill_pids = list(set(kill_pids) - set(avoid_kill))
                    kill_pids = ' '.join(kill_pids)

                    cmd = ("killall tcpdump || /bin/true ; " +
                           "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " +
                           "kill {} || /bin/true ; ".format(kill_pids))
                else:
                    cmd = ("killall tcpdump || /bin/true ; " +
                           "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")
            else:
                cmd = ("killall tcpdump || /bin/true ; " +
                       "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ")

        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)

    def search_for_child(self, pid, pids, ppid, family=None):
        """
        Recursive function to search for child. List A contains the pids
        and list B the parents (ppid)
        """
        family = family if family is not None else []
        family.append(pid)
        for key, value in enumerate(ppid):
            if value == pid:
                child = pids[key]
                # BUGFIX: pass the shared accumulator down, otherwise
                # grandchildren were computed into a discarded list.
                self.search_for_child(child, pids, ppid, family)
        return family

    def clean_home(self):
        """ Cleans all NEPI related folders in the Linux host """
        self.info("Cleaning up home")

        cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\
            .format(self.home_dir)

        return self.execute(cmd, with_lock = True)

    def clean_experiment(self):
        """ Cleans all experiment related files in the Linux host.
        It preserves NEPI files and folders that have a
        multi experiment scope.
        """
        self.info("Cleaning up experiment files")

        cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\
            .format(self.exp_dir, self.ec.exp_id)

        return self.execute(cmd, with_lock = True)

    def execute(self, command,
                sudo = False,
                env = None,
                tty = False,
                forward_x11 = False,
                retry = 3,
                connect_timeout = 30,
                strict_host_checking = False,
                persistent = True,
                blocking = True,
                with_lock = False):
        """ Executes a command on the host (locally or over SSH).

        Notice that this invocation will block until the
        execution finishes. If this is not the desired behavior,
        use 'run' instead.
        """
        if self.localhost:
            (out, err), proc = execfuncs.lexec(
                command,
                user = self.get("username"),  # still problem with localhost
                sudo = sudo,
                env = env)
        else:
            # Single remote-exec closure shared by both the locked and the
            # unlocked paths (the original duplicated this call verbatim).
            def _remote_exec():
                return sshfuncs.rexec(
                    command,
                    host = self.get("hostname"),
                    user = self.get("username"),
                    port = self.get("port"),
                    gwuser = self.get("gatewayUser"),
                    gw = self.get("gateway"),
                    agent = True,
                    sudo = sudo,
                    identity = self.get("identity"),
                    server_key = self.get("serverKey"),
                    env = env,
                    tty = tty,
                    forward_x11 = forward_x11,
                    retry = retry,
                    connect_timeout = connect_timeout,
                    persistent = persistent,
                    blocking = blocking,
                    strict_host_checking = strict_host_checking)

            if with_lock:
                # If the execute command is blocking, we don't want to keep
                # the node lock. This lock is used to avoid race conditions
                # when creating the ControlMaster sockets. A more elegant
                # solution is needed.
                with self._node_lock:
                    (out, err), proc = _remote_exec()
            else:
                (out, err), proc = _remote_exec()

        return (out, err), proc

    def run(self, command, home,
            create_home = False,
            pidfile = 'pidfile',
            stdin = None,
            stdout = 'stdout',
            stderr = 'stderr',
            sudo = False,
            tty = False,
            strict_host_checking = False):
        """ Launches the command daemonized in background on the host
        and returns immediately. Output is redirected to files under 'home'.
        """
        self.debug("Running command '{}'".format(command))

        if self.localhost:
            (out, err), proc = execfuncs.lspawn(
                command, pidfile,
                home = home,
                create_home = create_home,
                stdin = stdin or '/dev/null',
                stdout = stdout or '/dev/null',
                stderr = stderr or '/dev/null',
                sudo = sudo)
        else:
            with self._node_lock:
                (out, err), proc = sshfuncs.rspawn(
                    command,
                    pidfile = pidfile,
                    home = home,
                    create_home = create_home,
                    stdin = stdin or '/dev/null',
                    stdout = stdout or '/dev/null',
                    stderr = stderr or '/dev/null',
                    sudo = sudo,
                    host = self.get("hostname"),
                    user = self.get("username"),
                    port = self.get("port"),
                    gwuser = self.get("gatewayUser"),
                    gw = self.get("gateway"),
                    agent = True,
                    identity = self.get("identity"),
                    server_key = self.get("serverKey"),
                    tty = tty,
                    strict_host_checking = strict_host_checking)

        return (out, err), proc

    def getpid(self, home, pidfile = "pidfile"):
        """ Reads the (pid, ppid) tuple stored in 'home'/'pidfile'. """
        if self.localhost:
            pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile))
        else:
            with self._node_lock:
                pidtuple = sshfuncs.rgetpid(
                    os.path.join(home, pidfile),
                    host = self.get("hostname"),
                    user = self.get("username"),
                    port = self.get("port"),
                    gwuser = self.get("gatewayUser"),
                    gw = self.get("gateway"),
                    agent = True,
                    identity = self.get("identity"),
                    server_key = self.get("serverKey"),
                    strict_host_checking = False)

        return pidtuple

    def status(self, pid, ppid):
        """ Returns the ProcStatus of the remote/local process. """
        if self.localhost:
            status = execfuncs.lstatus(pid, ppid)
        else:
            with self._node_lock:
                status = sshfuncs.rstatus(
                    pid, ppid,
                    host = self.get("hostname"),
                    user = self.get("username"),
                    port = self.get("port"),
                    gwuser = self.get("gatewayUser"),
                    gw = self.get("gateway"),
                    agent = True,
                    identity = self.get("identity"),
                    server_key = self.get("serverKey"),
                    strict_host_checking = False)

        return status

    def kill(self, pid, ppid, sudo = False):
        """ Kills the process (pid, ppid) if it is still running. """
        out = err = ""
        proc = None
        status = self.status(pid, ppid)

        if status == sshfuncs.ProcStatus.RUNNING:
            if self.localhost:
                (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
            else:
                with self._node_lock:
                    (out, err), proc = sshfuncs.rkill(
                        pid, ppid,
                        host = self.get("hostname"),
                        user = self.get("username"),
                        port = self.get("port"),
                        gwuser = self.get("gatewayUser"),
                        gw = self.get("gateway"),
                        agent = True,
                        sudo = sudo,
                        identity = self.get("identity"),
                        server_key = self.get("serverKey"),
                        strict_host_checking = False)

        return (out, err), proc

    def copy(self, src, dst):
        """ Recursively copies src to dst (scp for remote hosts). """
        if self.localhost:
            (out, err), proc = execfuncs.lcopy(
                src, dst,
                recursive = True)
        else:
            with self._node_lock:
                (out, err), proc = sshfuncs.rcopy(
                    src, dst,
                    port = self.get("port"),
                    gwuser = self.get("gatewayUser"),
                    gw = self.get("gateway"),
                    identity = self.get("identity"),
                    server_key = self.get("serverKey"),
                    recursive = True,
                    strict_host_checking = False)

        return (out, err), proc

    def upload(self, src, dst, text = False, overwrite = True,
               raise_on_error = True,
               executable = False):
        """ Copy content to destination

        src string with the content to copy. Can be:
            - plain text
            - a string with the path to a local file
            - a string with a semi-colon separeted list of local files
            - a string with a local directory

        dst string with destination path on the remote host (remote is
            always self.host)

        when src is text input, it gets stored into a temp file before
        uploading; in this case, and if executable is True, said temp file
        is made executable, and thus uploaded file will be too
        """
        # If source is a string input
        f = None
        if text and not os.path.isfile(src):
            # src is text input that should be uploaded as file
            # create a temporal file with the content to upload
            # in python3 we need to open in binary mode if str is bytes
            mode = 'w' if isinstance(src, str) else 'wb'
            f = tempfile.NamedTemporaryFile(mode = mode, delete = False)
            f.write(src)
            f.close()
            if executable:
                # do something like chmod u+x
                mode = os.stat(f.name).st_mode
                mode |= stat.S_IXUSR
                os.chmod(f.name, mode)
            src = f.name

        # If dst files should not be overwritten, check that the files do not
        # exist already
        if isinstance(src, str):
            src = [s.strip() for s in src.split(";")]

        if overwrite == False:
            src = self.filter_existing_files(src, dst)
            if not src:
                return ("", ""), None

        if not self.localhost:
            # Build destination as @:
            dst = "{}@{}:{}".format(self.get("username"),
                                    self.get("hostname"), dst)

        ((out, err), proc) = self.copy(src, dst)

        # clean up temp file
        if f:
            os.remove(f.name)

        if err:
            msg = " Failed to upload files - src: {} dst: {}".format(
                ";".join(src), dst)
            self.error(msg, out, err)

            msg = "{} out: {} err: {}".format(msg, out, err)
            if raise_on_error:
                raise RuntimeError(msg)

        return ((out, err), proc)

    def download(self, src, dst, raise_on_error = True):
        """ Downloads src from the host into local dst. """
        if not self.localhost:
            # Build destination as @:
            src = "{}@{}:{}".format(self.get("username"),
                                    self.get("hostname"), src)

        ((out, err), proc) = self.copy(src, dst)

        if err:
            # BUGFIX: src is a string here; the original ';'.join(src)
            # interleaved ';' between every character of the path.
            msg = " Failed to download files - src: {} dst: {}".format(src, dst)
            self.error(msg, out, err)

            if raise_on_error:
                raise RuntimeError(msg)

        return ((out, err), proc)

    def install_packages_command(self, packages):
        """ Returns the shell command to install 'packages' for the
        host's package manager (rpm or deb). """
        command = ""
        if self.use_rpm:
            command = rpmfuncs.install_packages_command(self.os, packages)
        elif self.use_deb:
            command = debfuncs.install_packages_command(self.os, packages)
        else:
            msg = "Error installing packages ( OS not known ) "
            self.error(msg, self.os)
            raise RuntimeError(msg)

        return command

    def install_packages(self, packages, home, run_home = None,
                         raise_on_error = True):
        """ Install packages in the Linux host.

        'home' is the directory to upload the package installation script.
        'run_home' is the directory from where to execute the script.
        """
        command = self.install_packages_command(packages)

        run_home = run_home or home

        (out, err), proc = self.run_and_wait(command, run_home,
                                             shfile = os.path.join(home, "instpkg.sh"),
                                             pidfile = "instpkg_pidfile",
                                             ecodefile = "instpkg_exitcode",
                                             stdout = "instpkg_stdout",
                                             stderr = "instpkg_stderr",
                                             overwrite = False,
                                             raise_on_error = raise_on_error)

        return (out, err), proc

    def remove_packages(self, packages, home, run_home = None,
                        raise_on_error = True):
        """ Uninstall packages from the Linux host.

        'home' is the directory to upload the package un-installation script.
        'run_home' is the directory from where to execute the script.
        """
        if self.use_rpm:
            command = rpmfuncs.remove_packages_command(self.os, packages)
        elif self.use_deb:
            command = debfuncs.remove_packages_command(self.os, packages)
        else:
            msg = "Error removing packages ( OS not known ) "
            self.error(msg)
            raise RuntimeError(msg)

        run_home = run_home or home

        (out, err), proc = self.run_and_wait(command, run_home,
                                             shfile = os.path.join(home, "rmpkg.sh"),
                                             pidfile = "rmpkg_pidfile",
                                             ecodefile = "rmpkg_exitcode",
                                             stdout = "rmpkg_stdout",
                                             stderr = "rmpkg_stderr",
                                             overwrite = False,
                                             raise_on_error = raise_on_error)

        return (out, err), proc

    def mkdir(self, paths, clean = False):
        """ Paths is either a single remote directory path to create,
        or a list of directories to create.
        """
        if clean:
            self.rmdir(paths)

        if isinstance(paths, str):
            paths = [paths]

        cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths])

        return self.execute(cmd, with_lock = True)

    def rmdir(self, paths):
        """ Paths is either a single remote directory path to delete,
        or a list of directories to delete.
        """
        if isinstance(paths, str):
            paths = [paths]

        cmd = " ; ".join(["rm -rf {}".format(path) for path in paths])

        return self.execute(cmd, with_lock = True)

    def run_and_wait(self, command, home,
                     shfile = "cmd.sh",
                     env = None,
                     overwrite = True,
                     wait_run = True,
                     pidfile = "pidfile",
                     ecodefile = "exitcode",
                     stdin = None,
                     stdout = "stdout",
                     stderr = "stderr",
                     sudo = False,
                     tty = False,
                     raise_on_error = True):
        """
        Uploads the 'command' to a bash script in the host.
        Then runs the script detached in background in the host, and
        busy-waites until the script finishes executing.
        """
        if not shfile.startswith("/"):
            shfile = os.path.join(home, shfile)

        self.upload_command(command,
                            shfile = shfile,
                            ecodefile = ecodefile,
                            env = env,
                            overwrite = overwrite)

        command = "bash {}".format(shfile)
        # run command in background in remote host
        (out, err), proc = self.run(command, home,
                                    pidfile = pidfile,
                                    stdin = stdin,
                                    stdout = stdout,
                                    stderr = stderr,
                                    sudo = sudo,
                                    tty = tty)

        # check no errors occurred
        if proc.poll():
            msg = " Failed to run command '{}' ".format(command)
            self.error(msg, out, err)
            if raise_on_error:
                raise RuntimeError(msg)

        # Wait for pid file to be generated
        pid, ppid = self.wait_pid(
            home = home,
            pidfile = pidfile,
            raise_on_error = raise_on_error)

        if wait_run:
            # wait until command finishes to execute
            self.wait_run(pid, ppid)

            (eout, err), proc = self.check_errors(home,
                                                  ecodefile = ecodefile,
                                                  stderr = stderr)

            # Out is what was written in the stderr file
            if err:
                msg = " Failed to run command '{}' ".format(command)
                self.error(msg, eout, err)

                if raise_on_error:
                    raise RuntimeError(msg)

        (out, oerr), proc = self.check_output(home, stdout)

        return (out, err), proc

    def exitcode(self, home, ecodefile = "exitcode"):
        """
        Get the exit code of an application.
        Returns an integer value with the exit code
        """
        (out, err), proc = self.check_output(home, ecodefile)

        # Succeeded to open file, return exit code in the file
        if proc.wait() == 0:
            try:
                return int(out.strip())
            except:
                # Error in the content of the file!
                return ExitCode.CORRUPTFILE

        # No such file or directory
        if proc.returncode == 1:
            return ExitCode.FILENOTFOUND

        # Other error from 'cat'
        return ExitCode.ERROR

    def upload_command(self, command,
                       shfile = "cmd.sh",
                       ecodefile = "exitcode",
                       overwrite = True,
                       env = None):
        """ Saves the command as a bash script file in the remote host, and
        forces to save the exit code of the command execution to the ecodefile
        """
        if not (command.strip().endswith(";") or command.strip().endswith("&")):
            command += ";"

        # The exit code of the command will be stored in ecodefile
        command = " {{ {command} }} ; echo $? > {ecodefile} ;"\
            .format(command = command, ecodefile = ecodefile)

        # Export environment
        environ = self.format_environment(env)

        # Add environ to command
        command = environ + command

        return self.upload(command, shfile, text = True, overwrite = overwrite)

    def format_environment(self, env, inline = False):
        """ Formats the environment variables for a command to be executed
        either as an inline command
        (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or
        as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
        """
        if not env:
            return ""

        # Remove extra white spaces
        env = re.sub(r'\s+', ' ', env.strip())

        sep = ";" if inline else "\n"
        return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep

    def check_errors(self, home,
                     ecodefile = "exitcode",
                     stderr = "stderr"):
        """ Checks whether errors occurred while running a command.
        It first checks the exit code for the command, and only if the
        exit code is an error one it returns the error output.
        """
        proc = None
        err = ""

        # get exit code saved in the 'exitcode' file
        ecode = self.exitcode(home, ecodefile)

        if ecode in [ExitCode.CORRUPTFILE, ExitCode.ERROR]:
            err = "Error retrieving exit code status from file {}/{}".format(
                home, ecodefile)
        elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
            # The process returned an error code or didn't exist.
            # Check standard error.
            (err, eerr), proc = self.check_output(home, stderr)

            # If the stderr file was not found, assume nothing bad happened,
            # and just ignore the error.
            # (cat returns 1 for error "No such file or directory")
            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1:
                err = ""

        return ("", err), proc

    def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
        """ Waits until the pid file for the command is generated,
        and returns the pid and ppid of the process """
        pid = ppid = None
        delay = 1.0

        for i in range(2):
            pidtuple = self.getpid(home = home, pidfile = pidfile)

            if pidtuple:
                pid, ppid = pidtuple
                break
            else:
                time.sleep(delay)
                delay = delay * 1.5
        else:
            msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile)
            self.error(msg)

            if raise_on_error:
                raise RuntimeError(msg)

        return pid, ppid

    def wait_run(self, pid, ppid, trial = 0):
        """ wait for a remote process to finish execution """
        delay = 1.0

        while True:
            status = self.status(pid, ppid)

            if status is ProcStatus.FINISHED:
                break
            elif status is not ProcStatus.RUNNING:
                delay = delay * 1.5
                time.sleep(delay)
                # If it takes more than 20 seconds to start, then
                # asume something went wrong
                if delay > 20:
                    break
            else:
                # The app is running, just wait...
                time.sleep(0.5)

    def check_output(self, home, filename):
        """ Retrives content of file """
        (out, err), proc = self.execute(
            "cat {}".format(os.path.join(home, filename)),
            retry = 1,
            with_lock = True)
        return (out, err), proc

    def is_alive(self):
        """ Checks if host is responsive """
        if self.localhost:
            return True

        out = err = ""
        msg = "Unresponsive host. Wrong answer. "

        # The underlying SSH layer will sometimes return an empty
        # output (even if the command was executed without errors).
        # To work arround this, repeat the operation N times or
        # until the result is not empty string
        try:
            (out, err), proc = self.execute("echo 'ALIVE'",
                                            blocking = True,
                                            with_lock = True)
            if out.find("ALIVE") > -1:
                return True
        except:
            trace = traceback.format_exc()
            msg = "Unresponsive host. Error reaching host: {} ".format(trace)

        self.error(msg, out, err)
        return False

    def find_home(self):
        """ Retrieves host home directory """
        # The underlying SSH layer will sometimes return an empty
        # output (even if the command was executed without errors).
        # To work arround this, repeat the operation N times or
        # until the result is not empty string
        msg = "Impossible to retrieve HOME directory"
        try:
            (out, err), proc = self.execute("echo ${HOME}",
                                            blocking = True,
                                            with_lock = True)
            if out.strip() != "":
                self._home_dir = out.strip()
        except:
            trace = traceback.format_exc()
            msg = "Impossible to retrieve HOME directory {}".format(trace)

        if not self._home_dir:
            self.error(msg)
            raise RuntimeError(msg)

    def filter_existing_files(self, src, dst):
        """ Removes files that already exist in the Linux host from src list
        """
        # construct a dictionary with { dst: src }
        dests = {os.path.join(dst, os.path.basename(s)): s for s in src} \
            if len(src) > 1 else {dst: src[0]}

        command = []
        for d in dests:
            command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst = d))

        command = ";".join(command)

        (out, err), proc = self.execute(command, retry = 1, with_lock = True)

        # avoid RuntimeError that would result from
        # changing loop subject during iteration
        keys = list(dests.keys())
        for d in keys:
            if out.find(d) > -1:
                del dests[d]

        if not dests:
            return []

        retcod = dests.values()
        if PY3:
            retcod = list(retcod)
        return retcod