Added Linux Application
[nepi.git] / src / neco / resources / linux / node.py
index e00562f..a030eb4 100644 (file)
-from neco.execution.resource import Resource
-from neco.util.sshfuncs import eintr_retry, rexec, rcopy, \
-        rspawn, rcheck_pid, rstatus, rkill, make_control_path, RUNNING 
+from neco.execution.attribute import Attribute, Flags
+from neco.execution.resource import ResourceManager, clsinit, ResourceState
+from neco.resources.linux import rpmfuncs, debfuncs 
+from neco.util import sshfuncs, execfuncs 
 
 
-import cStringIO
+import collections
 import logging
 import logging
-import os.path
-import subprocess
+import os
+import random
+import re
+import tempfile
+import time
+import threading
 
 
-class LinuxNode(Resource):
-    def __init__(self, ec, guif):
+# TODO: Verify files and dirs exists already
+# TODO: Blacklist node!
+
+DELAY ="1s"
+
+@clsinit
+class LinuxNode(ResourceManager):
+    _rtype = "LinuxNode"
+
+    @classmethod
+    def _register_attributes(cls):
+        hostname = Attribute("hostname", "Hostname of the machine",
+                flags = Flags.ExecReadOnly)
+
+        username = Attribute("username", "Local account username", 
+                flags = Flags.Credential)
+
+        port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
+        
+        home = Attribute("home",
+                "Experiment home directory to store all experiment related files",
+                flags = Flags.ExecReadOnly)
+        
+        identity = Attribute("identity", "SSH identity file",
+                flags = Flags.Credential)
+        
+        server_key = Attribute("serverKey", "Server public key", 
+                flags = Flags.ExecReadOnly)
+        
+        clean_home = Attribute("cleanHome", "Remove all files and directories " + \
+                " from home folder before starting experiment", 
+                flags = Flags.ExecReadOnly)
+        
+        clean_processes = Attribute("cleanProcesses", 
+                "Kill all running processes before starting experiment",
+                flags = Flags.ExecReadOnly)
+        
+        tear_down = Attribute("tearDown", "Bash script to be executed before " + \
+                "releasing the resource",
+                flags = Flags.ExecReadOnly)
+
+        cls._register_attribute(hostname)
+        cls._register_attribute(username)
+        cls._register_attribute(port)
+        cls._register_attribute(home)
+        cls._register_attribute(identity)
+        cls._register_attribute(server_key)
+        cls._register_attribute(clean_home)
+        cls._register_attribute(clean_processes)
+        cls._register_attribute(tear_down)
+
+    def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         super(LinuxNode, self).__init__(ec, guid)
-        self.ip = None
-        self.host = None
-        self.user = None
-        self.port = None
-        self.identity_file = None
-        self.enable_x11 = False
-        self.forward_agent = True
-
-        # packet management system - either yum or apt for now...
-        self._pm = None
-       
-        # Logging
-        loglevel = "debug"
-        self._logger = logging.getLogger("neco.resources.base.LinuxNode.%s" %\
-                self.guid)
-        self._logger.setLevel(getattr(logging, loglevel.upper()))
-
-        # For ssh connections we use the ControlMaster option which 
-        # allows us to decrease the number of open ssh network connections.
-        # Subsequent ssh connections will reuse a same master connection.
-        # This might pose a problem when using X11 and ssh-agent, since
-        # display and agent forwarded will be those of the first connection,
-        # which created the master. 
-        # To avoid reusing a master created by a previous LinuxNode instance,
-        # we explicitly erase the ControlPath socket.
-        control_path = make_control_path(self.user, self.host, self.port)
-        try:
-            os.remove(control_path)
-        except:
-            pass
+        self._os = None
+        
+        # lock to avoid concurrency issues on methods used by applications 
+        self._lock = threading.Lock()
+
+        self._logger = logging.getLogger("neco.linux.Node.%d " % self.guid)
 
     @property
 
     @property
-    def pm(self):
-        if self._pm:
-            return self._pm
+    def home(self):
+        return self.get("home") or "/tmp"
 
 
-        if (not (self.host or self.ip) or not self.user):
-            msg = "Can't resolve package management system. Insufficient data."
-            self._logger.error(msg)
-            raise RuntimeError(msg)
+    @property
+    def exp_dir(self):
+        exp_dir = os.path.join(self.home, self.ec.exp_id)
+        return exp_dir if exp_dir.startswith('/') else "${HOME}/"
 
 
-        out = self.execute("cat /etc/issue")
+    @property
+    def node_dir(self):
+        node_dir = "node-%d" % self.guid
+        return os.path.join(self.exp_dir, node_dir)
 
 
-        if out.find("Fedora") == 0:
-            self._pm = "yum"
-        elif out.find("Debian") == 0 or out.find("Ubuntu") ==0:
-            self._pm = "apt-get"
+    @property
+    def os(self):
+        if self._os:
+            return self._os
+
+        if (not self.get("hostname") or not self.get("username")):
+            msg = "Can't resolve OS for guid %d. Insufficient data." % self.guid
+            self.logger.error(msg)
+            raise RuntimeError, msg
+
+        (out, err), proc = self.execute("cat /etc/issue")
+
+        if err and proc.poll():
+            msg = "Error detecting OS for host %s. err: %s " % (self.get("hostname"), err)
+            self.logger.error(msg)
+            raise RuntimeError, msg
+
+        if out.find("Fedora release 12") == 0:
+            self._os = "f12"
+        elif out.find("Fedora release 14") == 0:
+            self._os = "f14"
+        elif out.find("Debian") == 0: 
+            self._os = "debian"
+        elif out.find("Ubuntu") ==0:
+            self._os = "ubuntu"
         else:
         else:
-            msg = "Can't resolve package management system. Unknown OS."
-            self._logger.error(msg)
-            raise RuntimeError(msg)
+            msg = "Unsupported OS %s for host %s" % (out, self.get("hostname"))
+            self.logger.error(msg)
+            raise RuntimeError, msg
 
 
-        return self._pm
+        return self._os
 
     @property
 
     @property
-    def is_localhost(self):
-        return ( self.host or self.ip ) in ['localhost', '127.0.0.7', '::1']
+    def localhost(self):
+        return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
+
+    def provision(self, filters = None):
+        if not self.is_alive():
+            self._state = ResourceState.FAILED
+            self.logger.error("Deploy failed. Unresponsive node")
+            return
+
+        if self.get("cleanProcesses"):
+            self.clean_processes()
+
+        if self.get("cleanHome"):
+            self.clean_home()
+       
+        self.mkdir(self.node_dir)
+
+        super(LinuxNode, self).provision()
 
 
-    def install(self, packages):
-        if not isinstance(packages, list):
-            packages = [packages]
+    def deploy(self):
+        if self.state == ResourceState.NEW:
+            self.discover()
+            self.provision()
 
 
-        for p in packages:
-            self.execute("%s -y install %s" % (self.pm, p), sudo = True, 
-                    tty = True)
+        # Node needs to wait until all associated interfaces are 
+        # ready before it can finalize deployment
+        from neco.resources.linux.interface import LinuxInterface
+        ifaces = self.get_connected(LinuxInterface.rtype())
+        for iface in ifaces:
+            if iface.state < ResourceState.READY:
+                self.ec.schedule(DELAY, self.deploy)
+                return 
 
 
-    def uninstall(self, packages):
-        if not isinstance(packages, list):
-            packages = [packages]
+        super(LinuxNode, self).deploy()
 
 
-        for p in packages:
-            self.execute("%s -y remove %s" % (self.pm, p), sudo = True, 
-                    tty = True)
+    def release(self):
+        tear_down = self.get("tearDown")
+        if tear_down:
+            self.execute(tear_down)
 
 
-    def upload(self, src, dst):
-        if not os.path.isfile(src):
-            src = cStringIO.StringIO(src)
+        super(LinuxNode, self).release()
 
 
-        if not self.is_localhost:
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
+    def clean_processes(self):
+        self.logger.info("Cleaning up processes")
+        
+        cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
+            "sudo -S killall python tcpdump || /bin/true ; " +
+            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
+            "sudo -S killall -u root || /bin/true ; " +
+            "sudo -S killall -u root || /bin/true ; ")
+
+        out = err = ""
+        with self._lock:
+           (out, err), proc = self.execute(cmd) 
+            
+    def clean_home(self):
+        self.logger.info("Cleaning up home")
+
+        cmd = ("cd %s ; " % self.home +
+            "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)"+
+            " -execdir rm -rf {} + ")
+
+        out = err = ""
+        with self._lock:
+            (out, err), proc = self.execute(cmd)
+
+    def upload(self, src, dst, text = False):
+        """ Copy content to destination
+
+           src  content to copy. Can be a local file, directory or a list of files
+
+           dst  destination path on the remote host (remote is always self.host)
+
+           text src is text input, it must be stored into a temp file before uploading
+        """
+        # If source is a string input 
+        f = None
+        if text and not os.path.isfile(src):
+            # src is text input that should be uploaded as file
+            # create a temporal file with the content to upload
+            f = tempfile.NamedTemporaryFile(delete=False)
+            f.write(src)
+            f.close()
+            src = f.name
+
+        if not self.localhost:
             # Build destination as <user>@<server>:<path>
             # Build destination as <user>@<server>:<path>
-            dst = "%s@%s:%s" % (self.user, self.host or self.ip, dst)
-        return self.copy(src, dst)
+            dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
+
+        result = self.copy(src, dst)
+
+        # clean up temp file
+        if f:
+            os.remove(f.name)
+
+        return result
 
     def download(self, src, dst):
 
     def download(self, src, dst):
-        if not self.is_localhost:
+        if not self.localhost:
             # Build destination as <user>@<server>:<path>
             # Build destination as <user>@<server>:<path>
-            src = "%s@%s:%s" % (self.user, self.host or self.ip, src)
+            src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
         return self.copy(src, dst)
+
+    def install_packages(self, packages, home = None):
+        home = home or self.node_dir
+
+        cmd = ""
+        if self.os in ["f12", "f14"]:
+            cmd = rpmfuncs.install_packages_command(self.os, packages)
+        elif self.os in ["debian", "ubuntu"]:
+            cmd = debfuncs.install_packages_command(self.os, packages)
+        else:
+            msg = "Error installing packages. OS not known for host %s " % (
+                    self.get("hostname"))
+            self.logger.error(msg)
+            raise RuntimeError, msg
+
+        out = err = ""
+        with self._lock:
+            (out, err), proc = self.run_and_wait(cmd, home, 
+                pidfile = "instpkg_pid",
+                stdout = "instpkg_log", 
+                stderr = "instpkg_err", 
+                raise_on_error = True)
+
+        return (out, err), proc 
+
+    def remove_packages(self, packages, home = None):
+        home = home or self.node_dir
+
+        cmd = ""
+        if self.os in ["f12", "f14"]:
+            cmd = rpmfuncs.remove_packages_command(self.os, packages)
+        elif self.os in ["debian", "ubuntu"]:
+            cmd = debfuncs.remove_packages_command(self.os, packages)
+        else:
+            msg = "Error removing packages. OS not known for host %s " % (
+                    self.get("hostname"))
+            self.logger.error(msg)
+            raise RuntimeError, msg
+
+        out = err = ""
+        with self._lock:
+            (out, err), proc = self.run_and_wait(cmd, home, 
+                pidfile = "rmpkg_pid",
+                stdout = "rmpkg_log", 
+                stderr = "rmpkg_err", 
+                raise_on_error = True)
+         
+        return (out, err), proc 
+
+    def mkdir(self, path, clean = False):
+        if clean:
+            self.rmdir(path)
+
+        return self.execute("mkdir -p %s" % path)
+
+    def rmdir(self, path):
+        return self.execute("rm -rf %s" % path)
+
+    def run_and_wait(self, command, 
+            home = ".", 
+            pidfile = "pid", 
+            stdin = None, 
+            stdout = 'stdout', 
+            stderr = 'stderr', 
+            sudo = False,
+            raise_on_error = False):
+        """ runs a command in background on the remote host, but waits
+            until the command finishes execution.
+            This is more robust than doing a simple synchronized 'execute',
+            since in the remote host the command can continue to run detached
+            even if network disconnections occur
+        """
+        # run command in background in remote host
+        (out, err), proc = self.run(command, home, 
+                pidfile = pidfile,
+                stdin = stdin, 
+                stdout = stdout, 
+                stderr = stderr, 
+                sudo = sudo)
+
+        # check no errors occurred
+        if proc.poll() and err:
+            msg = " Failed to run command %s on host %s" % (
+                    command, self.get("hostname"))
+            self.logger.error(msg)
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        # Wait for pid file to be generated
+        pid, ppid = self.wait_pid(
+                home = home, 
+                pidfile = pidfile, 
+                raise_on_error = raise_on_error)
+
+        # wait until command finishes to execute
+        self.wait_run(pid, ppid)
+       
+        # check if execution errors occurred
+        (out, err), proc = self.check_output(home, stderr)
+
+        if err or out:
+            msg = "Error while running command %s on host %s. error output: %s" % (
+                    command, self.get("hostname"), out)
+            if err:
+                msg += " . err: %s" % err
+
+            self.logger.error(msg)
+            if raise_on_error:
+                raise RuntimeError, msg
         
         
-    def is_alive(self, verbose = False):
-        if self.is_localhost:
+        return (out, err), proc
+    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
+        """ Waits until the pid file for the command is generated, 
+            and returns the pid and ppid of the process """
+        pid = ppid = None
+        delay = 1.0
+        for i in xrange(5):
+            pidtuple = self.checkpid(home = home, pidfile = pidfile)
+            
+            if pidtuple:
+                pid, ppid = pidtuple
+                break
+            else:
+                time.sleep(delay)
+                delay = min(30,delay*1.2)
+        else:
+            msg = " Failed to get pid for pidfile %s/%s on host %s" % (
+                    home, pidfile, self.get("hostname"))
+            self.logger.error(msg)
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        return pid, ppid
+
+    def wait_run(self, pid, ppid, trial = 0):
+        """ wait for a remote process to finish execution """
+        delay = 1.0
+        first = True
+        bustspin = 0
+
+        while True:
+            status = self.status(pid, ppid)
+            
+            if status is sshfuncs.FINISHED:
+                break
+            elif status is not sshfuncs.RUNNING:
+                bustspin += 1
+                time.sleep(delay*(5.5+random.random()))
+                if bustspin > 12:
+                    break
+            else:
+                if first:
+                    first = False
+
+                time.sleep(delay*(0.5+random.random()))
+                delay = min(30,delay*1.2)
+                bustspin = 0
+
+    def check_output(self, home, filename):
+        """ checks file content """
+        (out, err), proc = self.execute("cat %s" % 
+                os.path.join(home, filename))
+        return (out, err), proc
+
+    def is_alive(self):
+        if self.localhost:
             return True
 
             return True
 
+        out = err = ""
         try:
         try:
-            out = self.execute("echo 'ALIVE'",
-                timeout = 60,
-                err_on_timeout = False,
-                persistent = False)
+            (out, err), proc = self.execute("echo 'ALIVE'")
         except:
         except:
-            if verbose:
-                self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            import traceback
+            trace = traceback.format_exc()
+            self.logger.warn("Unresponsive host %s. got:\n out: %s err: %s\n traceback: %s", 
+                    self.get("hostname"), out, err, trace)
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
             return False
 
         if out.strip().startswith('ALIVE'):
             return True
         else:
-            if verbose:
-                self._logger.warn("Unresponsive node %s got:\n%s%s", self.host, out, err)
+            self.logger.warn("Unresponsive host %s. got:\n%s%s", 
+                    self.get("hostname"), out, err)
             return False
 
             return False
 
-    def mkdir(self, path, clean = True):
-        if clean:
-            self.rmdir(path)
-
-        return self.execute(
-            "mkdir -p %s" % path,
-            timeout = 120,
-            retry = 3
-            )
-
-    def rmdir(self, path):
-        return self.execute(
-            "rm -rf %s" % path,
-            timeout = 120,
-            retry = 3
-            )
+            # TODO!
+            #if self.check_bad_host(out,err):
+            #    self.blacklist()
 
     def copy(self, src, dst):
 
     def copy(self, src, dst):
-        if self.is_localhost:
-            command = ["cp", "-R", src, dst]
-            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
-                    stderr=subprocess.PIPE)
-            out, err = p.communicate()
+        if self.localhost:
+            (out, err), proc =  execfuncs.lcopy(source, dest, 
+                    recursive = True)
         else:
         else:
-            (out, err), proc = eintr_retry(rcopy)(
+            (out, err), proc = self.safe_retry(sshfuncs.rcopy)(
                 src, dst, 
                 src, dst, 
-                port = self.port,
-                agent = self.agent,
-                identity_file = self.identity_file)
-
-            if proc.wait():
-                msg = "Error uploading to %s got:\n%s%s" %\
-                        (self.host or self.ip, out, err)
-                self._logger.error(msg)
-                raise RuntimeError(msg)
+                port = self.get("port"),
+                identity = self.get("identity"),
+                server_key = self.get("serverKey"),
+                recursive = True)
 
 
-        return (out, err)
+        return (out, err), proc
 
     def execute(self, command,
             sudo = False,
             stdin = None, 
 
     def execute(self, command,
             sudo = False,
             stdin = None, 
-            tty = False,
             env = None,
             env = None,
+            tty = False,
+            forward_x11 = False,
             timeout = None,
             retry = 0,
             err_on_timeout = True,
             connect_timeout = 30,
             timeout = None,
             retry = 0,
             err_on_timeout = True,
             connect_timeout = 30,
-            persistent = True):
+            persistent = True
+            ):
         """ Notice that this invocation will block until the
         execution finishes. If this is not the desired behavior,
         use 'run' instead."""
 
         """ Notice that this invocation will block until the
         execution finishes. If this is not the desired behavior,
         use 'run' instead."""
 
-        if self.is_localhost:
-            if env:
-                export = ''
-                for envkey, envval in env.iteritems():
-                    export += '%s=%s ' % (envkey, envval)
-                command = export + command
-
-            if sudo:
-                command = "sudo " + command
-
-            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
-                    stderr=subprocess.PIPE)
-            out, err = p.communicate()
+        if self.localhost:
+            (out, err), proc = execfuncs.lexec(command, 
+                    user = user,
+                    sudo = sudo,
+                    stdin = stdin,
+                    env = env)
         else:
         else:
-            (out, err), proc = eintr_retry(rexec)(
+            (out, err), proc = self.safe_retry(sshfuncs.rexec)(
                     command, 
                     command, 
-                    self.host or self.ip, 
-                    self.user,
-                    port = self.port, 
-                    agent = self.forward_agent,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
                     sudo = sudo,
                     sudo = sudo,
-                    stdin = stdin, 
-                    identity_file = self.identity_file,
-                    tty = tty,
-                    x11 = self.enable_x11,
+                    stdin = stdin,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
                     env = env,
                     env = env,
+                    tty = tty,
+                    forward_x11 = forward_x11,
                     timeout = timeout,
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
                     timeout = timeout,
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
-                    persistent = persistent)
-
-            if proc.wait():
-                msg = "Failed to execute command %s at node %s: %s %s" % \
-                        (command, self.host or self.ip, out, err,)
-                self._logger.warn(msg)
-                raise RuntimeError(msg)
+                    persistent = persistent
+                    )
 
 
-        return (out, err)
+        return (out, err), proc
 
 
-    def run(self, command, home, 
+    def run(self, command, 
+            home = None,
+            create_home = True,
+            pidfile = "pid",
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False):
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False):
-        self._logger.info("Running %s", command)
-        
-        pidfile = './pid',
 
 
-        if self.is_localhost:
-            if stderr == stdout:
-                stderr = '&1'
-            else:
-                stderr = ' ' + stderr
-            
-            daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
-                'command' : command,
-                'pidfile' : pidfile,
-                
-                'stdout' : stdout,
-                'stderr' : stderr,
-                'stdin' : stdin,
-            }
-            
-            cmd = "%(create)s%(gohome)s rm -f %(pidfile)s ; %(sudo)s nohup bash -c '%(command)s' " % {
-                    'command' : daemon_command,
-                    
-                    'sudo' : 'sudo -S' if sudo else '',
-                    
-                    'pidfile' : pidfile,
-                    'gohome' : 'cd %s ; ' % home if home else '',
-                    'create' : 'mkdir -p %s ; ' % home if create_home else '',
-                }
-            p = subprocess.Popen(command, stdout=subprocess.PIPE, 
-                    stderr=subprocess.PIPE)
-            out, err = p.communicate()
+        self.logger.info("Running %s", command)
+        
+        if self.localhost:
+            (out, err), proc = execfuncs.lspawn(command, pidfile, 
+                    stdout = stdout, 
+                    stderr = stderr, 
+                    stdin = stdin, 
+                    home = home, 
+                    create_home = create_home, 
+                    sudo = sudo,
+                    user = user) 
         else:
             # Start process in a "daemonized" way, using nohup and heavy
             # stdin/out redirection to avoid connection issues
         else:
             # Start process in a "daemonized" way, using nohup and heavy
             # stdin/out redirection to avoid connection issues
-            (out,err), proc = rspawn(
+            (out,err), proc = self.safe_retry(sshfuncs.rspawn)(
                 command,
                 pidfile = pidfile,
                 home = home,
                 command,
                 pidfile = pidfile,
                 home = home,
+                create_home = create_home,
                 stdin = stdin if stdin is not None else '/dev/null',
                 stdout = stdout if stdout else '/dev/null',
                 stderr = stderr if stderr else '/dev/null',
                 sudo = sudo,
                 stdin = stdin if stdin is not None else '/dev/null',
                 stdout = stdout if stdout else '/dev/null',
                 stderr = stderr if stderr else '/dev/null',
                 sudo = sudo,
-                host = self.host,
-                user = self.user,
-                port = self.port,
-                agent = self.forward_agent,
-                identity_file = self.identity_file
+                host = self.get("hostname"),
+                user = self.get("username"),
+                port = self.get("port"),
+                agent = True,
+                identity = self.get("identity"),
+                server_key = self.get("serverKey")
                 )
                 )
-            
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
 
 
-        return (out, err)
-    
-    def checkpid(self, path):            
-        # Get PID/PPID
-        # NOTE: wait a bit for the pidfile to be created
-        pidtuple = rcheck_pid(
-            os.path.join(path, 'pid'),
-            host = self.host,
-            user = self.user,
-            port = self.port,
-            agent = self.forward_agent,
-            identity_file = self.identity_file
-            )
+        return (out, err), proc
+
+    def checkpid(self, home = ".", pidfile = "pid"):
+        if self.localhost:
+            pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
+        else:
+            pidtuple = sshfuncs.rcheckpid(
+                os.path.join(home, pidfile),
+                host = self.get("hostname"),
+                user = self.get("username"),
+                port = self.get("port"),
+                agent = True,
+                identity = self.get("identity"),
+                server_key = self.get("serverKey")
+                )
         
         return pidtuple
     
     def status(self, pid, ppid):
         
         return pidtuple
     
     def status(self, pid, ppid):
-        status = rstatus(
-                pid, ppid,
-                host = self.host,
-                user = self.user,
-                port = self.port,
-                agent = self.forward_agent,
-                identity_file = self.identity_file
-                )
+        if self.localhost:
+            status = execfuncs.lstatus(pid, ppid)
+        else:
+            status = sshfuncs.rstatus(
+                    pid, ppid,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey")
+                    )
            
         return status
     
     def kill(self, pid, ppid, sudo = False):
            
         return status
     
     def kill(self, pid, ppid, sudo = False):
+        out = err = ""
+        proc = None
         status = self.status(pid, ppid)
         status = self.status(pid, ppid)
-        if status == RUNNING:
-            # kill by ppid+pid - SIGTERM first, then try SIGKILL
-            rkill(
-                pid, ppid,
-                host = self.host,
-                user = self.user,
-                port = self.port,
-                agent = self.forward_agent,
-                sudo = sudo,
-                identity_file = self.identity_file
-                )
+
+        if status == sshfuncs.RUNNING:
+            if self.localhost:
+                (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
+            else:
+                (out, err), proc = self.safe_retry(sshfuncs.rkill)(
+                    pid, ppid,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    sudo = sudo,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey")
+                    )
+        return (out, err), proc
+
+    def check_bad_host(self, out, err):
+        badre = re.compile(r'(?:'
+                           r'|Error: disk I/O error'
+                           r')', 
+                           re.I)
+        return badre.search(out) or badre.search(err)
+
+    def blacklist(self):
+        # TODO!!!!
+        self.logger.warn("Blacklisting malfunctioning node %s", self.hostname)
+        #import util
+        #util.appendBlacklist(self.hostname)
+
+    def safe_retry(self, func):
+        """Retries a function invocation using a lock"""
+        import functools
+        @functools.wraps(func)
+        def rv(*p, **kw):
+            fail_msg = " Failed to execute function %s(%s, %s) at host %s" % (
+                func.__name__, p, kw, self.get("hostname"))
+            retry = kw.pop("_retry", False)
+            wlock = kw.pop("_with_lock", False)
+
+            out = err = ""
+            proc = None
+            for i in xrange(0 if retry else 4):
+                try:
+                    if wlock:
+                        with self._lock:
+                            (out, err), proc = func(*p, **kw)
+                    else:
+                        (out, err), proc = func(*p, **kw)
+                        
+                    if proc.poll():
+                        if retry:
+                            time.sleep(i*15)
+                            continue
+                        else:
+                            self.logger.error("%s. out: %s error: %s", fail_msg, out, err)
+                    break
+                except RuntimeError, e:
+                    if i >= 3:
+                        self.logger.error("%s. error: %s", fail_msg, e.args)
+            return (out, err), proc
+
+        return rv