turn off using blowfish for scp transfers - not available on a stock ubuntu-15.04...
[nepi.git] / src / nepi / util / sshfuncs.py
index a8a9b86..a01bded 100644 (file)
@@ -1,3 +1,24 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Claudio Freire <claudio-daniel.freire@inria.fr>
+
+## TODO: This code needs reviewing !!!
+
 import base64
 import errno
 import hashlib
@@ -13,6 +34,8 @@ import threading
 import time
 import tempfile
 
+_re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
+
 logger = logging.getLogger("sshfuncs")
 
 def log(msg, level, out = None, err = None):
@@ -24,7 +47,6 @@ def log(msg, level, out = None, err = None):
 
     logger.log(level, msg)
 
-
 if hasattr(os, "devnull"):
     DEV_NULL = os.devnull
 else:
@@ -38,24 +60,36 @@ class STDOUT:
     redirect to whatever stdout was redirected to.
     """
 
-class RUNNING:
-    """
-    Process is still running
-    """
-
-class FINISHED:
+class ProcStatus:
     """
-    Process is finished
+    Codes for status of remote spawned process
     """
+    # Process is still running
+    RUNNING = 1
 
-class NOT_STARTED:
-    """
-    Process hasn't started running yet (this should be very rare)
-    """
+    # Process is finished
+    FINISHED = 2
+    
+    # Process hasn't started running yet (this should be very rare)
+    NOT_STARTED = 3
 
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
 
+def resolve_hostname(host):
+    ip = None
+
+    if host in ["localhost", "127.0.0.1", "::1"]:
+        p = subprocess.Popen("ip -o addr list", shell=True,
+                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        m = _re_inet.findall(stdout)
+        ip = m[0][1].split("/")[0]
+    else:
+        ip = socket.gethostbyname(host)
+
+    return ip
+
 def gethostbyname(host):
     global hostbyname_cache
     global hostbyname_cache_lock
@@ -63,7 +97,7 @@ def gethostbyname(host):
     hostbyname = hostbyname_cache.get(host)
     if not hostbyname:
         with hostbyname_cache_lock:
-            hostbyname = socket.gethostbyname(host)
+            hostbyname = resolve_hostname(host)
             hostbyname_cache[host] = hostbyname
 
             msg = " Added hostbyname %s - %s " % (host, hostbyname)
@@ -187,27 +221,29 @@ def eintr_retry(func):
     return rv
 
 def rexec(command, host, user, 
-        port = None, 
+        port = None,
+        gwuser = None,
+        gw = None, 
         agent = True,
         sudo = False,
-        stdin = None,
         identity = None,
         server_key = None,
         env = None,
         tty = False,
-        timeout = None,
-        retry = 3,
-        err_on_timeout = True,
         connect_timeout = 30,
+        retry = 3,
         persistent = True,
         forward_x11 = False,
+        blocking = True,
         strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
     """
-    
+
     tmp_known_hosts = None
-    hostip = gethostbyname(host)
+    if not gw:
+        hostip = gethostbyname(host)
+    else: hostip = None
 
     args = ['ssh', '-C',
             # Don't bother with localhost. Makes test easier
@@ -216,6 +252,7 @@ def rexec(command, host, user,
             '-o', 'ConnectionAttempts=3',
             '-o', 'ServerAliveInterval=30',
             '-o', 'TCPKeepAlive=yes',
+            '-o', 'Batchmode=yes',
             '-l', user, hostip or host]
 
     if persistent and openssh_has_persist():
@@ -228,6 +265,10 @@ def rexec(command, host, user,
         # Do not check for Host key. Unsafe.
         args.extend(['-o', 'StrictHostKeyChecking=no'])
 
+    if gw:
+        proxycommand = _proxy_command(gw, gwuser, identity)
+        args.extend(['-o', proxycommand])
+
     if agent:
         args.append('-A')
 
@@ -235,6 +276,7 @@ def rexec(command, host, user,
         args.append('-p%d' % port)
 
     if identity:
+        identity = os.path.expanduser(identity)
         args.extend(('-i', identity))
 
     if tty:
@@ -249,57 +291,30 @@ def rexec(command, host, user,
         tmp_known_hosts = make_server_key_args(server_key, host, port)
         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
-    args.append(command)
-
-    for x in xrange(retry):
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args,
-                env = env,
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-        
-        # attach tempfile object to the process, to make sure the file stays
-        # alive until the process is finished with it
-        proc._known_hosts = tmp_known_hosts
-    
-        try:
-            out, err = _communicate(proc, stdin, timeout, err_on_timeout)
-            msg = " rexec - host %s - command %s " % (host, " ".join(args))
-            log(msg, logging.DEBUG, out, err)
+    if sudo:
+        command = "sudo " + command
 
-            if proc.poll():
-                skip = False
+    args.append(command)
 
-                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
-                    # SSH error, can safely retry
-                    skip = True 
-                elif retry:
-                    # Probably timed out or plain failed but can retry
-                    skip = True 
-                
-                if skip:
-                    t = x*2
-                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                            t, x, host, " ".join(args))
-                    log(msg, logging.DEBUG)
+    log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
 
-                    time.sleep(t)
-                    continue
-            break
-        except RuntimeError, e:
-            msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-            log(msg, logging.DEBUG, out, err)
+    stdout = stderr = stdin = subprocess.PIPE
+    if forward_x11:
+        stdout = stderr = stdin = None
 
-            if retry <= 0:
-                raise
-            retry -= 1
-        
-    return ((out, err), proc)
+    return _retry_rexec(args, log_msg, 
+            stderr = stderr,
+            stdin = stdin,
+            stdout = stdout,
+            env = env, 
+            retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = blocking)
 
 def rcopy(source, dest,
-        port = None, 
-        agent = True, 
+        port = None,
+        gwuser = None,
+        gw = None,
         recursive = False,
         identity = None,
         server_key = None,
@@ -311,298 +326,135 @@ def rcopy(source, dest,
     Source and destination should have the user and host encoded
     as per scp specs.
     
-    If source is a file object, a special mode will be used to
-    create the remote file with the same contents.
-    
-    If dest is a file object, the remote file (source) will be
-    read and written into dest.
-    
-    In these modes, recursive cannot be True.
-    
-    Source can be a list of files to copy to a single destination,
-    in which case it is advised that the destination be a folder.
+    Source can be a list of files to copy to a single destination, 
+    (in which case it is advised that the destination be a folder),
+    or a single file in a string.
     """
-    
-    if isinstance(source, file) and source.tell() == 0:
-        source = source.name
-    elif hasattr(source, 'read'):
-        tmp = tempfile.NamedTemporaryFile()
-        while True:
-            buf = source.read(65536)
-            if buf:
-                tmp.write(buf)
-            else:
-                break
-        tmp.seek(0)
-        source = tmp.name
-    
-    if isinstance(source, file) or isinstance(dest, file) \
-            or hasattr(source, 'read')  or hasattr(dest, 'write'):
-        assert not recursive
-        
-        # Parse source/destination as <user>@<server>:<path>
-        if isinstance(dest, basestring) and ':' in dest:
-            remspec, path = dest.split(':',1)
-        elif isinstance(source, basestring) and ':' in source:
-            remspec, path = source.split(':',1)
-        else:
-            raise ValueError, "Both endpoints cannot be local"
-        user,host = remspec.rsplit('@',1)
-        
-        tmp_known_hosts = None
-        hostip = gethostbyname(host)
-        
-        args = ['ssh', '-l', user, '-C',
-                # Don't bother with localhost. Makes test easier
-                '-o', 'NoHostAuthenticationForLocalhost=yes',
-                '-o', 'ConnectTimeout=60',
-                '-o', 'ConnectionAttempts=3',
-                '-o', 'ServerAliveInterval=30',
-                '-o', 'TCPKeepAlive=yes',
-                hostip or host ]
 
-        if openssh_has_persist():
-            args.extend([
-                '-o', 'ControlMaster=auto',
-                '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
-                '-o', 'ControlPersist=60' ])
-
-        if port:
-            args.append('-P%d' % port)
-
-        if identity:
-            args.extend(('-i', identity))
+    # Parse destination as <user>@<server>:<path>
+    if isinstance(dest, str) and ':' in dest:
+        remspec, path = dest.split(':',1)
+    elif isinstance(source, str) and ':' in source:
+        remspec, path = source.split(':',1)
+    else:
+        raise ValueError, "Both endpoints cannot be local"
+    user,host = remspec.rsplit('@',1)
+    
+    # plain scp
+    tmp_known_hosts = None
 
-        if server_key:
-            # Create a temporary server key file
-            tmp_known_hosts = make_server_key_args(server_key, host, port)
-            args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
-        
-        if isinstance(source, file) or hasattr(source, 'read'):
-            args.append('cat > %s' % (shell_escape(path),))
-        elif isinstance(dest, file) or hasattr(dest, 'write'):
-            args.append('cat %s' % (shell_escape(path),))
-        else:
-            raise AssertionError, "Unreachable code reached! :-Q"
-        
-        # connects to the remote host and starts a remote connection
-        if isinstance(source, file):
-            proc = subprocess.Popen(args, 
-                    stdout = open('/dev/null','w'),
-                    stderr = subprocess.PIPE,
-                    stdin = source)
-            err = proc.stderr.read()
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,err), proc)
-        elif isinstance(dest, file):
-            proc = subprocess.Popen(args, 
-                    stdout = open('/dev/null','w'),
-                    stderr = subprocess.PIPE,
-                    stdin = source)
-            err = proc.stderr.read()
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,err), proc)
-        elif hasattr(source, 'read'):
-            # file-like (but not file) source
-            proc = subprocess.Popen(args, 
-                    stdout = open('/dev/null','w'),
-                    stderr = subprocess.PIPE,
-                    stdin = subprocess.PIPE)
-            
-            buf = None
-            err = []
-            while True:
-                if not buf:
-                    buf = source.read(4096)
-                if not buf:
-                    #EOF
-                    break
-                
-                rdrdy, wrdy, broken = select.select(
-                    [proc.stderr],
-                    [proc.stdin],
-                    [proc.stderr,proc.stdin])
-                
-                if proc.stderr in rdrdy:
-                    # use os.read for fully unbuffered behavior
-                    err.append(os.read(proc.stderr.fileno(), 4096))
-                
-                if proc.stdin in wrdy:
-                    proc.stdin.write(buf)
-                    buf = None
-                
-                if broken:
-                    break
-            proc.stdin.close()
-            err.append(proc.stderr.read())
-                
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,''.join(err)), proc)
-        elif hasattr(dest, 'write'):
-            # file-like (but not file) dest
-            proc = subprocess.Popen(args, 
-                    stdout = subprocess.PIPE,
-                    stderr = subprocess.PIPE,
-                    stdin = open('/dev/null','w'))
+    args = ['scp', '-q', '-p', '-C',
+            # 2015-06-01 Thierry: I am commenting off blowfish
+            # as this is not available on a plain ubuntu 15.04 install
+            # this IMHO is too fragile, shoud be something the user
+            # decides explicitly (so he is at least aware of that dependency)
+            # Speed up transfer using blowfish cypher specification which is 
+            # faster than the default one (3des)
+            # '-c', 'blowfish',
+            # Don't bother with localhost. Makes test easier
+            '-o', 'NoHostAuthenticationForLocalhost=yes',
+            '-o', 'ConnectTimeout=60',
+            '-o', 'ConnectionAttempts=3',
+            '-o', 'ServerAliveInterval=30',
+            '-o', 'TCPKeepAlive=yes' ]
             
-            buf = None
-            err = []
-            while True:
-                rdrdy, wrdy, broken = select.select(
-                    [proc.stderr, proc.stdout],
-                    [],
-                    [proc.stderr, proc.stdout])
-                
-                if proc.stderr in rdrdy:
-                    # use os.read for fully unbuffered behavior
-                    err.append(os.read(proc.stderr.fileno(), 4096))
-                
-                if proc.stdout in rdrdy:
-                    # use os.read for fully unbuffered behavior
-                    buf = os.read(proc.stdout.fileno(), 4096)
-                    dest.write(buf)
-                    
-                    if not buf:
-                        #EOF
-                        break
-                
-                if broken:
-                    break
-            err.append(proc.stderr.read())
-                
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,''.join(err)), proc)
-        else:
-            raise AssertionError, "Unreachable code reached! :-Q"
-    else:
-        # Parse destination as <user>@<server>:<path>
-        if isinstance(dest, basestring) and ':' in dest:
-            remspec, path = dest.split(':',1)
-        elif isinstance(source, basestring) and ':' in source:
-            remspec, path = source.split(':',1)
-        else:
-            raise ValueError, "Both endpoints cannot be local"
-        user,host = remspec.rsplit('@',1)
-        
-        # plain scp
-        tmp_known_hosts = None
-
-        args = ['scp', '-q', '-p', '-C',
-                # Don't bother with localhost. Makes test easier
-                '-o', 'NoHostAuthenticationForLocalhost=yes',
-                '-o', 'ConnectTimeout=60',
-                '-o', 'ConnectionAttempts=3',
-                '-o', 'ServerAliveInterval=30',
-                '-o', 'TCPKeepAlive=yes' ]
-                
-        if port:
-            args.append('-P%d' % port)
+    if port:
+        args.append('-P%d' % port)
 
-        if recursive:
-            args.append('-r')
+    if gw:
+        proxycommand = _proxy_command(gw, gwuser, identity)
+        args.extend(['-o', proxycommand])
 
-        if identity:
-            args.extend(('-i', identity))
+    if recursive:
+        args.append('-r')
 
-        if server_key:
-            # Create a temporary server key file
-            tmp_known_hosts = make_server_key_args(server_key, host, port)
-            args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+    if identity:
+        identity = os.path.expanduser(identity)
+        args.extend(('-i', identity))
 
-        if not strict_host_checking:
-            # Do not check for Host key. Unsafe.
-            args.extend(['-o', 'StrictHostKeyChecking=no'])
+    if server_key:
+        # Create a temporary server key file
+        tmp_known_hosts = make_server_key_args(server_key, host, port)
+        args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
-        if isinstance(source,list):
-            args.extend(source)
-        else:
-            if openssh_has_persist():
-                args.extend([
-                    '-o', 'ControlMaster=auto',
-                    '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
-                    ])
-            args.append(source)
+    if not strict_host_checking:
+        # Do not check for Host key. Unsafe.
+        args.extend(['-o', 'StrictHostKeyChecking=no'])
+    
+    if isinstance(source, list):
+        args.extend(source)
+    else:
+        if openssh_has_persist():
+            args.extend([
+                '-o', 'ControlMaster=auto',
+                '-o', 'ControlPath=%s' % (make_control_path(False, False),)
+                ])
+        args.append(source)
 
+    if isinstance(dest, list):
+        args.extend(dest)
+    else:
         args.append(dest)
 
-        for x in xrange(retry):
-            # connects to the remote host and starts a remote connection
-            proc = subprocess.Popen(args,
-                    stdout = subprocess.PIPE,
-                    stdin = subprocess.PIPE, 
-                    stderr = subprocess.PIPE)
-            
-            # attach tempfile object to the process, to make sure the file stays
-            # alive until the process is finished with it
-            proc._known_hosts = tmp_known_hosts
-        
-            try:
-                (out, err) = proc.communicate()
-                eintr_retry(proc.wait)()
-                msg = " rcopy - host %s - command %s " % (host, " ".join(args))
-                log(msg, logging.DEBUG, out, err)
-
-                if proc.poll():
-                    t = x*2
-                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                            t, x, host, " ".join(args))
-                    log(msg, logging.DEBUG)
-
-                    time.sleep(t)
-                    continue
-
-                break
-            except RuntimeError, e:
-                msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-                log(msg, logging.DEBUG, out, err)
-
-                if retry <= 0:
-                    raise
-                retry -= 1
-            
-        return ((out, err), proc)
+    log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
+    
+    return _retry_rexec(args, log_msg, env = None, retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = True)
 
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
         stderr = STDOUT, 
-        stdin = '/dev/null', 
+        stdin = '/dev/null',
         home = None, 
         create_home = False, 
         sudo = False,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         identity = None, 
         server_key = None,
-        tty = False):
+        tty = False,
+        strict_host_checking = True):
     """
-    Spawn a remote command such that it will continue working asynchronously.
-    
-    Parameters:
-        command: the command to run - it should be a single line.
-        
-        pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
-        
-        stdout: path of a file to redirect standard output to - must be a string.
-            Defaults to /dev/null
-        stderr: path of a file to redirect standard error to - string or the special STDOUT value
-            to redirect to the same file stdout was redirected to. Defaults to STDOUT.
-        stdin: path of a file with input to be piped into the command's standard input
-        
-        home: path of a folder to use as working directory - should exist, unless you specify create_home
-        
-        create_home: if True, the home folder will be created first with mkdir -p
+    Spawn a remote command such that it will continue working asynchronously in 
+    background. 
+
+        :param command: The command to run, it should be a single line.
+        :type command: str
+
+        :param pidfile: Path to a file where to store the pid and ppid of the 
+                        spawned process
+        :type pidfile: str
+
+        :param stdout: Path to file to redirect standard output. 
+                       The default value is /dev/null
+        :type stdout: str
+
+        :param stderr: Path to file to redirect standard error.
+                       If the special STDOUT value is used, stderr will 
+                       be redirected to the same file as stdout
+        :type stderr: str
+
+        :param stdin: Path to a file with input to be piped into the command's standard input
+        :type stdin: str
+
+        :param home: Path to working directory folder. 
+                    It is assumed to exist unless the create_home flag is set.
+        :type home: str
+
+        :param create_home: Flag to force creation of the home folder before 
+                            running the command
+        :type create_home: bool
+        :param sudo: Flag forcing execution with sudo user
+        :type sudo: bool
         
-        sudo: whether the command needs to be executed as root
-        
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: tuple
+
         (stdout, stderr), process
         
         Of the spawning process, which only captures errors at spawning time.
@@ -615,7 +467,7 @@ def rspawn(command, pidfile,
     else:
         stderr = ' ' + stderr
     
-    daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+    daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
         'command' : command,
         'pidfile' : shell_escape(pidfile),
         'stdout' : stdout,
@@ -636,10 +488,13 @@ def rspawn(command, pidfile,
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
         server_key = server_key,
-        tty = tty ,
+        tty = tty,
+        strict_host_checking = strict_host_checking ,
         )
     
     if proc.wait():
@@ -648,27 +503,32 @@ def rspawn(command, pidfile,
     return ((out, err), proc)
 
 @eintr_retry
-def rcheckpid(pidfile,
+def rgetpid(pidfile,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
-    Check the pidfile of a process spawned with remote_spawn.
-    
-    Parameters:
-        pidfile: the pidfile passed to remote_span
+    Returns the pid and ppid of a process from a remote file where the 
+    information was stored.
+
+        :param home: Path to directory where the pidfile is located
+        :type home: str
+
+        :param pidfile: Name of file containing the pid information
+        :type pidfile: str
         
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: int
         
-        A (pid, ppid) tuple useful for calling remote_status and remote_kill,
-        or None if the pidfile isn't valid yet (maybe the process is still starting).
-    """
+        A (pid, ppid) tuple useful for calling rstatus and rkill,
+        or None if the pidfile isn't valid yet (can happen when process is staring up)
 
+    """
     (out,err),proc = rexec(
         "cat %(pidfile)s" % {
             'pidfile' : pidfile,
@@ -676,9 +536,12 @@ def rcheckpid(pidfile,
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
         
     if proc.wait():
@@ -696,22 +559,24 @@ def rstatus(pid, ppid,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
-    Check the status of a process spawned with remote_spawn.
+    Returns a code representing the the status of a remote process
+
+        :param pid: Process id of the process
+        :type pid: int
+
+        :param ppid: Parent process id of process
+        :type ppid: int
     
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        host/port/user/agent/identity: see rexec
+        :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
     
-    Returns:
-        
-        One of NOT_STARTED, RUNNING, FINISHED
     """
-
     (out,err),proc = rexec(
         # Check only by pid. pid+ppid does not always work (especially with sudo) 
         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
@@ -721,13 +586,16 @@ def rstatus(pid, ppid,
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
     
     if proc.wait():
-        return NOT_STARTED
+        return ProcStatus.NOT_STARTED
     
     status = False
     if err:
@@ -736,37 +604,38 @@ def rstatus(pid, ppid,
     elif out:
         status = (out.strip() == 'wait')
     else:
-        return NOT_STARTED
-    return RUNNING if status else FINISHED
+        return ProcStatus.NOT_STARTED
+    return ProcStatus.RUNNING if status else ProcStatus.FINISHED
 
 @eintr_retry
 def rkill(pid, ppid,
         host = None, 
         port = None, 
         user = None, 
+        gwuser = None,
+        gw = None,
         agent = None, 
         sudo = False,
         identity = None, 
         server_key = None, 
-        nowait = False):
+        nowait = False,
+        strict_host_checking = True):
     """
-    Kill a process spawned with remote_spawn.
-    
+    Sends a kill signal to a remote process.
+
     First tries a SIGTERM, and if the process does not end in 10 seconds,
     it sends a SIGKILL.
-    
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        sudo: whether the command was run with sudo - careful killing like this.
-        
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :param pid: Process id of process to be killed
+        :type pid: int
+
+        :param ppid: Parent process id of process to be killed
+        :type ppid: int
+
+        :param sudo: Flag indicating if sudo should be used to kill the process
+        :type sudo: bool
         
-        Nothing, should have killed the process
     """
-    
     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
     cmd = """
 SUBKILL="%(subkill)s" ;
@@ -800,9 +669,12 @@ fi
         host = host,
         port = port,
         user = user,
+        gwuser = gwuser,
+        gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
     
     # wait, don't leave zombies around
@@ -810,33 +682,105 @@ fi
 
     return (out, err), proc
 
+def _retry_rexec(args,
+        log_msg,
+        stdout = subprocess.PIPE,
+        stdin = subprocess.PIPE, 
+        stderr = subprocess.PIPE,
+        env = None,
+        retry = 3,
+        tmp_known_hosts = None,
+        blocking = True):
+
+    for x in xrange(retry):
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args,
+                env = env,
+                stdout = stdout,
+                stdin = stdin, 
+                stderr = stderr)
+        
+        # attach tempfile object to the process, to make sure the file stays
+        # alive until the process is finished with it
+        proc._known_hosts = tmp_known_hosts
+    
+        # The argument block == False forces to rexec to return immediately, 
+        # without blocking 
+        try:
+            err = out = " "
+            if blocking:
+                #(out, err) = proc.communicate()
+                # The method communicate was re implemented for performance issues
+                # when using python subprocess communicate method the ssh commands 
+                # last one minute each
+                out, err = _communicate(proc, input=None)
+
+            elif stdout:
+                out = proc.stdout.read()
+                if proc.poll() and stderr:
+                    err = proc.stderr.read()
+
+            log(log_msg, logging.DEBUG, out, err)
+
+            if proc.poll():
+                skip = False
+
+                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+                    # SSH error, can safely retry
+                    skip = True 
+                elif retry:
+                    # Probably timed out or plain failed but can retry
+                    skip = True 
+                
+                if skip:
+                    t = x*2
+                    msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
+                            t, x, " ".join(args))
+                    log(msg, logging.DEBUG)
+
+                    time.sleep(t)
+                    continue
+            break
+        except RuntimeError, e:
+            msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
+            log(msg, logging.DEBUG, out, err)
+
+            if retry <= 0:
+                raise
+            retry -= 1
+
+    return ((out, err), proc)
+
 # POSIX
-def _communicate(self, input, timeout=None, err_on_timeout=True):
+# Don't remove. The method communicate was re implemented for performance issues
+def _communicate(proc, input, timeout=None, err_on_timeout=True):
     read_set = []
     write_set = []
     stdout = None # Return
     stderr = None # Return
-    
+
     killed = False
-    
+
     if timeout is not None:
         timelimit = time.time() + timeout
         killtime = timelimit + 4
         bailtime = timelimit + 4
 
-    if self.stdin:
+    if proc.stdin:
         # Flush stdio buffer.  This might block, if the user has
         # been writing to .stdin in an uncontrolled fashion.
-        self.stdin.flush()
+        proc.stdin.flush()
         if input:
-            write_set.append(self.stdin)
+            write_set.append(proc.stdin)
         else:
-            self.stdin.close()
-    if self.stdout:
-        read_set.append(self.stdout)
+            proc.stdin.close()
+
+    if proc.stdout:
+        read_set.append(proc.stdout)
         stdout = []
-    if self.stderr:
-        read_set.append(self.stderr)
+
+    if proc.stderr:
+        read_set.append(proc.stderr)
         stderr = []
 
     input_offset = 0
@@ -851,16 +795,16 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
                 else:
                     signum = signal.SIGTERM
                 # Lets kill it
-                os.kill(self.pid, signum)
+                os.kill(proc.pid, signum)
                 select_timeout = 0.5
             else:
                 select_timeout = timelimit - curtime + 0.1
         else:
             select_timeout = 1.0
-        
+
         if select_timeout > 1.0:
             select_timeout = 1.0
-            
+
         try:
             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
         except select.error,e:
@@ -868,35 +812,37 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
                 raise
             else:
                 continue
-        
-        if not rlist and not wlist and not xlist and self.poll() is not None:
+
+        if not rlist and not wlist and not xlist and proc.poll() is not None:
             # timeout and process exited, say bye
             break
 
-        if self.stdin in wlist:
+        if proc.stdin in wlist:
             # When select has indicated that the file is writable,
             # we can write up to PIPE_BUF bytes without risk
             # blocking.  POSIX defines PIPE_BUF >= 512
-            bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+            bytes_written = os.write(proc.stdin.fileno(),
+                    buffer(input, input_offset, 512))
             input_offset += bytes_written
+
             if input_offset >= len(input):
-                self.stdin.close()
-                write_set.remove(self.stdin)
+                proc.stdin.close()
+                write_set.remove(proc.stdin)
 
-        if self.stdout in rlist:
-            data = os.read(self.stdout.fileno(), 1024)
+        if proc.stdout in rlist:
+            data = os.read(proc.stdout.fileno(), 1024)
             if data == "":
-                self.stdout.close()
-                read_set.remove(self.stdout)
+                proc.stdout.close()
+                read_set.remove(proc.stdout)
             stdout.append(data)
 
-        if self.stderr in rlist:
-            data = os.read(self.stderr.fileno(), 1024)
+        if proc.stderr in rlist:
+            data = os.read(proc.stderr.fileno(), 1024)
             if data == "":
-                self.stderr.close()
-                read_set.remove(self.stderr)
+                proc.stderr.close()
+                read_set.remove(proc.stderr)
             stderr.append(data)
-    
+
     # All data exchanged.  Translate lists into strings.
     if stdout is not None:
         stdout = ''.join(stdout)
@@ -907,19 +853,49 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
     # object do the translation: It is based on stdio, which is
     # impossible to combine with select (unless forcing no
     # buffering).
-    if self.universal_newlines and hasattr(file, 'newlines'):
+    if proc.universal_newlines and hasattr(file, 'newlines'):
         if stdout:
-            stdout = self._translate_newlines(stdout)
+            stdout = proc._translate_newlines(stdout)
         if stderr:
-            stderr = self._translate_newlines(stderr)
+            stderr = proc._translate_newlines(stderr)
 
     if killed and err_on_timeout:
-        errcode = self.poll()
+        errcode = proc.poll()
         raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
     else:
         if killed:
-            self.poll()
+            proc.poll()
         else:
-            self.wait()
+            proc.wait()
         return (stdout, stderr)
 
+def _proxy_command(gw, gwuser, gwidentity):
+    """
+    Constructs the SSH ProxyCommand option to add to the SSH command to connect
+    via a proxy
+        :param gw: SSH proxy hostname
+        :type gw:  str 
+       
+        :param gwuser: SSH proxy username
+        :type gwuser:  str
+
+        :param gwidentity: SSH proxy identity file 
+        :type gwidentity:  str
+
+  
+        :rtype: str 
+        
+        returns the SSH ProxyCommand option.
+    """
+
+    proxycommand = 'ProxyCommand=ssh -q '
+    if gwidentity:
+        proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
+    if gwuser:
+        proxycommand += '%s' % gwuser
+    else:
+        proxycommand += '%r'
+    proxycommand += '@%s -W %%h:%%p' % gw
+
+    return proxycommand
+