X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=9188022fa208e7a76041afaf253a187b1b19cd62;hb=c957791ea6cdbd032f28cb62d4a39013dbfb7a4a;hp=b7e86684199a807c20b101179a4449319155cd92;hpb=75a9c13d51a01652643281acd06fa78b93d68281;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index b7e86684..9188022f 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -18,7 +18,8 @@ # Author: Alina Quereilhac from nepi.execution.attribute import Attribute, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \ + reschedule_delay from nepi.resources.linux import rpmfuncs, debfuncs from nepi.util import sshfuncs, execfuncs from nepi.util.sshfuncs import ProcStatus @@ -30,14 +31,11 @@ import re import tempfile import time import threading +import traceback -# TODO: Verify files and dirs exists already -# TODO: Blacklist nodes! # TODO: Unify delays!! # TODO: Validate outcome of uploads!! -reschedule_delay = "0.5s" - class ExitCode: """ Error codes that the rexitcode function can return if unable to @@ -52,6 +50,7 @@ class OSType: """ Supported flavors of Linux OS """ + FEDORA_8 = "f8" FEDORA_12 = "f12" FEDORA_14 = "f14" FEDORA = "fedora" @@ -60,6 +59,88 @@ class OSType: @clsinit class LinuxNode(ResourceManager): + """ + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + + .. note:: + + There are different ways in which commands can be executed using the + LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run', + 'run_and_wait'). + + Brief explanation: + + * 'execute' (blocking mode) : + + HOW IT WORKS: 'execute', forks a process and run the + command, synchronously, attached to the terminal, in + foreground. + The execute method will block until the command returns + the result on 'out', 'err' (so until it finishes executing). + + USAGE: short-lived commands that must be executed attached + to a terminal and in foreground, for which it IS necessary + to block until the command has finished (e.g. if you want + to run 'ls' or 'cat'). + + * 'execute' (NON blocking mode - blocking = False) : + + HOW IT WORKS: Same as before, except that execute method + will return immediately (even if command still running). + + USAGE: long-lived commands that must be executed attached + to a terminal and in foreground, but for which it is not + necessary to block until the command has finished. (e.g. + start an application using X11 forwarding) + + * 'run' : + + HOW IT WORKS: Connects to the host ( using SSH if remote) + and launches the command in background, detached from any + terminal (daemonized), and returns. The command continues to + run remotely, but since it is detached from the terminal, + its pipes (stdin, stdout, stderr) can't be redirected to the + console (as normal non detached processes would), and so they + are explicitly redirected to files. The pidfile is created as + part of the process of launching the command. The pidfile + holds the pid and ppid of the process forked in background, + so later on it is possible to check whether the command is still + running. + + USAGE: long-lived commands that can run detached in background, + for which it is NOT necessary to block (wait) until the command + has finished. (e.g. start an application that is not using X11 + forwarding. It can run detached and remotely in background) + + * 'run_and_wait' : + + HOW IT WORKS: Similar to 'run' except that it 'blocks' until + the command has finished execution. It also checks whether + errors occurred during runtime by reading the exitcode file, + which contains the exit code of the command that was run + (checking stderr only is not always reliable since many + commands throw debugging info to stderr and the only way to + automatically know whether an error really happened is to + check the process exit code). + + Another difference with respect to 'run', is that instead + of directly executing the command as a bash command line, + it uploads the command to a bash script and runs the script. + This allows to use the bash script to debug errors, since + it remains at the remote host and can be run manually to + reproduce the error. + + USAGE: medium-lived commands that can run detached in + background, for which it IS necessary to block (wait) until + the command has finished. (e.g. Package installation, + source compilation, file download, etc) + + """ _rtype = "LinuxNode" @classmethod @@ -82,8 +163,12 @@ class LinuxNode(ResourceManager): server_key = Attribute("serverKey", "Server public key", flags = Flags.ExecReadOnly) - clean_home = Attribute("cleanHome", "Remove all files and directories " + \ - " from home folder before starting experiment", + clean_home = Attribute("cleanHome", "Remove all nepi files and directories " + " from node home folder before starting experiment", + flags = Flags.ExecReadOnly) + + clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " + " from a previous same experiment, before the new experiment starts", flags = Flags.ExecReadOnly) clean_processes = Attribute("cleanProcesses", @@ -101,12 +186,15 @@ class LinuxNode(ResourceManager): cls._register_attribute(identity) cls._register_attribute(server_key) cls._register_attribute(clean_home) + cls._register_attribute(clean_experiment) cls._register_attribute(clean_processes) cls._register_attribute(tear_down) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None + # home directory at Linux host + self._home_dir = "" # lock to avoid concurrency issues on methods used by applications self._lock = threading.Lock() @@ -116,17 +204,47 @@ class LinuxNode(ResourceManager): self.get("hostname"), msg) @property - def home(self): - return self.get("home") or "" + def home_dir(self): + home = self.get("home") or "" + if not home.startswith("/"): + home = os.path.join(self._home_dir, home) + return home + + @property + def usr_dir(self): + return os.path.join(self.home_dir, "nepi-usr") + + @property + def lib_dir(self): + return os.path.join(self.usr_dir, "lib") + + @property + def bin_dir(self): + return os.path.join(self.usr_dir, "bin") + + @property + def src_dir(self): + return os.path.join(self.usr_dir, "src") + + @property + def share_dir(self): + return os.path.join(self.usr_dir, "share") + + @property + def exp_dir(self): + return os.path.join(self.home_dir, "nepi-exp") @property def exp_home(self): - return os.path.join(self.home, self.ec.exp_id) + return os.path.join(self.exp_dir, self.ec.exp_id) @property def node_home(self): - node_home = "node-%d" % self.guid - return os.path.join(self.exp_home, node_home) + return os.path.join(self.exp_home, "node-%d" % self.guid) + + @property + def run_home(self): + return os.path.join(self.node_home, self.ec.run_id) @property def os(self): @@ -138,14 +256,11 @@ class LinuxNode(ResourceManager): self.error(msg) raise RuntimeError, msg - (out, err), proc = self.execute("cat /etc/issue", with_lock = True) - - if err and proc.poll(): - msg = "Error detecting OS " - self.error(msg, out, err) - raise RuntimeError, "%s - %s - %s" %( msg, out, err ) + out = self.get_os() - if out.find("Fedora release 12") == 0: + if out.find("Fedora release 8") == 0: + self._os = OSType.FEDORA_8 + elif out.find("Fedora release 12") == 0: self._os = OSType.FEDORA_12 elif out.find("Fedora release 14") == 0: self._os = OSType.FEDORA_14 @@ -160,23 +275,72 @@ class LinuxNode(ResourceManager): return self._os + def get_os(self): + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + out = "" + retrydelay = 1.0 + for i in xrange(10): + try: + (out, err), proc = self.execute("cat /etc/issue", + retry = 5, + with_lock = True, + blocking = True) + + if out.strip() != "": + return out + except: + trace = traceback.format_exc() + msg = "Error detecting OS: %s " % trace + self.error(msg, out, err) + return False + + time.sleep(min(30.0, retrydelay)) + retrydelay *= 1.5 + + + @property + def use_deb(self): + return self.os in [OSType.DEBIAN, OSType.UBUNTU] + + @property + def use_rpm(self): + return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8, + OSType.FEDORA] + @property def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] def provision(self): + # check if host is alive if not self.is_alive(): - self._state = ResourceState.FAILED + self.fail() + msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") self.error(msg) raise RuntimeError, msg + self.find_home() + if self.get("cleanProcesses"): self.clean_processes() if self.get("cleanHome"): self.clean_home() - + + if self.get("cleanExperiment"): + self.clean_experiment() + + # Create shared directory structure + self.mkdir(self.lib_dir) + self.mkdir(self.bin_dir) + self.mkdir(self.src_dir) + self.mkdir(self.share_dir) + + # Create experiment node home directory self.mkdir(self.node_home) super(LinuxNode, self).provision() @@ -202,10 +366,20 @@ class LinuxNode(ResourceManager): super(LinuxNode, self).deploy() def release(self): + # Node needs to wait until all associated RMs are released + # to be released + rms = self.get_connected() + for rm in rms: + if rm.state < ResourceState.STOPPED: + self.ec.schedule(reschedule_delay, self.release) + return + tear_down = self.get("tearDown") if tear_down: self.execute(tear_down) + self.clean_processes() + super(LinuxNode, self).release() def valid_connection(self, guid): @@ -233,21 +407,220 @@ class LinuxNode(ResourceManager): (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) def clean_home(self): + """ Cleans all NEPI related folders in the Linux host + """ self.info("Cleaning up home") - cmd = ( - # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" + - "find . -maxdepth 1 -name 'nepi-*' " + - " -execdir rm -rf {} + " - ) + cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % ( + self.home_dir ) + + return self.execute(cmd, with_lock = True) + + def clean_experiment(self): + """ Cleans all experiment related files in the Linux host. + It preserves NEPI files and folders that have a multi experiment + scope. + """ + self.info("Cleaning up experiment files") + + cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % ( + self.exp_dir, + self.ec.exp_id ) - if self.home: - cmd = "cd %s ; " % self.home + cmd + return self.execute(cmd, with_lock = True) + + def execute(self, command, + sudo = False, + stdin = None, + env = None, + tty = False, + forward_x11 = False, + timeout = None, + retry = 3, + err_on_timeout = True, + connect_timeout = 30, + strict_host_checking = False, + persistent = True, + blocking = True, + with_lock = False + ): + """ Notice that this invocation will block until the + execution finishes. If this is not the desired behavior, + use 'run' instead.""" + + if self.localhost: + (out, err), proc = execfuncs.lexec(command, + user = user, + sudo = sudo, + stdin = stdin, + env = env) + else: + if with_lock: + with self._lock: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + stdin = stdin, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + else: + (out, err), proc = sshfuncs.rexec( + command, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + stdin = stdin, + identity = self.get("identity"), + server_key = self.get("serverKey"), + env = env, + tty = tty, + forward_x11 = forward_x11, + timeout = timeout, + retry = retry, + err_on_timeout = err_on_timeout, + connect_timeout = connect_timeout, + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking + ) + + return (out, err), proc + + def run(self, command, home, + create_home = False, + pidfile = 'pidfile', + stdin = None, + stdout = 'stdout', + stderr = 'stderr', + sudo = False, + tty = False): + + self.debug("Running command '%s'" % command) + + if self.localhost: + (out, err), proc = execfuncs.lspawn(command, pidfile, + stdout = stdout, + stderr = stderr, + stdin = stdin, + home = home, + create_home = create_home, + sudo = sudo, + user = user) + else: + with self._lock: + (out, err), proc = sshfuncs.rspawn( + command, + pidfile = pidfile, + home = home, + create_home = create_home, + stdin = stdin if stdin is not None else '/dev/null', + stdout = stdout if stdout else '/dev/null', + stderr = stderr if stderr else '/dev/null', + sudo = sudo, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey"), + tty = tty + ) + + return (out, err), proc + + def getpid(self, home, pidfile = "pidfile"): + if self.localhost: + pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) + else: + with self._lock: + pidtuple = sshfuncs.rgetpid( + os.path.join(home, pidfile), + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return pidtuple + def status(self, pid, ppid): + if self.localhost: + status = execfuncs.lstatus(pid, ppid) + else: + with self._lock: + status = sshfuncs.rstatus( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) + + return status + + def kill(self, pid, ppid, sudo = False): out = err = "" - (out, err), proc = self.execute(cmd, with_lock = True) + proc = None + status = self.status(pid, ppid) + + if status == sshfuncs.ProcStatus.RUNNING: + if self.localhost: + (out, err), proc = execfuncs.lkill(pid, ppid, sudo) + else: + with self._lock: + (out, err), proc = sshfuncs.rkill( + pid, ppid, + host = self.get("hostname"), + user = self.get("username"), + port = self.get("port"), + agent = True, + sudo = sudo, + identity = self.get("identity"), + server_key = self.get("serverKey") + ) - def upload(self, src, dst, text = False): + return (out, err), proc + + def copy(self, src, dst): + if self.localhost: + (out, err), proc = execfuncs.lcopy(source, dest, + recursive = True, + strict_host_checking = False) + else: + with self._lock: + (out, err), proc = sshfuncs.rcopy( + src, dst, + port = self.get("port"), + identity = self.get("identity"), + server_key = self.get("serverKey"), + recursive = True, + strict_host_checking = False) + + return (out, err), proc + + + def upload(self, src, dst, text = False, overwrite = True): """ Copy content to destination src content to copy. Can be a local file, directory or a list of files @@ -266,6 +639,13 @@ class LinuxNode(ResourceManager): f.close() src = f.name + # If dst files should not be overwritten, check that the files do not + # exits already + if overwrite == False: + src = self.filter_existing_files(src, dst) + if not src: + return ("", ""), None + if not self.localhost: # Build destination as @: dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) @@ -284,46 +664,64 @@ class LinuxNode(ResourceManager): src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) return self.copy(src, dst) - def install_packages(self, packages, home): + def install_packages_command(self, packages): command = "" - if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + if self.use_rpm: command = rpmfuncs.install_packages_command(self.os, packages) - elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + elif self.use_deb: command = debfuncs.install_packages_command(self.os, packages) else: msg = "Error installing packages ( OS not known ) " self.error(msg, self.os) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "instpkg.sh", + return command + + def install_packages(self, packages, home, run_home = None): + """ Install packages in the Linux host. + + 'home' is the directory to upload the package installation script. + 'run_home' is the directory from where to execute the script. + """ + command = self.install_packages_command(packages) + + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "instpkg.sh"), pidfile = "instpkg_pidfile", ecodefile = "instpkg_exitcode", stdout = "instpkg_stdout", stderr = "instpkg_stderr", + overwrite = False, raise_on_error = True) return (out, err), proc - def remove_packages(self, packages, home): - command = "" - if self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA]: + def remove_packages(self, packages, home, run_home = None): + """ Uninstall packages from the Linux host. + + 'home' is the directory to upload the package un-installation script. + 'run_home' is the directory from where to execute the script. + """ + if self.use_rpm: command = rpmfuncs.remove_packages_command(self.os, packages) - elif self.os in [OSType.DEBIAN, OSType.UBUNTU]: + elif self.use_deb: command = debfuncs.remove_packages_command(self.os, packages) else: msg = "Error removing packages ( OS not known ) " self.error(msg) raise RuntimeError, msg - out = err = "" - (out, err), proc = self.run_and_wait(command, home, - shfile = "rmpkg.sh", + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "rmpkg.sh"), pidfile = "rmpkg_pidfile", ecodefile = "rmpkg_exitcode", stdout = "rmpkg_stdout", stderr = "rmpkg_stderr", + overwrite = False, raise_on_error = True) return (out, err), proc @@ -340,6 +738,7 @@ class LinuxNode(ResourceManager): def run_and_wait(self, command, home, shfile = "cmd.sh", env = None, + overwrite = True, pidfile = "pidfile", ecodefile = "exitcode", stdin = None, @@ -348,19 +747,22 @@ class LinuxNode(ResourceManager): sudo = False, tty = False, raise_on_error = False): - """ - runs a command in background on the remote host, busy-waiting - until the command finishes execution. - This is more robust than doing a simple synchronized 'execute', - since in the remote host the command can continue to run detached - even if network disconnections occur """ - self.upload_command(command, home, + Uploads the 'command' to a bash script in the host. + Then runs the script detached in background in the host, and + busy-waites until the script finishes executing. + """ + + if not shfile.startswith("/"): + shfile = os.path.join(home, shfile) + + self.upload_command(command, shfile = shfile, ecodefile = ecodefile, - env = env) + env = env, + overwrite = overwrite) - command = "bash ./%s" % shfile + command = "bash %s" % shfile # run command in background in remote host (out, err), proc = self.run(command, home, pidfile = pidfile, @@ -371,7 +773,7 @@ class LinuxNode(ResourceManager): tty = tty) # check no errors occurred - if proc.poll() and err: + if proc.poll(): msg = " Failed to run command '%s' " % command self.error(msg, out, err) if raise_on_error: @@ -386,15 +788,19 @@ class LinuxNode(ResourceManager): # wait until command finishes to execute self.wait_run(pid, ppid) - (out, err), proc = self.check_errors(home, ecodefile, stderr) + (eout, err), proc = self.check_errors(home, + ecodefile = ecodefile, + stderr = stderr) # Out is what was written in the stderr file - if out or err: + if err: msg = " Failed to run command '%s' " % command - self.error(msg, out, err) + self.error(msg, eout, err) if raise_on_error: raise RuntimeError, msg + + (out, oerr), proc = self.check_output(home, stdout) return (out, err), proc @@ -420,47 +826,58 @@ class LinuxNode(ResourceManager): # Other error from 'cat' return ExitCode.ERROR - def upload_command(self, command, home, + def upload_command(self, command, shfile = "cmd.sh", ecodefile = "exitcode", + overwrite = True, env = None): """ Saves the command as a bash script file in the remote host, and forces to save the exit code of the command execution to the ecodefile """ - - # Prepare command to be executed as a bash script file - # Make sure command ends in ';' so the curly brackets syntax is correct - if not command.strip()[-1] == ';': - command += " ; " + if not (command.strip().endswith(";") or command.strip().endswith("&")): + command += ";" + # The exit code of the command will be stored in ecodefile - command = " { { %(command)s } ; echo $? > %(ecodefile)s ; } " % { + command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % { 'command': command, 'ecodefile': ecodefile, } # Export environment - environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) \ - if env else "" + environ = self.format_environment(env) # Add environ to command command = environ + command - dst = os.path.join(home, shfile) - return self.upload(command, dst, text = True) + return self.upload(command, shfile, text = True, overwrite = overwrite) + + def format_environment(self, env, inline = False): + """ Formats the environment variables for a command to be executed + either as an inline command + (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or + as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n) + """ + if not env: return "" + + # Remove extra white spaces + env = re.sub(r'\s+', ' ', env.strip()) + + sep = ";" if inline else "\n" + return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep def check_errors(self, home, ecodefile = "exitcode", stderr = "stderr"): - """ - Checks whether errors occurred while running a command. + """ Checks whether errors occurred while running a command. It first checks the exit code for the command, and only if the exit code is an error one it returns the error output. + """ - out = err = "" proc = None + err = "" - # get Exit code + # get exit code saved in the 'exitcode' file ecode = self.exitcode(home, ecodefile) if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]: @@ -468,15 +885,15 @@ class LinuxNode(ResourceManager): elif ecode > 0 or ecode == ExitCode.FILENOTFOUND: # The process returned an error code or didn't exist. # Check standard error. - (out, err), proc = self.check_output(home, stderr) - - # If the stderr file was not found, assume nothing happened. - # We just ignore the error. + (err, eerr), proc = self.check_output(home, stderr) + + # If the stderr file was not found, assume nothing bad happened, + # and just ignore the error. # (cat returns 1 for error "No such file or directory") if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: - out = err = "" - - return (out, err), proc + err = "" + + return ("", err), proc def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False): """ Waits until the pid file for the command is generated, @@ -505,7 +922,7 @@ class LinuxNode(ResourceManager): def wait_run(self, pid, ppid, trial = 0): """ wait for a remote process to finish execution """ - start_delay = 1.0 + delay = 1.0 while True: status = self.status(pid, ppid) @@ -530,215 +947,95 @@ class LinuxNode(ResourceManager): return (out, err), proc def is_alive(self): + """ Checks if host is responsive + """ if self.localhost: return True out = err = "" - try: - # TODO: FIX NOT ALIVE!!!! - (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, - with_lock = True) - except: - import traceback - trace = traceback.format_exc() - msg = "Unresponsive host %s " % err - self.error(msg, out, trace) - return False + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + retrydelay = 1.0 + for i in xrange(10): + try: + (out, err), proc = self.execute("echo 'ALIVE'", + retry = 5, + blocking = True, + with_lock = True) + + if out.find("ALIVE") > -1: + return True + except: + trace = traceback.format_exc() + msg = "Unresponsive host. Error reaching host: %s " % trace + self.error(msg, out, err) + return False + + time.sleep(min(30.0, retrydelay)) + retrydelay *= 1.5 - if out.strip().startswith('ALIVE'): + if out.find("ALIVE") > -1: return True else: - msg = "Unresponsive host " + msg = "Unresponsive host. Wrong answer. " self.error(msg, out, err) return False - def copy(self, src, dst): - if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, - recursive = True, - strict_host_checking = False) - else: - with self._lock: - (out, err), proc = sshfuncs.rcopy( - src, dst, - port = self.get("port"), - identity = self.get("identity"), - server_key = self.get("serverKey"), - recursive = True, - strict_host_checking = False) - - return (out, err), proc + def find_home(self): + """ Retrieves host home directory + """ + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + retrydelay = 1.0 + for i in xrange(10): + try: + (out, err), proc = self.execute("echo ${HOME}", + retry = 5, + blocking = True, + with_lock = True) + + if out.strip() != "": + self._home_dir = out.strip() + break + except: + trace = traceback.format_exc() + msg = "Impossible to retrieve HOME directory" % trace + self.error(msg, out, err) + return False - def execute(self, command, - sudo = False, - stdin = None, - env = None, - tty = False, - forward_x11 = False, - timeout = None, - retry = 3, - err_on_timeout = True, - connect_timeout = 30, - strict_host_checking = False, - persistent = True, - blocking = True, - with_lock = False - ): - """ Notice that this invocation will block until the - execution finishes. If this is not the desired behavior, - use 'run' instead.""" + time.sleep(min(30.0, retrydelay)) + retrydelay *= 1.5 - if self.localhost: - (out, err), proc = execfuncs.lexec(command, - user = user, - sudo = sudo, - stdin = stdin, - env = env) - else: - if with_lock: - with self._lock: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) - else: - (out, err), proc = sshfuncs.rexec( - command, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - stdin = stdin, - identity = self.get("identity"), - server_key = self.get("serverKey"), - env = env, - tty = tty, - forward_x11 = forward_x11, - timeout = timeout, - retry = retry, - err_on_timeout = err_on_timeout, - connect_timeout = connect_timeout, - persistent = persistent, - blocking = blocking, - strict_host_checking = strict_host_checking - ) - - return (out, err), proc + if not self._home_dir: + msg = "Impossible to retrieve HOME directory" + self.error(msg, out, err) + raise RuntimeError, msg - def run(self, command, home, - create_home = False, - pidfile = 'pidfile', - stdin = None, - stdout = 'stdout', - stderr = 'stderr', - sudo = False, - tty = False): - - self.debug("Running command '%s'" % command) - - if self.localhost: - (out, err), proc = execfuncs.lspawn(command, pidfile, - stdout = stdout, - stderr = stderr, - stdin = stdin, - home = home, - create_home = create_home, - sudo = sudo, - user = user) - else: - with self._lock: - (out, err), proc = sshfuncs.rspawn( - command, - pidfile = pidfile, - home = home, - create_home = create_home, - stdin = stdin if stdin is not None else '/dev/null', - stdout = stdout if stdout else '/dev/null', - stderr = stderr if stderr else '/dev/null', - sudo = sudo, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey"), - tty = tty - ) + def filter_existing_files(self, src, dst): + """ Removes files that already exist in the Linux host from src list + """ + # construct a dictionary with { dst: src } + dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ), x ), + src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src}) - return (out, err), proc + command = [] + for d in dests.keys(): + command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} ) - def getpid(self, home, pidfile = "pidfile"): - if self.localhost: - pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) - else: - with self._lock: - pidtuple = sshfuncs.rgetpid( - os.path.join(home, pidfile), - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return pidtuple + command = ";".join(command) - def status(self, pid, ppid): - if self.localhost: - status = execfuncs.lstatus(pid, ppid) - else: - with self._lock: - status = sshfuncs.rstatus( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) - - return status + (out, err), proc = self.execute(command, retry = 1, with_lock = True) - def kill(self, pid, ppid, sudo = False): - out = err = "" - proc = None - status = self.status(pid, ppid) + for d in dests.keys(): + if out.find(d) > -1: + del dests[d] - if status == sshfuncs.ProcStatus.RUNNING: - if self.localhost: - (out, err), proc = execfuncs.lkill(pid, ppid, sudo) - else: - with self._lock: - (out, err), proc = sshfuncs.rkill( - pid, ppid, - host = self.get("hostname"), - user = self.get("username"), - port = self.get("port"), - agent = True, - sudo = sudo, - identity = self.get("identity"), - server_key = self.get("serverKey") - ) + if not dests: + return "" - return (out, err), proc + return " ".join(dests.values())