X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=0896ed0447cb46e65a50bdde44cfb01ac449d732;hb=eb1ef58fe34bd9b72308e29e4cfceb42c634f040;hp=a96267f1e5192432420eab79ddb9a53c39d61e0a;hpb=25a46fdfb9753474f35fa2dc677182f8500c52a9;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index a96267f1..0896ed04 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -17,8 +16,9 @@ # # Author: Alina Quereilhac -from nepi.execution.attribute import Attribute, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import (ResourceManager, clsinit_copy, + ResourceState) from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -30,14 +30,11 @@ import re import tempfile import time import threading +import traceback -# TODO: Verify files and dirs exists already -# TODO: Blacklist nodes! # TODO: Unify delays!! # TODO: Validate outcome of uploads!! -reschedule_delay = "0.5s" - class ExitCode: """ Error codes that the rexitcode function can return if unable to @@ -52,13 +49,14 @@ class OSType: """ Supported flavors of Linux OS """ - FEDORA_12 = "f12" - FEDORA_14 = "f14" - FEDORA = "fedora" - UBUNTU = "ubuntu" - DEBIAN = "debian" - -@clsinit + DEBIAN = 1 + UBUNTU = 1 << 1 + FEDORA = 1 << 2 + FEDORA_8 = 1 << 3 | FEDORA + FEDORA_12 = 1 << 4 | FEDORA + FEDORA_14 = 1 << 5 | FEDORA + +@clsinit_copy class LinuxNode(ResourceManager): """ .. class:: Class Args : @@ -142,201 +140,571 @@ class LinuxNode(ResourceManager): source compilation, file download, etc) """ - _rtype = "LinuxNode" + _rtype = "linux::Node" + _help = "Controls Linux host machines ( either localhost or a host " \ + "that can be accessed using a SSH key)" + _platform = "linux" @classmethod def _register_attributes(cls): - hostname = Attribute("hostname", "Hostname of the machine", - flags = Flags.ExecReadOnly) + cls._register_attribute(Attribute( + "hostname", "Hostname of the machine", + flags = Flags.Design)) - username = Attribute("username", "Local account username", - flags = Flags.Credential) + cls._register_attribute(Attribute( + "username", "Local account username", + flags = Flags.Credential)) - port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly) + cls._register_attribute(Attribute( + "port", "SSH port", + flags = Flags.Design)) - home = Attribute("home", - "Experiment home directory to store all experiment related files", - flags = Flags.ExecReadOnly) + cls._register_attribute(Attribute( + "home", + "Experiment home directory to store all experiment related files", + flags = Flags.Design)) - identity = Attribute("identity", "SSH identity file", - flags = Flags.Credential) + cls._register_attribute(Attribute( + "identity", "SSH identity file", + flags = Flags.Credential)) - server_key = Attribute("serverKey", "Server public key", - flags = Flags.ExecReadOnly) + cls._register_attribute(Attribute( + "serverKey", "Server public key", + flags = Flags.Design)) - clean_home = Attribute("cleanHome", "Remove all files and directories " + \ - " from home folder before starting experiment", - flags = Flags.ExecReadOnly) + cls._register_attribute(Attribute( + "cleanHome", + "Remove all nepi files and directories " + " from node home folder before starting experiment", + type = Types.Bool, + default = False, + flags = Flags.Design)) + + cls._register_attribute(Attribute( + "cleanExperiment", "Remove all files and directories " + " from a previous same experiment, before the new experiment starts", + type = Types.Bool, + default = False, + flags = Flags.Design)) - clean_processes = Attribute("cleanProcesses", - "Kill all running processes before starting experiment", - flags = Flags.ExecReadOnly) + cls._register_attribute(Attribute( + "cleanProcesses", + "Kill all running processes before starting experiment", + type = Types.Bool, + default = False, + flags = Flags.Design)) - tear_down = Attribute("tearDown", "Bash script to be executed before " + \ - "releasing the resource", - flags = Flags.ExecReadOnly) - - cls._register_attribute(hostname) - cls._register_attribute(username) - cls._register_attribute(port) - cls._register_attribute(home) - cls._register_attribute(identity) - cls._register_attribute(server_key) - cls._register_attribute(clean_home) - cls._register_attribute(clean_processes) - cls._register_attribute(tear_down) + cls._register_attribute(Attribute( + "cleanProcessesAfter", + """Kill all running processes after starting experiment + This might be dangerous when using user root""", + type = Types.Bool, + default = True, + flags = Flags.Design)) + + cls._register_attribute(Attribute( + "tearDown", + "Bash script to be executed before releasing the resource", + flags = Flags.Design)) + + cls._register_attribute(Attribute( + "gatewayUser", + "Gateway account username", + flags = Flags.Design)) + + cls._register_attribute(Attribute( + "gateway", + "Hostname of the gateway machine", + flags = Flags.Design)) + + cls._register_attribute(Attribute( + "ip", + "Linux host public IP address. " + "Must not be modified by the user unless hostname is 'localhost'", + flags = Flags.Design)) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None + # home directory at Linux host + self._home_dir = "" + + # lock to prevent concurrent applications on the same node, + # to execute commands at the same time. There are potential + # concurrency issues when using SSH to a same host from + # multiple threads. There are also possible operational + # issues, e.g. an application querying the existence + # of a file or folder prior to its creation, and another + # application creating the same file or folder in between. + self._node_lock = threading.Lock() - # lock to avoid concurrency issues on methods used by applications - self._lock = threading.Lock() - def log_message(self, msg): - return " guid %d - host %s - %s " % (self.guid, - self.get("hostname"), msg) + return " guid {} - host {} - {} "\ + .format(self.guid, self.get("hostname"), msg) + + @property + def home_dir(self): + home = self.get("home") or "" + if not home.startswith("/"): + home = os.path.join(self._home_dir, home) + return home + + @property + def nepi_home(self): + return os.path.join(self.home_dir, ".nepi") + + @property + def usr_dir(self): + return os.path.join(self.nepi_home, "nepi-usr") + + @property + def lib_dir(self): + return os.path.join(self.usr_dir, "lib") + + @property + def bin_dir(self): + return os.path.join(self.usr_dir, "bin") + + @property + def src_dir(self): + return os.path.join(self.usr_dir, "src") @property - def home(self): - return self.get("home") or "" + def share_dir(self): + return os.path.join(self.usr_dir, "share") + + @property + def exp_dir(self): + return os.path.join(self.nepi_home, "nepi-exp") @property def exp_home(self): - return os.path.join(self.home, self.ec.exp_id) + return os.path.join(self.exp_dir, self.ec.exp_id) @property def node_home(self): - node_home = "node-%d" % self.guid - return os.path.join(self.exp_home, node_home) + return os.path.join(self.exp_home, "node-{}".format(self.guid)) + + @property + def run_home(self): + return os.path.join(self.node_home, self.ec.run_id) @property def os(self): if self._os: return self._os - if (not self.get("hostname") or not self.get("username")): + if not self.localhost and not self.get("username"): msg = "Can't resolve OS, insufficient data " self.error(msg) raise RuntimeError, msg - (out, err), proc = self.execute("cat /etc/issue", with_lock = True) + out = self.get_os() - if err and proc.poll(): - msg = "Error detecting OS " - self.error(msg, out, err) - raise RuntimeError, "%s - %s - %s" %( msg, out, err ) - - if out.find("Fedora release 12") == 0: - self._os = OSType.FEDORA_12 - elif out.find("Fedora release 14") == 0: - self._os = OSType.FEDORA_14 - elif out.find("Debian") == 0: + if out.find("Debian") == 0: self._os = OSType.DEBIAN elif out.find("Ubuntu") ==0: self._os = OSType.UBUNTU + elif out.find("Fedora release") == 0: + self._os = OSType.FEDORA + if out.find("Fedora release 8") == 0: + self._os = OSType.FEDORA_8 + elif out.find("Fedora release 12") == 0: + self._os = OSType.FEDORA_12 + elif out.find("Fedora release 14") == 0: + self._os = OSType.FEDORA_14 else: msg = "Unsupported OS" self.error(msg, out) - raise RuntimeError, "%s - %s " %( msg, out ) + raise RuntimeError("{} - {} ".format(msg, out)) return self._os + def get_os(self): + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + out = "" + try: + (out, err), proc = self.execute("cat /etc/issue", + with_lock = True, + blocking = True) + except: + trace = traceback.format_exc() + msg = "Error detecting OS: {} ".format(trace) + self.error(msg, out, err) + + return out + + @property + def use_deb(self): + return (self.os & (OSType.DEBIAN|OSType.UBUNTU)) + + @property + def use_rpm(self): + return (self.os & OSType.FEDORA) + @property def localhost(self): - return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] + return self.get("hostname") in ['localhost', '127.0.0.1', '::1'] - def provision(self): + def do_provision(self): + # check if host is alive if not self.is_alive(): - self._state = ResourceState.FAILED - msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") + trace = traceback.format_exc() + msg = "Deploy failed. Unresponsive node {} -- traceback {}".format(self.get("hostname"), trace) self.error(msg) raise RuntimeError, msg + self.find_home() + if self.get("cleanProcesses"): self.clean_processes() if self.get("cleanHome"): self.clean_home() - - self.mkdir(self.node_home) + + if self.get("cleanExperiment"): + self.clean_experiment() + + # Create shared directory structure and node home directory + paths = [self.lib_dir, + self.bin_dir, + self.src_dir, + self.share_dir, + self.node_home] - super(LinuxNode, self).provision() + self.mkdir(paths) - def deploy(self): - if self.state == ResourceState.NEW: + # Get Public IP address if possible + if not self.get("ip"): try: - self.discover() - self.provision() + ip = sshfuncs.gethostbyname(self.get("hostname")) + self.set("ip", ip) except: - self._state = ResourceState.FAILED - raise + if self.get("gateway") is None: + msg = "Local DNS can not resolve hostname {}".format(self.get("hostname")) + self.error(msg) + + super(LinuxNode, self).do_provision() + + def do_deploy(self): + if self.state == ResourceState.NEW: + self.info("Deploying node") + self.do_discover() + self.do_provision() # Node needs to wait until all associated interfaces are # ready before it can finalize deployment from nepi.resources.linux.interface import LinuxInterface - ifaces = self.get_connected(LinuxInterface.rtype()) + ifaces = self.get_connected(LinuxInterface.get_rtype()) for iface in ifaces: if iface.state < ResourceState.READY: - self.ec.schedule(reschedule_delay, self.deploy) + self.ec.schedule(self.reschedule_delay, self.deploy) return - super(LinuxNode, self).deploy() + super(LinuxNode, self).do_deploy() + + def do_release(self): + rms = self.get_connected() + for rm in rms: + # Node needs to wait until all associated RMs are released + # before it can be released + if rm.state != ResourceState.RELEASED: + self.ec.schedule(self.reschedule_delay, self.release) + return - def release(self): tear_down = self.get("tearDown") if tear_down: self.execute(tear_down) - super(LinuxNode, self).release() + if self.get("cleanProcessesAfter"): + self.clean_processes() + + super(LinuxNode, self).do_release() def valid_connection(self, guid): # TODO: Validate! return True - def clean_processes(self, killer = False): + def clean_processes(self): self.info("Cleaning up processes") + + if self.localhost: + return - if killer: - # Hardcore kill - cmd = ("sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; ") - else: - # Be gentler... + if self.get("username") != 'root': cmd = ("sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall -u %s || /bin/true ; " % self.get("username") + - "sudo -S killall -u %s || /bin/true ; " % self.get("username")) + "sudo -S kill -9 $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " + + "sudo -S killall -u {} || /bin/true ; ".format(self.get("username"))) + else: + if self.state >= ResourceState.READY: + import pickle + pids = pickle.load(open("/tmp/save.proc", "rb")) + pids_temp = dict() + ps_aux = "ps aux |awk '{print $2,$11}'" + (out, err), proc = self.execute(ps_aux) + if len(out) != 0: + for line in out.strip().split("\n"): + parts = line.strip().split(" ") + pids_temp[parts[0]] = parts[1] + kill_pids = set(pids_temp.items()) - set(pids.items()) + kill_pids = ' '.join(dict(kill_pids).keys()) + + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; " + + "kill {} || /bin/true ; ".format(kill_pids)) + else: + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ") + else: + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[.]nepi' | awk '{print $2}') || /bin/true ; ") + + (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) - out = err = "" - (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) - def clean_home(self): + """ Cleans all NEPI related folders in the Linux host + """ self.info("Cleaning up home") - cmd = ( - # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" + - "find . -maxdepth 1 -name 'nepi-*' " + - " -execdir rm -rf {} + " - ) - - if self.home: - cmd = "cd %s ; " % self.home + cmd + cmd = "cd {} ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {{}} + "\ + .format(self.home_dir) + + return self.execute(cmd, with_lock = True) + + def clean_experiment(self): + """ Cleans all experiment related files in the Linux host. + It preserves NEPI files and folders that have a multi experiment + scope. + """ + self.info("Cleaning up experiment files") + + cmd = "cd {} ; find . -maxdepth 1 -name '{}' -execdir rm -rf {{}} + "\ + .format(self.exp_dir, self.ec.exp_id) + + return self.execute(cmd, with_lock = True) + def execute(self, command, + sudo = False, + env = None, + tty = False, + forward_x11 = False, + retry = 3, + connect_timeout = 30, + strict_host_checking = False, + persistent = True, + blocking = True, + with_lock = False + ): + """ Notice that this invocation will block until the + execution finishes. If this is not the desired behavior, + use 'run' instead.""" + + if self.localhost: + (out, err), proc = execfuncs.lexec( + command, + user = self.get("username"), # still problem with localhost + sudo = sudo, + env = env) + else: + if with_lock: + # If the execute command is blocking, we don't want to keep + # the node lock. This lock is used to avoid race conditions + # when creating the ControlMaster sockets. A more elegant + # solution is needed. + with self._node_lock: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + retry = retry, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + else: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + retry = retry, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + + return (out, err), proc + + def run(self, command, home, + create_home = False, + pidfile = 'pidfile', + stdin = None, + stdout = 'stdout', + stderr = 'stderr', + sudo = False, + tty = False, + strict_host_checking = False): + + self.debug("Running command '{}'".format(command)) + + if self.localhost: + (out, err), proc = execfuncs.lspawn( + command, pidfile, + home = home, + create_home = create_home, + stdin = stdin or '/dev/null', + stdout = stdout or '/dev/null', + stderr = stderr or '/dev/null', + sudo = sudo) + else: + with self._node_lock: + (out, err), proc = sshfuncs.rspawn( + command, + pidfile = pidfile, + home = home, + create_home = create_home, + stdin = stdin or '/dev/null', + stdout = stdout or '/dev/null', + stderr = stderr or '/dev/null', + sudo = sudo, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey"), + tty = tty, + strict_host_checking = strict_host_checking + ) + + return (out, err), proc + + def getpid(self, home, pidfile = "pidfile"): + if self.localhost: + pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) + else: + with self._node_lock: + pidtuple = sshfuncs.rgetpid( + os.path.join(home, pidfile), + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey"), + strict_host_checking = False + ) + + return pidtuple + + def status(self, pid, ppid): + if self.localhost: + status = execfuncs.lstatus(pid, ppid) + else: + with self._node_lock: + status = sshfuncs.rstatus( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey"), + strict_host_checking = False + ) + + return status + + def kill(self, pid, ppid, sudo = False): out = err = "" - (out, err), proc = self.execute(cmd, with_lock = True) + proc = None + status = self.status(pid, ppid) - def upload(self, src, dst, text = False): + if status == sshfuncs.ProcStatus.RUNNING: + if self.localhost: + (out, err), proc = execfuncs.lkill(pid, ppid, sudo) + else: + with self._node_lock: + (out, err), proc = sshfuncs.rkill( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey"), + strict_host_checking = False + ) + + return (out, err), proc + + def copy(self, src, dst): + if self.localhost: + (out, err), proc = execfuncs.lcopy( + src, dst, + recursive = True) + else: + with self._node_lock: + (out, err), proc = sshfuncs.rcopy( + src, dst, + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + identity = self.get("identity"), + server_key = self.get("serverKey"), + recursive = True, + strict_host_checking = False) + + return (out, err), proc + + def upload(self, src, dst, text = False, overwrite = True, + raise_on_error = True): """ Copy content to destination - src content to copy. Can be a local file, directory or a list of files + src string with the content to copy. Can be: + - plain text + - a string with the path to a local file + - a string with a semi-colon separeted list of local files + - a string with a local directory - dst destination path on the remote host (remote is always self.host) + dst string with destination path on the remote host (remote is + always self.host) - text src is text input, it must be stored into a temp file before uploading + text src is text input, it must be stored into a temp file before + uploading """ # If source is a string input f = None @@ -348,138 +716,214 @@ class LinuxNode(ResourceManager): f.close() src = f.name + # If dst files should not be overwritten, check that the files do not + # exits already + if isinstance(src, str): + src = map(str.strip, src.split(";")) + + if overwrite == False: + src = self.filter_existing_files(src, dst) + if not src: + return ("", ""), None + if not self.localhost: # Build destination as @: - dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) - result = self.copy(src, dst) + dst = "{}@{}:{}".format(self.get("username"), self.get("hostname"), dst) + + ((out, err), proc) = self.copy(src, dst) # clean up temp file if f: os.remove(f.name) - return result + if err: + msg = " Failed to upload files - src: {} dst: {}".format(";".join(src), dst) + self.error(msg, out, err) + + msg = "{} out: {} err: {}".format(msg, out, err) + if raise_on_error: + raise RuntimeError, msg + + return ((out, err), proc) - def download(self, src, dst): + def download(self, src, dst, raise_on_error = True): if not self.localhost: # Build destination as @: - src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) - return self.copy(src, dst) + src = "{}@{}:{}".format(self.get("username"), self.get("hostname"), src) + + ((out, err), proc) = self.copy(src, dst) + + if err: + msg = " Failed to download files - src: {} dst: {}".format(";".join(src), dst) + self.error(msg, out, err) - def install_packages(self, packages, home): + if raise_on_error: + raise RuntimeError, msg + + return ((out, err), proc) + + def install_packages_command(self, packages): command = "" - if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + if self.use_rpm: command = rpmfuncs.install_packages_command(self.os, packages) - elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + elif self.use_deb: command = debfuncs.install_packages_command(self.os, packages) else: msg = "Error installing packages ( OS not known ) " self.error(msg, self.os) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "instpkg.sh", - pidfile = "instpkg_pidfile", - ecodefile = "instpkg_exitcode", - stdout = "instpkg_stdout", - stderr = "instpkg_stderr", - raise_on_error = True) + return command + + def install_packages(self, packages, home, + run_home = None, + raise_on_error = True): + """ Install packages in the Linux host. + + 'home' is the directory to upload the package installation script. + 'run_home' is the directory from where to execute the script. + """ + command = self.install_packages_command(packages) + + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "instpkg.sh"), + pidfile = "instpkg_pidfile", + ecodefile = "instpkg_exitcode", + stdout = "instpkg_stdout", + stderr = "instpkg_stderr", + overwrite = False, + raise_on_error = raise_on_error) return (out, err), proc - def remove_packages(self, packages, home): - command = "" - if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + def remove_packages(self, packages, home, run_home = None, + raise_on_error = True): + """ Uninstall packages from the Linux host. + + 'home' is the directory to upload the package un-installation script. + 'run_home' is the directory from where to execute the script. + """ + if self.use_rpm: command = rpmfuncs.remove_packages_command(self.os, packages) - elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + elif self.use_deb: command = debfuncs.remove_packages_command(self.os, packages) else: msg = "Error removing packages ( OS not known ) " self.error(msg) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "rmpkg.sh", - pidfile = "rmpkg_pidfile", - ecodefile = "rmpkg_exitcode", - stdout = "rmpkg_stdout", - stderr = "rmpkg_stderr", - raise_on_error = True) - + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "rmpkg.sh"), + pidfile = "rmpkg_pidfile", + ecodefile = "rmpkg_exitcode", + stdout = "rmpkg_stdout", + stderr = "rmpkg_stderr", + overwrite = False, + raise_on_error = raise_on_error) + return (out, err), proc - def mkdir(self, path, clean = False): + def mkdir(self, paths, clean = False): + """ Paths is either a single remote directory path to create, + or a list of directories to create. + """ if clean: - self.rmdir(path) + self.rmdir(paths) - return self.execute("mkdir -p %s" % path, with_lock = True) + if isinstance(paths, str): + paths = [paths] - def rmdir(self, path): - return self.execute("rm -rf %s" % path, with_lock = True) - + cmd = " ; ".join(["mkdir -p {}".format(path) for path in paths]) + + return self.execute(cmd, with_lock = True) + + def rmdir(self, paths): + """ Paths is either a single remote directory path to delete, + or a list of directories to delete. + """ + + if isinstance(paths, str): + paths = [paths] + + cmd = " ; ".join(map(lambda path: "rm -rf {}".format(path), paths)) + + return self.execute(cmd, with_lock = True) + def run_and_wait(self, command, home, - shfile = "cmd.sh", - env = None, - pidfile = "pidfile", - ecodefile = "exitcode", - stdin = None, - stdout = "stdout", - stderr = "stderr", - sudo = False, - tty = False, - raise_on_error = False): + shfile="cmd.sh", + env=None, + overwrite=True, + wait_run=True, + pidfile="pidfile", + ecodefile="exitcode", + stdin=None, + stdout="stdout", + stderr="stderr", + sudo=False, + tty=False, + raise_on_error=True): """ Uploads the 'command' to a bash script in the host. Then runs the script detached in background in the host, and busy-waites until the script finishes executing. """ - self.upload_command(command, home, - shfile = shfile, - ecodefile = ecodefile, - env = env) - command = "bash ./%s" % shfile + if not shfile.startswith("/"): + shfile = os.path.join(home, shfile) + + self.upload_command(command, + shfile = shfile, + ecodefile = ecodefile, + env = env, + overwrite = overwrite) + + command = "bash {}".format(shfile) # run command in background in remote host (out, err), proc = self.run(command, home, - pidfile = pidfile, - stdin = stdin, - stdout = stdout, - stderr = stderr, - sudo = sudo, - tty = tty) + pidfile = pidfile, + stdin = stdin, + stdout = stdout, + stderr = stderr, + sudo = sudo, + tty = tty) # check no errors occurred if proc.poll(): - msg = " Failed to run command '%s' " % command + msg = " Failed to run command '{}' ".format(command) self.error(msg, out, err) if raise_on_error: raise RuntimeError, msg # Wait for pid file to be generated pid, ppid = self.wait_pid( - home = home, - pidfile = pidfile, - raise_on_error = raise_on_error) + home = home, + pidfile = pidfile, + raise_on_error = raise_on_error) - # wait until command finishes to execute - self.wait_run(pid, ppid) - - (out, err), proc = self.check_errors(home, - ecodefile = ecodefile, - stdout = stdout, - stderr= stderr) + if wait_run: + # wait until command finishes to execute + self.wait_run(pid, ppid) + + (eout, err), proc = self.check_errors(home, + ecodefile = ecodefile, + stderr = stderr) - # Out is what was written in the stderr file - if err: - msg = " Failed to run command '%s' " % command - self.error(msg, out, err) + # Out is what was written in the stderr file + if err: + msg = " Failed to run command '{}' ".format(command) + self.error(msg, eout, err) - if raise_on_error: - raise RuntimeError, msg + if raise_on_error: + raise RuntimeError, msg + + (out, oerr), proc = self.check_output(home, stdout) return (out, err), proc - + def exitcode(self, home, ecodefile = "exitcode"): """ Get the exit code of an application. @@ -502,22 +946,21 @@ class LinuxNode(ResourceManager): # Other error from 'cat' return ExitCode.ERROR - def upload_command(self, command, home, - shfile = "cmd.sh", - ecodefile = "exitcode", - env = None): + def upload_command(self, command, + shfile="cmd.sh", + ecodefile="exitcode", + overwrite=True, + env=None): """ Saves the command as a bash script file in the remote host, and forces to save the exit code of the command execution to the ecodefile """ if not (command.strip().endswith(";") or command.strip().endswith("&")): command += ";" - + # The exit code of the command will be stored in ecodefile - command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % { - 'command': command, - 'ecodefile': ecodefile, - } + command = " {{ {command} }} ; echo $? > {ecodefile} ;"\ + .format(command=command, ecodefile=ecodefile) # Export environment environ = self.format_environment(env) @@ -525,12 +968,11 @@ class LinuxNode(ResourceManager): # Add environ to command command = environ + command - dst = os.path.join(home, shfile) - return self.upload(command, dst, text = True) + return self.upload(command, shfile, text=True, overwrite=overwrite) - def format_environment(self, env, inline = False): - """Format environmental variables for command to be executed either - as an inline command + def format_environment(self, env, inline=False): + """ Formats the environment variables for a command to be executed + either as an inline command (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n) """ @@ -540,28 +982,24 @@ class LinuxNode(ResourceManager): env = re.sub(r'\s+', ' ', env.strip()) sep = ";" if inline else "\n" - return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep + return sep.join([" export {}".format(e) for e in env.split(" ")]) + sep def check_errors(self, home, - ecodefile = "exitcode", - stdout = "stdout", - stderr = "stderr"): - """ - Checks whether errors occurred while running a command. + ecodefile = "exitcode", + stderr = "stderr"): + """ Checks whether errors occurred while running a command. It first checks the exit code for the command, and only if the exit code is an error one it returns the error output. """ proc = None err = "" - # retrive standard output from the file - (out, oerr), oproc = self.check_output(home, stdout) # get exit code saved in the 'exitcode' file ecode = self.exitcode(home, ecodefile) if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]: - err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile) + err = "Error retrieving exit code status from file {}/{}".format(home, ecodefile) elif ecode > 0 or ecode == ExitCode.FILENOTFOUND: # The process returned an error code or didn't exist. # Check standard error. @@ -572,16 +1010,16 @@ class LinuxNode(ResourceManager): # (cat returns 1 for error "No such file or directory") if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: err = "" - - return (out, err), proc - + + return ("", err), proc + def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False): """ Waits until the pid file for the command is generated, and returns the pid and ppid of the process """ pid = ppid = None delay = 1.0 - for i in xrange(4): + for i in xrange(2): pidtuple = self.getpid(home = home, pidfile = pidfile) if pidtuple: @@ -591,10 +1029,9 @@ class LinuxNode(ResourceManager): time.sleep(delay) delay = delay * 1.5 else: - msg = " Failed to get pid for pidfile %s/%s " % ( - home, pidfile ) + msg = " Failed to get pid for pidfile {}/{} ".format(home, pidfile ) self.error(msg) - + if raise_on_error: raise RuntimeError, msg @@ -602,7 +1039,7 @@ class LinuxNode(ResourceManager): def wait_run(self, pid, ppid, trial = 0): """ wait for a remote process to finish execution """ - start_delay = 1.0 + delay = 1.0 while True: status = self.status(pid, ppid) @@ -622,220 +1059,82 @@ class LinuxNode(ResourceManager): def check_output(self, home, filename): """ Retrives content of file """ - (out, err), proc = self.execute("cat %s" % - os.path.join(home, filename), retry = 1, with_lock = True) + (out, err), proc = self.execute( + "cat {}".format(os.path.join(home, filename)), retry = 1, with_lock = True) return (out, err), proc def is_alive(self): + """ Checks if host is responsive + """ if self.localhost: return True out = err = "" + msg = "Unresponsive host. Wrong answer. " + + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string try: - # TODO: FIX NOT ALIVE!!!! - (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, - with_lock = True) + (out, err), proc = self.execute("echo 'ALIVE'", + blocking = True, + with_lock = True) + + if out.find("ALIVE") > -1: + return True except: - import traceback trace = traceback.format_exc() - msg = "Unresponsive host %s " % err - self.error(msg, out, trace) - return False - - if out.strip().startswith('ALIVE'): - return True - else: - msg = "Unresponsive host " - self.error(msg, out, err) - return False - - def copy(self, src, dst): - if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, - recursive = True, - strict_host_checking = False) - else: - with self._lock: - (out, err), proc = sshfuncs.rcopy( - src, dst, - port = self.get("port"), - identity = self.get("identity"), - server_key = self.get("serverKey"), - recursive = True, - strict_host_checking = False) + msg = "Unresponsive host. Error reaching host: {} ".format(trace) - return (out, err), proc + self.error(msg, out, err) + return False - def execute(self, command, - sudo = False, - stdin = None, - env = None, - tty = False, - forward_x11 = False, - timeout = None, - retry = 3, - err_on_timeout = True, - connect_timeout = 30, - strict_host_checking = False, - persistent = True, - blocking = True, - with_lock = False - ): - """ Notice that this invocation will block until the - execution finishes. If this is not the desired behavior, - use 'run' instead.""" + def find_home(self): + """ + Retrieves host home directory + """ + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + msg = "Impossible to retrieve HOME directory" + try: + (out, err), proc = self.execute("echo ${HOME}", + blocking = True, + with_lock = True) + + if out.strip() != "": + self._home_dir = out.strip() + except: + trace = traceback.format_exc() + msg = "Impossible to retrieve HOME directory {}".format(trace) - if self.localhost: - (out, err), proc = execfuncs.lexec(command, - user = user, - sudo = sudo, - stdin = stdin, - env = env) - else: - if with_lock: - with self._lock: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) - else: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) + if not self._home_dir: + self.error(msg) + raise RuntimeError, msg - return (out, err), proc + def filter_existing_files(self, src, dst): + """ Removes files that already exist in the Linux host from src list + """ + # construct a dictionary with { dst: src } + dests = { os.path.join(dst, os.path.basename(s)) : s for s in src } \ + if len(src) > 1 else {dst: src[0]} - def run(self, command, home, - create_home = False, - pidfile = 'pidfile', - stdin = None, - stdout = 'stdout', - stderr = 'stderr', - sudo = False, - tty = False): - - self.debug("Running command '%s'" % command) - - if self.localhost: - (out, err), proc = execfuncs.lspawn(command, pidfile, - stdout = stdout, - stderr = stderr, - stdin = stdin, - home = home, - create_home = create_home, - sudo = sudo, - user = user) - else: - with self._lock: - (out, err), proc = sshfuncs.rspawn( - command, - pidfile = pidfile, - home = home, - create_home = create_home, - stdin = stdin if stdin is not None else '/dev/null', - stdout = stdout if stdout else '/dev/null', - stderr = stderr if stderr else '/dev/null', - sudo = sudo, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey"), - tty = tty - ) + command = [] + for d in dests.keys(): + command.append(" [ -f {dst} ] && echo '{dst}' ".format(dst=d) ) - return (out, err), proc + command = ";".join(command) - def getpid(self, home, pidfile = "pidfile"): - if self.localhost: - pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) - else: - with self._lock: - pidtuple = sshfuncs.rgetpid( - os.path.join(home, pidfile), - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) + (out, err), proc = self.execute(command, retry = 1, with_lock = True) - return pidtuple + for d in dests.keys(): + if out.find(d) > -1: + del dests[d] - def status(self, pid, ppid): - if self.localhost: - status = execfuncs.lstatus(pid, ppid) - else: - with self._lock: - status = sshfuncs.rstatus( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return status - - def kill(self, pid, ppid, sudo = False): - out = err = "" - proc = None - status = self.status(pid, ppid) + if not dests: + return [] - if status == sshfuncs.ProcStatus.RUNNING: - if self.localhost: - (out, err), proc = execfuncs.lkill(pid, ppid, sudo) - else: - with self._lock: - (out, err), proc = sshfuncs.rkill( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return (out, err), proc + return dests.values()