X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Flinux%2Fnode.py;h=029ee8ad9103cbf8b72779b76be8511fa9dc631d;hb=9a1ef15a5791b8e8b2f3b57db475697f77233a86;hp=8387defc020922085bd30a28747518af5c258dce;hpb=62a8f5c90afe9033f532206c811cff8ea76b2c09;p=nepi.git diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 8387defc..029ee8ad 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -17,10 +17,12 @@ # # Author: Alina Quereilhac -from nepi.execution.attribute import Attribute, Flags -from nepi.execution.resource import ResourceManager, clsinit, ResourceState +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import ResourceManager, clsinit_copy, \ + ResourceState, reschedule_delay from nepi.resources.linux import rpmfuncs, debfuncs -from nepi.util import sshfuncs, execfuncs +from nepi.util import sshfuncs, execfuncs +from nepi.util.sshfuncs import ProcStatus import collections import os @@ -29,18 +31,120 @@ import re import tempfile import time import threading +import traceback -# TODO: Verify files and dirs exists already -# TODO: Blacklist nodes! # TODO: Unify delays!! # TODO: Validate outcome of uploads!! -reschedule_delay = "0.5s" - - -@clsinit +class ExitCode: + """ + Error codes that the rexitcode function can return if unable to + check the exit code of a spawned process + """ + FILENOTFOUND = -1 + CORRUPTFILE = -2 + ERROR = -3 + OK = 0 + +class OSType: + """ + Supported flavors of Linux OS + """ + FEDORA_8 = "f8" + FEDORA_12 = "f12" + FEDORA_14 = "f14" + FEDORA = "fedora" + UBUNTU = "ubuntu" + DEBIAN = "debian" + +@clsinit_copy class LinuxNode(ResourceManager): + """ + .. class:: Class Args : + + :param ec: The Experiment controller + :type ec: ExperimentController + :param guid: guid of the RM + :type guid: int + + .. note:: + + There are different ways in which commands can be executed using the + LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run', + 'run_and_wait'). + + Brief explanation: + + * 'execute' (blocking mode) : + + HOW IT WORKS: 'execute', forks a process and run the + command, synchronously, attached to the terminal, in + foreground. + The execute method will block until the command returns + the result on 'out', 'err' (so until it finishes executing). + + USAGE: short-lived commands that must be executed attached + to a terminal and in foreground, for which it IS necessary + to block until the command has finished (e.g. if you want + to run 'ls' or 'cat'). + + * 'execute' (NON blocking mode - blocking = False) : + + HOW IT WORKS: Same as before, except that execute method + will return immediately (even if command still running). + + USAGE: long-lived commands that must be executed attached + to a terminal and in foreground, but for which it is not + necessary to block until the command has finished. (e.g. + start an application using X11 forwarding) + + * 'run' : + + HOW IT WORKS: Connects to the host ( using SSH if remote) + and launches the command in background, detached from any + terminal (daemonized), and returns. The command continues to + run remotely, but since it is detached from the terminal, + its pipes (stdin, stdout, stderr) can't be redirected to the + console (as normal non detached processes would), and so they + are explicitly redirected to files. The pidfile is created as + part of the process of launching the command. The pidfile + holds the pid and ppid of the process forked in background, + so later on it is possible to check whether the command is still + running. + + USAGE: long-lived commands that can run detached in background, + for which it is NOT necessary to block (wait) until the command + has finished. (e.g. start an application that is not using X11 + forwarding. It can run detached and remotely in background) + + * 'run_and_wait' : + + HOW IT WORKS: Similar to 'run' except that it 'blocks' until + the command has finished execution. It also checks whether + errors occurred during runtime by reading the exitcode file, + which contains the exit code of the command that was run + (checking stderr only is not always reliable since many + commands throw debugging info to stderr and the only way to + automatically know whether an error really happened is to + check the process exit code). + + Another difference with respect to 'run', is that instead + of directly executing the command as a bash command line, + it uploads the command to a bash script and runs the script. + This allows to use the bash script to debug errors, since + it remains at the remote host and can be run manually to + reproduce the error. + + USAGE: medium-lived commands that can run detached in + background, for which it IS necessary to block (wait) until + the command has finished. (e.g. Package installation, + source compilation, file download, etc) + + """ _rtype = "LinuxNode" + _help = "Controls Linux host machines ( either localhost or a host " \ + "that can be accessed using a SSH key)" + _backend_type = "linux" @classmethod def _register_attributes(cls): @@ -62,18 +166,34 @@ class LinuxNode(ResourceManager): server_key = Attribute("serverKey", "Server public key", flags = Flags.ExecReadOnly) - clean_home = Attribute("cleanHome", "Remove all files and directories " + \ - " from home folder before starting experiment", + clean_home = Attribute("cleanHome", "Remove all nepi files and directories " + " from node home folder before starting experiment", + type = Types.Bool, + default = False, + flags = Flags.ExecReadOnly) + + clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " + " from a previous same experiment, before the new experiment starts", + type = Types.Bool, + default = False, flags = Flags.ExecReadOnly) clean_processes = Attribute("cleanProcesses", "Kill all running processes before starting experiment", + type = Types.Bool, + default = False, flags = Flags.ExecReadOnly) tear_down = Attribute("tearDown", "Bash script to be executed before " + \ "releasing the resource", flags = Flags.ExecReadOnly) + gateway_user = Attribute("gatewayUser", "Gateway account username", + flags = Flags.ExecReadOnly) + + gateway = Attribute("gateway", "Hostname of the gateway machine", + flags = Flags.ExecReadOnly) + cls._register_attribute(hostname) cls._register_attribute(username) cls._register_attribute(port) @@ -81,32 +201,73 @@ class LinuxNode(ResourceManager): cls._register_attribute(identity) cls._register_attribute(server_key) cls._register_attribute(clean_home) + cls._register_attribute(clean_experiment) cls._register_attribute(clean_processes) cls._register_attribute(tear_down) + cls._register_attribute(gateway_user) + cls._register_attribute(gateway) def __init__(self, ec, guid): super(LinuxNode, self).__init__(ec, guid) self._os = None - - # lock to avoid concurrency issues on methods used by applications - self._lock = threading.Lock() + # home directory at Linux host + self._home_dir = "" + + # lock to prevent concurrent applications on the same node, + # to execute commands at the same time. There are potential + # concurrency issues when using SSH to a same host from + # multiple threads. There are also possible operational + # issues, e.g. an application querying the existence + # of a file or folder prior to its creation, and another + # application creating the same file or folder in between. + self._node_lock = threading.Lock() def log_message(self, msg): return " guid %d - host %s - %s " % (self.guid, self.get("hostname"), msg) @property - def home(self): - return self.get("home") or "" + def home_dir(self): + home = self.get("home") or "" + if not home.startswith("/"): + home = os.path.join(self._home_dir, home) + return home + + @property + def usr_dir(self): + return os.path.join(self.home_dir, "nepi-usr") + + @property + def lib_dir(self): + return os.path.join(self.usr_dir, "lib") + + @property + def bin_dir(self): + return os.path.join(self.usr_dir, "bin") + + @property + def src_dir(self): + return os.path.join(self.usr_dir, "src") + + @property + def share_dir(self): + return os.path.join(self.usr_dir, "share") + + @property + def exp_dir(self): + return os.path.join(self.home_dir, "nepi-exp") @property def exp_home(self): - return os.path.join(self.home, self.ec.exp_id) + return os.path.join(self.exp_dir, self.ec.exp_id) @property def node_home(self): - node_home = "node-%d" % self.guid - return os.path.join(self.exp_home, node_home) + return os.path.join(self.exp_home, "node-%d" % self.guid) + + @property + def run_home(self): + return os.path.join(self.node_home, self.ec.run_id) @property def os(self): @@ -118,21 +279,20 @@ class LinuxNode(ResourceManager): self.error(msg) raise RuntimeError, msg - (out, err), proc = self.execute("cat /etc/issue", with_lock = True) + out = self.get_os() - if err and proc.poll(): - msg = "Error detecting OS " - self.error(msg, out, err) - raise RuntimeError, "%s - %s - %s" %( msg, out, err ) - - if out.find("Fedora release 12") == 0: - self._os = "f12" + if out.find("Fedora release 8") == 0: + self._os = OSType.FEDORA_8 + elif out.find("Fedora release 12") == 0: + self._os = OSType.FEDORA_12 elif out.find("Fedora release 14") == 0: - self._os = "f14" + self._os = OSType.FEDORA_14 + elif out.find("Fedora release") == 0: + self._os = OSType.FEDORA elif out.find("Debian") == 0: - self._os = "debian" + self._os = OSType.DEBIAN elif out.find("Ubuntu") ==0: - self._os = "ubuntu" + self._os = OSType.UBUNTU else: msg = "Unsupported OS" self.error(msg, out) @@ -140,328 +300,155 @@ class LinuxNode(ResourceManager): return self._os + def get_os(self): + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + out = "" + try: + (out, err), proc = self.execute("cat /etc/issue", + with_lock = True, + blocking = True) + except: + trace = traceback.format_exc() + msg = "Error detecting OS: %s " % trace + self.error(msg, out, err) + + return out + + @property + def use_deb(self): + return self.os in [OSType.DEBIAN, OSType.UBUNTU] + + @property + def use_rpm(self): + return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8, + OSType.FEDORA] + @property def localhost(self): return self.get("hostname") in ['localhost', '127.0.0.7', '::1'] - def provision(self): + def do_provision(self): + # check if host is alive if not self.is_alive(): - self._state = ResourceState.FAILED msg = "Deploy failed. Unresponsive node %s" % self.get("hostname") self.error(msg) raise RuntimeError, msg + self.find_home() + if self.get("cleanProcesses"): self.clean_processes() if self.get("cleanHome"): self.clean_home() - + + if self.get("cleanExperiment"): + self.clean_experiment() + + # Create shared directory structure + self.mkdir(self.lib_dir) + self.mkdir(self.bin_dir) + self.mkdir(self.src_dir) + self.mkdir(self.share_dir) + + # Create experiment node home directory self.mkdir(self.node_home) - super(LinuxNode, self).provision() + super(LinuxNode, self).do_provision() - def deploy(self): + def do_deploy(self): if self.state == ResourceState.NEW: - try: - self.discover() - self.provision() - except: - self._state = ResourceState.FAILED - raise + self.info("Deploying node") + self.do_discover() + self.do_provision() # Node needs to wait until all associated interfaces are # ready before it can finalize deployment from nepi.resources.linux.interface import LinuxInterface - ifaces = self.get_connected(LinuxInterface.rtype()) + ifaces = self.get_connected(LinuxInterface.get_rtype()) for iface in ifaces: if iface.state < ResourceState.READY: self.ec.schedule(reschedule_delay, self.deploy) return - super(LinuxNode, self).deploy() + super(LinuxNode, self).do_deploy() + + def do_release(self): + rms = self.get_connected() + for rm in rms: + # Node needs to wait until all associated RMs are released + # before it can be released + if rm.state != ResourceState.RELEASED: + self.ec.schedule(reschedule_delay, self.release) + return - def release(self): tear_down = self.get("tearDown") if tear_down: self.execute(tear_down) - super(LinuxNode, self).release() + self.clean_processes() + + super(LinuxNode, self).do_release() def valid_connection(self, guid): # TODO: Validate! return True - def clean_processes(self, killer = False): + def clean_processes(self): self.info("Cleaning up processes") - - if killer: - # Hardcore kill - cmd = ("sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S killall python tcpdump || /bin/true ; " + - "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; " + - "sudo -S killall -u root || /bin/true ; ") - else: - # Be gentler... + + if self.get("username") != 'root': cmd = ("sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall tcpdump || /bin/true ; " + - "sudo -S killall -u %s || /bin/true ; " % self.get("username") + + "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " + "sudo -S killall -u %s || /bin/true ; " % self.get("username")) + else: + if self.state >= ResourceState.READY: + import pickle + pids = pickle.load(open("/tmp/save.proc", "rb")) + pids_temp = dict() + ps_aux = "ps aux |awk '{print $2,$11}'" + (out, err), proc = self.execute(ps_aux) + for line in out.strip().split("\n"): + parts = line.strip().split(" ") + pids_temp[parts[0]] = parts[1] + kill_pids = set(pids_temp.items()) - set(pids.items()) + kill_pids = ' '.join(dict(kill_pids).keys()) + + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " + + "kill %s || /bin/true ; " % kill_pids) + else: + cmd = ("killall tcpdump || /bin/true ; " + + "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ") out = err = "" - (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) - + (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) + def clean_home(self): + """ Cleans all NEPI related folders in the Linux host + """ self.info("Cleaning up home") - cmd = ( - # "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" + - "find . -maxdepth 1 -name 'nepi-*' " + - " -execdir rm -rf {} + " - ) - - if self.home: - cmd = "cd %s ; " % self.home + cmd - - out = err = "" - (out, err), proc = self.execute(cmd, with_lock = True) - - def upload(self, src, dst, text = False): - """ Copy content to destination - - src content to copy. Can be a local file, directory or a list of files - - dst destination path on the remote host (remote is always self.host) - - text src is text input, it must be stored into a temp file before uploading - """ - # If source is a string input - f = None - if text and not os.path.isfile(src): - # src is text input that should be uploaded as file - # create a temporal file with the content to upload - f = tempfile.NamedTemporaryFile(delete=False) - f.write(src) - f.close() - src = f.name - - if not self.localhost: - # Build destination as @: - dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) - - result = self.copy(src, dst) - - # clean up temp file - if f: - os.remove(f.name) - - return result - - def download(self, src, dst): - if not self.localhost: - # Build destination as @: - src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) - return self.copy(src, dst) - - def install_packages(self, packages, home = None): - home = home or self.node_home - - cmd = "" - if self.os in ["f12", "f14"]: - cmd = rpmfuncs.install_packages_command(self.os, packages) - elif self.os in ["debian", "ubuntu"]: - cmd = debfuncs.install_packages_command(self.os, packages) - else: - msg = "Error installing packages ( OS not known ) " - self.error(msg, self.os) - raise RuntimeError, msg - - out = err = "" - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "instpkg_pid", - stdout = "instpkg_out", - stderr = "instpkg_err", - raise_on_error = True) - - return (out, err), proc - - def remove_packages(self, packages, home = None): - home = home or self.node_home - - cmd = "" - if self.os in ["f12", "f14"]: - cmd = rpmfuncs.remove_packages_command(self.os, packages) - elif self.os in ["debian", "ubuntu"]: - cmd = debfuncs.remove_packages_command(self.os, packages) - else: - msg = "Error removing packages ( OS not known ) " - self.error(msg) - raise RuntimeError, msg - - out = err = "" - (out, err), proc = self.run_and_wait(cmd, home, - pidfile = "rmpkg_pid", - stdout = "rmpkg_out", - stderr = "rmpkg_err", - raise_on_error = True) - - return (out, err), proc + cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % ( + self.home_dir ) - def mkdir(self, path, clean = False): - if clean: - self.rmdir(path) - - return self.execute("mkdir -p %s" % path, with_lock = True) - - def rmdir(self, path): - return self.execute("rm -rf %s" % path, with_lock = True) + return self.execute(cmd, with_lock = True) - def run_and_wait(self, command, - home = ".", - pidfile = "pid", - stdin = None, - stdout = 'stdout', - stderr = 'stderr', - sudo = False, - tty = False, - raise_on_error = False): - """ runs a command in background on the remote host, but waits - until the command finishes execution. - This is more robust than doing a simple synchronized 'execute', - since in the remote host the command can continue to run detached - even if network disconnections occur + def clean_experiment(self): + """ Cleans all experiment related files in the Linux host. + It preserves NEPI files and folders that have a multi experiment + scope. """ - # run command in background in remote host - (out, err), proc = self.run(command, home, - pidfile = pidfile, - stdin = stdin, - stdout = stdout, - stderr = stderr, - sudo = sudo, - tty = tty) - - # check no errors occurred - if proc.poll() and err: - msg = " Failed to run command '%s' " % command - self.error(msg, out, err) - if raise_on_error: - raise RuntimeError, msg - - # Wait for pid file to be generated - pid, ppid = self.wait_pid( - home = home, - pidfile = pidfile, - raise_on_error = raise_on_error) - - # wait until command finishes to execute - self.wait_run(pid, ppid) - - # check if execution errors occurred - (out, err), proc = self.check_output(home, stderr) - - if err or out: - msg = " Failed to run command '%s' " % command - self.error(msg, out, err) - - if raise_on_error: - raise RuntimeError, msg + self.info("Cleaning up experiment files") - return (out, err), proc - - def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False): - """ Waits until the pid file for the command is generated, - and returns the pid and ppid of the process """ - pid = ppid = None - delay = 1.0 - for i in xrange(5): - pidtuple = self.checkpid(home = home, pidfile = pidfile) + cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % ( + self.exp_dir, + self.ec.exp_id ) - if pidtuple: - pid, ppid = pidtuple - break - else: - time.sleep(delay) - delay = min(30,delay*1.2) - else: - msg = " Failed to get pid for pidfile %s/%s " % ( - home, pidfile ) - self.error(msg) - - if raise_on_error: - raise RuntimeError, msg - - return pid, ppid - - def wait_run(self, pid, ppid, trial = 0): - """ wait for a remote process to finish execution """ - delay = 1.0 - first = True - bustspin = 0 - - while True: - status = self.status(pid, ppid) - - if status is sshfuncs.FINISHED: - break - elif status is not sshfuncs.RUNNING: - bustspin += 1 - time.sleep(delay*(5.5+random.random())) - if bustspin > 12: - break - else: - if first: - first = False - - time.sleep(delay*(0.5+random.random())) - delay = min(30,delay*1.2) - bustspin = 0 - - def check_output(self, home, filename): - """ checks file content """ - (out, err), proc = self.execute("cat %s" % - os.path.join(home, filename), retry = 1, with_lock = True) - return (out, err), proc - - def is_alive(self): - if self.localhost: - return True - - out = err = "" - try: - # TODO: FIX NOT ALIVE!!!! - (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, - with_lock = True) - except: - import traceback - trace = traceback.format_exc() - msg = "Unresponsive host %s " % err - self.error(msg, out, trace) - return False - - if out.strip().startswith('ALIVE'): - return True - else: - msg = "Unresponsive host " - self.error(msg, out, err) - return False - - def copy(self, src, dst): - if self.localhost: - (out, err), proc = execfuncs.lcopy(source, dest, - recursive = True, - strict_host_checking = False) - else: - with self._lock: - (out, err), proc = sshfuncs.rcopy( - src, dst, - port = self.get("port"), - identity = self.get("identity"), - server_key = self.get("serverKey"), - recursive = True, - strict_host_checking = False) - - return (out, err), proc + return self.execute(cmd, with_lock = True) def execute(self, command, sudo = False, @@ -475,6 +462,7 @@ class LinuxNode(ResourceManager): connect_timeout = 30, strict_host_checking = False, persistent = True, + blocking = True, with_lock = False ): """ Notice that this invocation will block until the @@ -483,18 +471,20 @@ class LinuxNode(ResourceManager): if self.localhost: (out, err), proc = execfuncs.lexec(command, - user = user, + user = self.get("username"), # still problem with localhost sudo = sudo, stdin = stdin, env = env) else: if with_lock: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rexec( command, host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, stdin = stdin, @@ -508,6 +498,7 @@ class LinuxNode(ResourceManager): err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, + blocking = blocking, strict_host_checking = strict_host_checking ) else: @@ -516,6 +507,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, stdin = stdin, @@ -528,21 +521,22 @@ class LinuxNode(ResourceManager): retry = retry, err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, - persistent = persistent + persistent = persistent, + blocking = blocking, + strict_host_checking = strict_host_checking ) return (out, err), proc - def run(self, command, - home = None, + def run(self, command, home, create_home = False, - pidfile = "pid", + pidfile = 'pidfile', stdin = None, stdout = 'stdout', stderr = 'stderr', sudo = False, tty = False): - + self.debug("Running command '%s'" % command) if self.localhost: @@ -555,10 +549,8 @@ class LinuxNode(ResourceManager): sudo = sudo, user = user) else: - # Start process in a "daemonized" way, using nohup and heavy - # stdin/out redirection to avoid connection issues - with self._lock: - (out,err), proc = sshfuncs.rspawn( + with self._node_lock: + (out, err), proc = sshfuncs.rspawn( command, pidfile = pidfile, home = home, @@ -570,6 +562,8 @@ class LinuxNode(ResourceManager): host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, identity = self.get("identity"), server_key = self.get("serverKey"), @@ -578,33 +572,37 @@ class LinuxNode(ResourceManager): return (out, err), proc - def checkpid(self, home = ".", pidfile = "pid"): + def getpid(self, home, pidfile = "pidfile"): if self.localhost: - pidtuple = execfuncs.lcheckpid(os.path.join(home, pidfile)) + pidtuple = execfuncs.lgetpid(os.path.join(home, pidfile)) else: - with self._lock: - pidtuple = sshfuncs.rcheckpid( + with self._node_lock: + pidtuple = sshfuncs.rgetpid( os.path.join(home, pidfile), host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, identity = self.get("identity"), server_key = self.get("serverKey") ) return pidtuple - + def status(self, pid, ppid): if self.localhost: status = execfuncs.lstatus(pid, ppid) else: - with self._lock: + with self._node_lock: status = sshfuncs.rstatus( pid, ppid, host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, identity = self.get("identity"), server_key = self.get("serverKey") @@ -617,27 +615,452 @@ class LinuxNode(ResourceManager): proc = None status = self.status(pid, ppid) - if status == sshfuncs.RUNNING: + if status == sshfuncs.ProcStatus.RUNNING: if self.localhost: (out, err), proc = execfuncs.lkill(pid, ppid, sudo) else: - with self._lock: + with self._node_lock: (out, err), proc = sshfuncs.rkill( pid, ppid, host = self.get("hostname"), user = self.get("username"), port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), agent = True, sudo = sudo, identity = self.get("identity"), server_key = self.get("serverKey") ) + + return (out, err), proc + + def copy(self, src, dst): + if self.localhost: + (out, err), proc = execfuncs.lcopy(source, dest, + recursive = True, + strict_host_checking = False) + else: + with self._node_lock: + (out, err), proc = sshfuncs.rcopy( + src, dst, + port = self.get("port"), + gwuser = self.get("gatewayUser"), + gw = self.get("gateway"), + identity = self.get("identity"), + server_key = self.get("serverKey"), + recursive = True, + strict_host_checking = False) + return (out, err), proc - def check_bad_host(self, out, err): - badre = re.compile(r'(?:' - r'|Error: disk I/O error' - r')', - re.I) - return badre.search(out) or badre.search(err) + def upload(self, src, dst, text = False, overwrite = True): + """ Copy content to destination + + src string with the content to copy. Can be: + - plain text + - a string with the path to a local file + - a string with a semi-colon separeted list of local files + - a string with a local directory + + dst string with destination path on the remote host (remote is + always self.host) + + text src is text input, it must be stored into a temp file before + uploading + """ + # If source is a string input + f = None + if text and not os.path.isfile(src): + # src is text input that should be uploaded as file + # create a temporal file with the content to upload + f = tempfile.NamedTemporaryFile(delete=False) + f.write(src) + f.close() + src = f.name + + # If dst files should not be overwritten, check that the files do not + # exits already + if isinstance(src, str): + src = map(str.strip, src.split(";")) + + if overwrite == False: + src = self.filter_existing_files(src, dst) + if not src: + return ("", ""), None + + if not self.localhost: + # Build destination as @: + dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst) + + result = self.copy(src, dst) + + # clean up temp file + if f: + os.remove(f.name) + + return result + + def download(self, src, dst): + if not self.localhost: + # Build destination as @: + src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src) + return self.copy(src, dst) + + def install_packages_command(self, packages): + command = "" + if self.use_rpm: + command = rpmfuncs.install_packages_command(self.os, packages) + elif self.use_deb: + command = debfuncs.install_packages_command(self.os, packages) + else: + msg = "Error installing packages ( OS not known ) " + self.error(msg, self.os) + raise RuntimeError, msg + + return command + + def install_packages(self, packages, home, run_home = None): + """ Install packages in the Linux host. + + 'home' is the directory to upload the package installation script. + 'run_home' is the directory from where to execute the script. + """ + command = self.install_packages_command(packages) + + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "instpkg.sh"), + pidfile = "instpkg_pidfile", + ecodefile = "instpkg_exitcode", + stdout = "instpkg_stdout", + stderr = "instpkg_stderr", + overwrite = False, + raise_on_error = True) + + return (out, err), proc + + def remove_packages(self, packages, home, run_home = None): + """ Uninstall packages from the Linux host. + + 'home' is the directory to upload the package un-installation script. + 'run_home' is the directory from where to execute the script. + """ + if self.use_rpm: + command = rpmfuncs.remove_packages_command(self.os, packages) + elif self.use_deb: + command = debfuncs.remove_packages_command(self.os, packages) + else: + msg = "Error removing packages ( OS not known ) " + self.error(msg) + raise RuntimeError, msg + + run_home = run_home or home + + (out, err), proc = self.run_and_wait(command, run_home, + shfile = os.path.join(home, "rmpkg.sh"), + pidfile = "rmpkg_pidfile", + ecodefile = "rmpkg_exitcode", + stdout = "rmpkg_stdout", + stderr = "rmpkg_stderr", + overwrite = False, + raise_on_error = True) + + return (out, err), proc + + def mkdir(self, path, clean = False): + if clean: + self.rmdir(path) + + return self.execute("mkdir -p %s" % path, with_lock = True) + + def rmdir(self, path): + return self.execute("rm -rf %s" % path, with_lock = True) + + def run_and_wait(self, command, home, + shfile = "cmd.sh", + env = None, + overwrite = True, + pidfile = "pidfile", + ecodefile = "exitcode", + stdin = None, + stdout = "stdout", + stderr = "stderr", + sudo = False, + tty = False, + raise_on_error = False): + """ + Uploads the 'command' to a bash script in the host. + Then runs the script detached in background in the host, and + busy-waites until the script finishes executing. + """ + + if not shfile.startswith("/"): + shfile = os.path.join(home, shfile) + + self.upload_command(command, + shfile = shfile, + ecodefile = ecodefile, + env = env, + overwrite = overwrite) + + command = "bash %s" % shfile + # run command in background in remote host + (out, err), proc = self.run(command, home, + pidfile = pidfile, + stdin = stdin, + stdout = stdout, + stderr = stderr, + sudo = sudo, + tty = tty) + + # check no errors occurred + if proc.poll(): + msg = " Failed to run command '%s' " % command + self.error(msg, out, err) + if raise_on_error: + raise RuntimeError, msg + + # Wait for pid file to be generated + pid, ppid = self.wait_pid( + home = home, + pidfile = pidfile, + raise_on_error = raise_on_error) + + # wait until command finishes to execute + self.wait_run(pid, ppid) + + (eout, err), proc = self.check_errors(home, + ecodefile = ecodefile, + stderr = stderr) + + # Out is what was written in the stderr file + if err: + msg = " Failed to run command '%s' " % command + self.error(msg, eout, err) + + if raise_on_error: + raise RuntimeError, msg + + (out, oerr), proc = self.check_output(home, stdout) + + return (out, err), proc + + def exitcode(self, home, ecodefile = "exitcode"): + """ + Get the exit code of an application. + Returns an integer value with the exit code + """ + (out, err), proc = self.check_output(home, ecodefile) + + # Succeeded to open file, return exit code in the file + if proc.wait() == 0: + try: + return int(out.strip()) + except: + # Error in the content of the file! + return ExitCode.CORRUPTFILE + + # No such file or directory + if proc.returncode == 1: + return ExitCode.FILENOTFOUND + + # Other error from 'cat' + return ExitCode.ERROR + + def upload_command(self, command, + shfile = "cmd.sh", + ecodefile = "exitcode", + overwrite = True, + env = None): + """ Saves the command as a bash script file in the remote host, and + forces to save the exit code of the command execution to the ecodefile + """ + + if not (command.strip().endswith(";") or command.strip().endswith("&")): + command += ";" + + # The exit code of the command will be stored in ecodefile + command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % { + 'command': command, + 'ecodefile': ecodefile, + } + + # Export environment + environ = self.format_environment(env) + + # Add environ to command + command = environ + command + + return self.upload(command, shfile, text = True, overwrite = overwrite) + + def format_environment(self, env, inline = False): + """ Formats the environment variables for a command to be executed + either as an inline command + (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or + as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n) + """ + if not env: return "" + + # Remove extra white spaces + env = re.sub(r'\s+', ' ', env.strip()) + + sep = ";" if inline else "\n" + return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep + + def check_errors(self, home, + ecodefile = "exitcode", + stderr = "stderr"): + """ Checks whether errors occurred while running a command. + It first checks the exit code for the command, and only if the + exit code is an error one it returns the error output. + + """ + proc = None + err = "" + + # get exit code saved in the 'exitcode' file + ecode = self.exitcode(home, ecodefile) + + if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]: + err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile) + elif ecode > 0 or ecode == ExitCode.FILENOTFOUND: + # The process returned an error code or didn't exist. + # Check standard error. + (err, eerr), proc = self.check_output(home, stderr) + + # If the stderr file was not found, assume nothing bad happened, + # and just ignore the error. + # (cat returns 1 for error "No such file or directory") + if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: + err = "" + + return ("", err), proc + + def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False): + """ Waits until the pid file for the command is generated, + and returns the pid and ppid of the process """ + pid = ppid = None + delay = 1.0 + + for i in xrange(2): + pidtuple = self.getpid(home = home, pidfile = pidfile) + + if pidtuple: + pid, ppid = pidtuple + break + else: + time.sleep(delay) + delay = delay * 1.5 + else: + msg = " Failed to get pid for pidfile %s/%s " % ( + home, pidfile ) + self.error(msg) + + if raise_on_error: + raise RuntimeError, msg + + return pid, ppid + + def wait_run(self, pid, ppid, trial = 0): + """ wait for a remote process to finish execution """ + delay = 1.0 + + while True: + status = self.status(pid, ppid) + + if status is ProcStatus.FINISHED: + break + elif status is not ProcStatus.RUNNING: + delay = delay * 1.5 + time.sleep(delay) + # If it takes more than 20 seconds to start, then + # asume something went wrong + if delay > 20: + break + else: + # The app is running, just wait... + time.sleep(0.5) + + def check_output(self, home, filename): + """ Retrives content of file """ + (out, err), proc = self.execute("cat %s" % + os.path.join(home, filename), retry = 1, with_lock = True) + return (out, err), proc + + def is_alive(self): + """ Checks if host is responsive + """ + if self.localhost: + return True + + out = err = "" + msg = "Unresponsive host. Wrong answer. " + + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + try: + (out, err), proc = self.execute("echo 'ALIVE'", + blocking = True, + with_lock = True) + + if out.find("ALIVE") > -1: + return True + except: + trace = traceback.format_exc() + msg = "Unresponsive host. Error reaching host: %s " % trace + + self.error(msg, out, err) + return False + + def find_home(self): + """ Retrieves host home directory + """ + # The underlying SSH layer will sometimes return an empty + # output (even if the command was executed without errors). + # To work arround this, repeat the operation N times or + # until the result is not empty string + msg = "Impossible to retrieve HOME directory" + try: + (out, err), proc = self.execute("echo ${HOME}", + blocking = True, + with_lock = True) + + if out.strip() != "": + self._home_dir = out.strip() + except: + trace = traceback.format_exc() + msg = "Impossible to retrieve HOME directory %s" % trace + + if not self._home_dir: + self.error(msg) + raise RuntimeError, msg + + def filter_existing_files(self, src, dst): + """ Removes files that already exist in the Linux host from src list + """ + # construct a dictionary with { dst: src } + dests = dict(map( + lambda s: (os.path.join(dst, os.path.basename(s)), s ), s)) \ + if len(src) > 1 else dict({dst: src[0]}) + + command = [] + for d in dests.keys(): + command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} ) + + command = ";".join(command) + + (out, err), proc = self.execute(command, retry = 1, with_lock = True) + + for d in dests.keys(): + if out.find(d) > -1: + del dests[d] + + if not dests: + return [] + + return dests.values()