Added unit tests for linux application
[nepi.git] / src / neco / resources / linux / node.py
index feaad46..4907c21 100644 (file)
-from neco.execution.resource import ResourceManager
-from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
-        rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING 
+from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux import rpmfuncs, debfuncs 
+from neco.util import sshfuncs, execfuncs 
 
-import cStringIO
+import collections
 import logging
-import os.path
-import subprocess
+import os
+import random
+import re
+import tempfile
+import time
+import threading
 
+# TODO: Verify that files and dirs already exist
+# TODO: Blacklist nodes!
+# TODO: Unify delays!!
+
+reschedule_delay = "0.5s"
+
+@clsinit
 class LinuxNode(ResourceManager):
+    _rtype = "LinuxNode"
+
+    @classmethod
+    def _register_attributes(cls):
+        """ Declares the attributes of a LinuxNode and registers each of
+            them on the class, in the order they are listed below """
+        # (name, help text, flags) triples, kept in registration order
+        specs = (
+            ("hostname", "Hostname of the machine", Flags.ExecReadOnly),
+            ("username", "Local account username", Flags.Credential),
+            ("port", "SSH port", Flags.ExecReadOnly),
+            ("home",
+                "Experiment home directory to store all experiment related files",
+                Flags.ExecReadOnly),
+            ("identity", "SSH identity file", Flags.Credential),
+            ("serverKey", "Server public key", Flags.ExecReadOnly),
+            ("cleanHome",
+                "Remove all files and directories  from home folder before starting experiment",
+                Flags.ExecReadOnly),
+            ("cleanProcesses",
+                "Kill all running processes before starting experiment",
+                Flags.ExecReadOnly),
+            ("tearDown",
+                "Bash script to be executed before releasing the resource",
+                Flags.ExecReadOnly),
+        )
+
+        for name, help_text, flags in specs:
+            cls._register_attribute(Attribute(name, help_text, flags = flags))
+
     def __init__(self, ec, guid):
+        """ Initializes the node resource manager; 'ec' and 'guid' are
+            forwarded unchanged to the ResourceManager constructor """
         super(LinuxNode, self).__init__(ec, guid)
-        self.ip = None
-        self.host = None
-        self.user = None
-        self.port = None
-        self.identity_file = None
-        self.enable_x11 = False
-        self.forward_agent = True
-
-        # packet management system - either yum or apt for now...
-        self._pm = None
-       
-        # Logging
-        loglevel = "debug"
-        self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
-                self.guid)
-        self._logger.setLevel(getattr(logging, loglevel.upper()))
-
-        # For ssh connections we use the ControlMaster option which 
-        # allows us to decrease the number of open ssh network connections.
-        # Subsequent ssh connections will reuse a same master connection.
-        # This might pose a problem when using X11 and ssh-agent, since
-        # display and agent forwarded will be those of the first connection,
-        # which created the master. 
-        # To avoid reusing a master created by a previous LinuxNode instance,
-        # we explicitly erase the ControlPath socket.
-        control_path = make_control_path(self.user, self.host, self.port)
-        try:
-            os.remove(control_path)
-        except:
-            pass
+        # cached OS identifier; filled in on first access of the 'os' property
+        self._os = None
+        
+        # lock to avoid concurrency issues on methods used by applications 
+        self._lock = threading.Lock()
+
+        # single logger name shared by every LinuxNode instance
+        self._logger = logging.getLogger("LinuxNode")
+    
+    def log_message(self, msg):
+        """ Returns 'msg' prefixed with the guid and hostname of this node """
+        hostname = self.get("hostname")
+        return " guid %d - host %s - %s " % (self.guid, hostname, msg)
 
     @property
-    def pm(self):
-        if self._pm:
-            return self._pm
+    def home(self):
+        return self.get("home") or "/tmp"
 
-        if (not (self.host or self.ip) or not self.user):
-            msg = "Can't resolve package management system. Insufficient data."
-            self._logger.error(msg)
-            raise RuntimeError(msg)
+    @property
+    def exp_dir(self):
+        """ Experiment directory, <home>/<exp_id>, when that path is
+            absolute or starts with '~/'; otherwise plain '~/' """
+        # NOTE(review): a relative path falls back to "~/" and so drops the
+        # experiment id from the result -- confirm this is intended
+        candidate = os.path.join(self.home, self.ec.exp_id)
+        if candidate.startswith(('/', "~/")):
+            return candidate
+        return "~/"
 
-        out = self.execute("cat /etc/issue")
+    @property
+    def node_home(self):
+        """ Directory of this node inside the experiment directory,
+            named 'node-<guid>' """
+        return os.path.join(self.exp_dir, "node-%d" % self.guid)
 
-        if out.find("Fedora") == 0:
-            self._pm = "yum"
-        elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
-            self._pm = "apt-get"
+    @property
+    def os(self):
+        """ Identifier of the OS running on the node ('f12', 'f14',
+            'debian' or 'ubuntu'), detected from /etc/issue on first
+            access and cached afterwards.
+            Raises RuntimeError when hostname/username are missing, when
+            the detection command fails, or when the OS is not recognized """
+        if self._os:
+            return self._os
+
+        if not (self.get("hostname") and self.get("username")):
+            msg = "Can't resolve OS, insufficient data "
+            self.error(msg)
+            raise RuntimeError(msg)
+
+        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
+
+        # only an error if there is stderr output AND a non zero exit code
+        if err and proc.poll():
+            msg = "Error detecting OS "
+            self.error(msg, out, err)
+            raise RuntimeError("%s - %s - %s" % (msg, out, err))
 
+        if out.startswith("Fedora release 12"):
+            self._os = "f12"
+        elif out.startswith("Fedora release 14"):
+            self._os = "f14"
+        elif out.startswith("Debian"):
+            self._os = "debian"
+        elif out.startswith("Ubuntu"):
+            self._os = "ubuntu"
         else:
-            msg = "Can't resolve package management system. Unknown OS."
-            self._logger.error(msg)
-            raise RuntimeError(msg)
+            msg = "Unsupported OS"
+            self.error(msg, out)
+            raise RuntimeError("%s - %s " % (msg, out))
 
-        return self._pm
+        return self._os
 
     @property
-    def is_localhost(self):
-        return ( self.host or self.ip ) in ['localhost', '127.0.0.7', '::1']
+    def localhost(self):
+        return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
+
+    def provision(self, filters = None):
+        """ Prepares the node for the experiment.
+            If the host does not respond the state is set to FAILED and
+            nothing else is done. Otherwise processes and home are cleaned
+            when the matching attributes are set, the node home directory
+            is created and the parent provision is invoked.
+            'filters' is accepted but not used by this method """
+        if not self.is_alive():
+            self._state = ResourceState.FAILED
+            self.error("Deploy failed. Unresponsive node")
+            return
+
+        # optional clean up steps, each enabled through an attribute
+        cleanups = (("cleanProcesses", self.clean_processes),
+                ("cleanHome", self.clean_home))
+        for attr, action in cleanups:
+            if self.get(attr):
+                action()
+
+        self.mkdir(self.node_home)
+
+        super(LinuxNode, self).provision()
+
+    def deploy(self):
+        """ Deploys the node. A node in state NEW is first discovered and
+            provisioned (any exception marks it FAILED and is re-raised).
+            While a connected LinuxInterface is below READY the deploy is
+            re-scheduled instead of being finalized """
+        if self.state == ResourceState.NEW:
+            try:
+                self.discover()
+                self.provision()
+            except:
+                # record the failure but let the caller see the exception
+                self._state = ResourceState.FAILED
+                raise
+
+        # Node needs to wait until all associated interfaces are 
+        # ready before it can finalize deployment
+        from neco.resources.linux.interface import LinuxInterface
+        ifaces = self.get_connected(LinuxInterface.rtype())
+        pending = any(iface.state < ResourceState.READY for iface in ifaces)
+        if pending:
+            self.ec.schedule(reschedule_delay, self.deploy)
+            return
+
+        super(LinuxNode, self).deploy()
+
+    def release(self):
+        """ Executes the 'tearDown' script on the node, when one is set,
+            and then releases the resource through the parent class """
+        script = self.get("tearDown")
+        if script:
+            self.execute(script)
+
+        super(LinuxNode, self).release()
+
+    def valid_connection(self, guid):
+        """ Currently accepts every connection: always returns True and
+            does not inspect 'guid' """
+        # TODO: Validate!
+        return True
+
+    def clean_processes(self, killer = False):
+        """ Kills processes on the node with a chain of 'sudo -S' commands,
+            each one followed by '|| /bin/true' so a failure does not stop
+            the chain. With 'killer' the python/tcpdump and root processes
+            are targeted; otherwise tcpdump and the processes of the
+            configured username """
+        self.info("Cleaning up processes")
+
+        if killer:
+            # Hardcore kill
+            steps = [
+                "sudo -S killall python tcpdump",
+                "sudo -S killall python tcpdump",
+                "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort)",
+                "sudo -S killall -u root",
+                "sudo -S killall -u root"]
+        else:
+            # Be gentler...
+            username = self.get("username")
+            steps = [
+                "sudo -S killall tcpdump",
+                "sudo -S killall tcpdump",
+                "sudo -S killall -u %s" % username,
+                "sudo -S killall -u %s" % username]
+
+        cmd = "".join(step + " || /bin/true ; " for step in steps)
+
+        self.execute(cmd, retry = 1, with_lock = True)
+            
+    def clean_home(self):
+        """ Removes the '.cache', '.local', '.config' and 'nepi-*' entries
+            found directly under the home directory of the node """
+        self.info("Cleaning up home")
+
+        find_cmd = ("find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"
+            " -execdir rm -rf {} + ")
+        cmd = "cd %s ; " % self.home + find_cmd
 
-    def install(self, packages):
-        if not isinstance(packages, list):
-            packages = [packages]
+        self.execute(cmd, with_lock = True)
 
-        for p in packages:
-            self.execute("%s -y install %s" % (self.pm, p), sudo = True, 
-                    tty = True)
+    def upload(self, src, dst, text = False):
+        """ Copy content to destination
 
-    def uninstall(self, packages):
-        if not isinstance(packages, list):
-            packages = [packages]
+           src  content to copy. Can be a local file, directory or a list of files
 
-        for p in packages:
-            self.execute("%s -y remove %s" % (self.pm, p), sudo = True, 
-                    tty = True)
+           dst  destination path on the node (prefixed with
+                <username>@<hostname>: unless the node is localhost)
 
-    def upload(self, src, dst):
-        if not os.path.isfile(src):
-            src = cStringIO.StringIO(src)
+           text src is text input, it must be stored into a temp file before uploading
+
+           Returns whatever self.copy returns.
+        """
+        # If source is a string input 
+        f = None
+        if text and not os.path.isfile(src):
+            # src is text input that should be uploaded as file
+            # create a temporary file with the content to upload
+            f = tempfile.NamedTemporaryFile(delete=False)
+            f.write(src)
+            f.close()
+            src = f.name
 
-        if not self.is_localhost:
+        if not self.localhost:
             # Build destination as <user>@<server>:<path>
-            dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
-        return self.copy(src, dst)
+            dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
+
+        try:
+            result = self.copy(src, dst)
+        finally:
+            # clean up temp file, even when the copy raised
+            if f:
+                os.remove(f.name)
+
+        return result
 
     def download(self, src, dst):
+        """ Copies 'src' to 'dst' through self.copy. Unless the node is
+            localhost, 'src' (not 'dst') is rewritten as
+            <username>@<hostname>:<src> before copying """
-        if not self.is_localhost:
+        if not self.localhost:
             # Build destination as <user>@<server>:<path>
-            src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
+            src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
+
+    def install_packages(self, packages, home = None):
+        """ Installs 'packages' on the node using the install command built
+            by rpmfuncs (f12, f14) or debfuncs (debian, ubuntu).
+            The command runs through run_and_wait from 'home' (node home
+            by default) with raise_on_error enabled.
+            Raises RuntimeError if the OS is none of the above """
+        home = home or self.node_home
+
+        os_name = self.os
+        if os_name in ("f12", "f14"):
+            cmd = rpmfuncs.install_packages_command(os_name, packages)
+        elif os_name in ("debian", "ubuntu"):
+            cmd = debfuncs.install_packages_command(os_name, packages)
+        else:
+            msg = "Error installing packages ( OS not known ) "
+            self.error(msg, os_name)
+            raise RuntimeError(msg)
+
+        (out, err), proc = self.run_and_wait(cmd, home, 
+            pidfile = "instpkg_pid",
+            stdout = "instpkg_out", 
+            stderr = "instpkg_err",
+            raise_on_error = True)
+
+        return (out, err), proc 
+
+    def remove_packages(self, packages, home = None):
+        """ Removes 'packages' from the node using the remove command built
+            by rpmfuncs (f12, f14) or debfuncs (debian, ubuntu).
+            The command runs through run_and_wait from 'home' (node home
+            by default) with raise_on_error enabled.
+            Raises RuntimeError if the OS is none of the above """
+        home = home or self.node_home
+
+        os_name = self.os
+        if os_name in ("f12", "f14"):
+            cmd = rpmfuncs.remove_packages_command(os_name, packages)
+        elif os_name in ("debian", "ubuntu"):
+            cmd = debfuncs.remove_packages_command(os_name, packages)
+        else:
+            msg = "Error removing packages ( OS not known ) "
+            self.error(msg)
+            raise RuntimeError(msg)
+
+        (out, err), proc = self.run_and_wait(cmd, home, 
+            pidfile = "rmpkg_pid",
+            stdout = "rmpkg_out", 
+            stderr = "rmpkg_err",
+            raise_on_error = True)
+
+        return (out, err), proc 
+
+    def mkdir(self, path, clean = False):
+        """ Runs 'mkdir -p <path>' on the node; with 'clean' the path is
+            removed first. Returns the result of execute """
+        if clean:
+            self.rmdir(path)
+
+        cmd = "mkdir -p %s" % path
+        return self.execute(cmd, with_lock = True)
+
+    def rmdir(self, path):
+        """ Runs 'rm -rf <path>' on the node and returns the result
+            of execute """
+        cmd = "rm -rf %s" % path
+        return self.execute(cmd, with_lock = True)
+
+    def run_and_wait(self, command, 
+            home = ".", 
+            pidfile = "pid", 
+            stdin = None, 
+            stdout = 'stdout', 
+            stderr = 'stderr', 
+            sudo = False,
+            tty = False,
+            raise_on_error = False):
+        """ runs a command in background on the remote host, but waits
+            until the command finishes execution.
+            This is more robust than doing a simple synchronized 'execute',
+            since in the remote host the command can continue to run detached
+            even if network disconnections occur
+
+            home, pidfile, stdin, stdout, stderr, sudo and tty are passed
+            on to 'run'; home and pidfile are also used to locate the pid file.
+
+            raise_on_error  when True a RuntimeError is raised if the command
+                            can not be launched, if its pid can not be read,
+                            or if the stderr file is not empty afterwards.
+                            When False those failures are only logged.
+
+            Returns ((out, err), proc) as obtained from reading the stderr
+            file of the command with check_output.
+        """
+        # run command in background in remote host
+        (out, err), proc = self.run(command, home, 
+                pidfile = pidfile,
+                stdin = stdin, 
+                stdout = stdout, 
+                stderr = stderr, 
+                sudo = sudo,
+                tty = tty)
+
+        # check no errors occurred
+        # (a failure needs both a non zero exit code and stderr output)
+        if proc.poll() and err:
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, out, err)
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        # Wait for pid file to be generated
+        pid, ppid = self.wait_pid(
+                home = home, 
+                pidfile = pidfile, 
+                raise_on_error = raise_on_error)
+
+        # wait until command finishes to execute
+        self.wait_run(pid, ppid)
+       
+        # check if execution errors occurred
+        # ('out' holds the content of the stderr file of the command here)
+        (out, err), proc = self.check_output(home, stderr)
+
+        # any content in the stderr file is treated as a failure
+        if err or out:
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, out, err)
+
+            if raise_on_error:
+                raise RuntimeError, msg
         
-    def is_alive(self, verbose = False):
-        if self.is_localhost:
+        return (out, err), proc
+    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+        """ Polls up to 5 times for the pid file of a command and returns
+            the (pid, ppid) read from it.
+            If the pid file never shows up the failure is logged, a
+            RuntimeError is raised when 'raise_on_error' is set, and
+            (None, None) is returned otherwise """
+        delay = 1.0
+        for attempt in xrange(5):
+            pidtuple = self.checkpid(home = home, pidfile = pidfile)
+            if pidtuple:
+                pid, ppid = pidtuple
+                return pid, ppid
+
+            # not there yet, back off a bit before asking again
+            time.sleep(delay)
+            delay = min(30,delay*1.2)
+
+        msg = " Failed to get pid for pidfile %s/%s " % (
+                home, pidfile )
+        self.error(msg)
+
+        if raise_on_error:
+            raise RuntimeError(msg)
+
+        return None, None
+
+    def wait_run(self, pid, ppid, trial = 0):
+        """ Blocks until the remote process (pid, ppid) is reported as
+            FINISHED, polling its status with randomized, growing delays.
+            Gives up after more than 12 consecutive polls that report
+            neither FINISHED nor RUNNING. 'trial' is not used """
+        delay = 1.0
+        bustspin = 0
+
+        while True:
+            status = self.status(pid, ppid)
+
+            if status is sshfuncs.FINISHED:
+                return
+
+            if status is sshfuncs.RUNNING:
+                # still running: sleep, grow the delay, reset the counter
+                time.sleep(delay*(0.5+random.random()))
+                delay = min(30,delay*1.2)
+                bustspin = 0
+                continue
+
+            # status could not be determined: retry a bounded number of times
+            bustspin += 1
+            time.sleep(delay*(5.5+random.random()))
+            if bustspin > 12:
+                return
+
+    def check_output(self, home, filename):
+        """ Reads the file <home>/<filename> on the node with 'cat' and
+            returns ((out, err), proc) for that command """
+        path = os.path.join(home, filename)
+        (out, err), proc = self.execute("cat %s" % path,
+            retry = 1, with_lock = True)
+        return (out, err), proc
+
+    def is_alive(self):
+        """ Tells whether the node responds. localhost is always considered
+            alive; otherwise "echo 'ALIVE'" is executed on the node and the
+            node is alive only if the output starts with ALIVE. Any
+            exception raised by the execution is logged as a warning and
+            reported as not alive """
+        if self.localhost:
             return True
 
+        out = err = ""
         try:
-            out = self.execute("echo 'ALIVE'",
-                timeout = 60,
-                err_on_timeout = False,
-                persistent = False)
+            (out, err), proc = self.execute("echo 'ALIVE'", with_lock = True)
         except:
-            if verbose:
-                self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            # log the traceback in place of the (unavailable) stderr output
+            import traceback
+            trace = traceback.format_exc()
+            msg = "Unresponsive host "
+            self.warn(msg, out, trace)
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
-            if verbose:
-                self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            msg = "Unresponsive host "
+            self.warn(msg, out, err)
             return False
 
-    def mkdir(self, path, clean = True):
-        if clean:
-            self.rmdir(path)
-
-        return self.execute(
-            "mkdir -p %s" % path,
-            timeout = 120,
-            retry = 3
-            )
-
-    def rmdir(self, path):
-        return self.execute(
-            "rm -rf %s" % path,
-            timeout = 120,
-            retry = 3
-            )
+            # TODO!
+            #if self.check_bad_host(out,err):
+            #    self.blacklist()
 
     def copy(self, src, dst):
-        if self.is_localhost:
-            command = ["cp", "-R", src, dst]
-            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
-                    stderr=subprocess.PIPE)
-            out, err = p.communicate()
+        if self.localhost:
+            (out, err), proc =  execfuncs.lcopy(source, dest, 
+                    recursive = True)
         else:
-            (out, err), proc = eintr_retry(rcopy)(
-                src, dst, 
-                port = self.port,
-                agent = self.agent,
-                identity_file = self.identity_file)
-
-            if proc.wait():
-                msg = "Error uploading to %s got:\n%s%s" %\
-                        (self.host or self.ip, out, err)
-                self._logger.error(msg)
-                raise RuntimeError(msg)
+            with self._lock:
+                (out, err), proc = sshfuncs.rcopy(
+                    src, dst, 
+                    port = self.get("port"),
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    recursive = True)
 
-        return (out, err)
+        return (out, err), proc
 
     def execute(self, command,
             sudo = False,
             stdin = None, 
-            tty = False,
             env = None,
+            tty = False,
+            forward_x11 = False,
             timeout = None,
-            retry = 0,
+            retry = 3,
             err_on_timeout = True,
             connect_timeout = 30,
-            persistent = True):
+            persistent = True,
+            with_lock = False
+            ):
         """ Notice that this invocation will block until the
         execution finishes. If this is not the desired behavior,
         use 'run' instead."""
 
-        if self.is_localhost:
-            if env:
-                export = ''
-                for envkey, envval in env.iteritems():
-                    export += '%s=%s ' % (envkey, envval)
-                command = export + command
-
-            if sudo:
-                command = "sudo " + command
-
-            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
-                    stderr=subprocess.PIPE)
-            out, err = p.communicate()
+        if self.localhost:
+            (out, err), proc = execfuncs.lexec(command, 
+                    user = user,
+                    sudo = sudo,
+                    stdin = stdin,
+                    env = env)
         else:
-            (out, err), proc = eintr_retry(rexec)(
+            if with_lock:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rexec(
+                        command, 
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        stdin = stdin,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey"),
+                        env = env,
+                        tty = tty,
+                        forward_x11 = forward_x11,
+                        timeout = timeout,
+                        retry = retry,
+                        err_on_timeout = err_on_timeout,
+                        connect_timeout = connect_timeout,
+                        persistent = persistent
+                        )
+            else:
+                (out, err), proc = sshfuncs.rexec(
                     command, 
-                    self.host or self.ip, 
-                    self.user,
-                    port = self.port, 
-                    agent = self.forward_agent,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
                     sudo = sudo,
-                    stdin = stdin, 
-                    identity_file = self.identity_file,
-                    tty = tty,
-                    x11 = self.enable_x11,
+                    stdin = stdin,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
                     env = env,
+                    tty = tty,
+                    forward_x11 = forward_x11,
                     timeout = timeout,
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
-                    persistent = persistent)
-
-            if proc.wait():
-                msg = "Failed to execute command %s at node %s: %s %s" % \
-                        (command, self.host or self.ip, out, err,)
-                self._logger.warn(msg)
-                raise RuntimeError(msg)
+                    persistent = persistent
+                    )
 
-        return (out, err)
+        return (out, err), proc
 
-    def run(self, command, home, 
+    def run(self, command, 
+            home = None,
+            create_home = True,
+            pidfile = "pid",
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
-            sudo = False):
-        self._logger.info("Running %s", command)
-        
-        pidfile = './pid',
+            sudo = False,
+            tty = False):
+        """ Spawns 'command' on the node without waiting for it to finish
+            and returns ((out, err), proc) of the spawn operation.
+            On localhost the command goes through execfuncs.lspawn (tty is
+            not used there); otherwise through sshfuncs.rspawn while
+            holding the node lock, with missing stdin/stdout/stderr
+            replaced by /dev/null """
 
-        if self.is_localhost:
-            if stderr == stdout:
-                stderr = '&1'
-            else:
-                stderr = ' ' + stderr
-            
-            daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
-                'command' : command,
-                'pidfile' : pidfile,
-                
-                'stdout' : stdout,
-                'stderr' : stderr,
-                'stdin' : stdin,
-            }
-            
-            cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
-                    'command' : daemon_command,
-                    
-                    'sudo' : 'sudo -S' if sudo else '',
-                    
-                    'pidfile' : pidfile,
-                    'gohome' : 'cd %s ; ' % home if home else '',
-                    'create' : 'mkdir -p %s ; ' % home if create_home else '',
-                }
-            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
-                    stderr=subprocess.PIPE)
-            out, err = p.communicate()
+        self.debug("Running command '%s'" % command)
+        
+        if self.localhost:
+            # 'user' was an undefined name here (NameError); the account
+            # comes from the 'username' attribute as in the remote branch
+            (out, err), proc = execfuncs.lspawn(command, pidfile, 
+                    stdout = stdout, 
+                    stderr = stderr, 
+                    stdin = stdin, 
+                    home = home, 
+                    create_home = create_home, 
+                    sudo = sudo,
+                    user = self.get("username")) 
         else:
             # Start process in a "daemonized" way, using nohup and heavy
             # stdin/out redirection to avoid connection issues
-            (out,err), proc = rspawn(
-                command,
-                pidfile = pidfile,
-                home = home,
-                stdin = stdin if stdin is not None else '/dev/null',
-                stdout = stdout if stdout else '/dev/null',
-                stderr = stderr if stderr else '/dev/null',
-                sudo = sudo,
-                host = self.host,
-                user = self.user,
-                port = self.port,
-                agent = self.forward_agent,
-                identity_file = self.identity_file
-                )
-            
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-
-        return (out, err)
-    
-    def checkpid(self, path):            
-        # Get PID/PPID
-        # NOTE: wait a bit for the pidfile to be created
-        pidtuple = rcheck_pid(
-            os.path.join(path, 'pid'),
-            host = self.host,
-            user = self.user,
-            port = self.port,
-            agent = self.forward_agent,
-            identity_file = self.identity_file
-            )
+            with self._lock:
+                (out,err), proc = sshfuncs.rspawn(
+                    command,
+                    pidfile = pidfile,
+                    home = home,
+                    create_home = create_home,
+                    stdin = stdin if stdin is not None else '/dev/null',
+                    stdout = stdout if stdout else '/dev/null',
+                    stderr = stderr if stderr else '/dev/null',
+                    sudo = sudo,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    tty = tty
+                    )
+
+        return (out, err), proc
+
+    def checkpid(self, home = ".", pidfile = "pid"):
+        """ Reads the pid file <home>/<pidfile> and returns whatever the
+            underlying check returns for it: execfuncs.lcheckpid on
+            localhost, sshfuncs.rcheckpid (under the node lock) otherwise """
+        pidpath = os.path.join(home, pidfile)
+
+        if self.localhost:
+            pidtuple = execfuncs.lcheckpid(pidpath)
+        else:
+            with self._lock:
+                pidtuple = sshfuncs.rcheckpid(
+                    pidpath,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey")
+                    )
         
         return pidtuple
     
     def status(self, pid, ppid):
+        """ Returns the status of the process (pid, ppid) as reported by
+            execfuncs.lstatus on localhost, or by sshfuncs.rstatus (called
+            while holding the node lock) on any other host """
-        status = rstatus(
-                pid, ppid,
-                host = self.host,
-                user = self.user,
-                port = self.port,
-                agent = self.forward_agent,
-                identity_file = self.identity_file
-                )
+        if self.localhost:
+            status = execfuncs.lstatus(pid, ppid)
+        else:
+            with self._lock:
+                status = sshfuncs.rstatus(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
            
         return status
     
     def kill(self, pid, ppid, sudo = False):
+        """ Kills the process (pid, ppid) if its status is RUNNING, using
+            execfuncs.lkill on localhost or sshfuncs.rkill (under the node
+            lock) otherwise, and returns ((out, err), proc).
+            When the process is not running nothing is done and
+            (("", ""), None) is returned """
+        # defaults returned when there is nothing to kill
+        out = err = ""
+        proc = None
         status = self.status(pid, ppid)
-        if status == RUNNING:
-            # kill by ppid+pid - SIGTERM first, then try SIGKILL
-            rkill(
-                pid, ppid,
-                host = self.host,
-                user = self.user,
-                port = self.port,
-                agent = self.forward_agent,
-                sudo = sudo,
-                identity_file = self.identity_file
-                )
+
+        if status == sshfuncs.RUNNING:
+            if self.localhost:
+                (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
+            else:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rkill(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
+        return (out, err), proc
+
+    def check_bad_host(self, out, err):
+        """ Searches 'out' and then 'err' (case insensitive) for messages
+            that reveal a malfunctioning host. Returns the first match
+            object found, or None when neither text matches """
+        # the group must not start with '|': an empty first alternative
+        # matches any text, which flagged every host as bad
+        badre = re.compile(r'(?:'
+                           r'Error: disk I/O error'
+                           r')', 
+                           re.I)
+        return badre.search(out) or badre.search(err)
+
+    def blacklist(self):
+        """ Placeholder for blacklisting the node: for now it only logs
+            a warning, the actual blacklisting is still commented out """
+        # TODO!!!!
+        self.warn(" Blacklisting malfunctioning node ")
+        #import util
+        #util.appendBlacklist(self.hostname)