NS3Client: replacing socat for ssh
[nepi.git] / src / nepi / util / sshfuncs.py
index a88dc78..81bf5f9 100644 (file)
@@ -16,6 +16,9 @@
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Claudio Freire <claudio-daniel.freire@inria.fr>
+
+## TODO: This code needs reviewing !!!
 
 import base64
 import errno
 
 import base64
 import errno
@@ -57,20 +60,18 @@ class STDOUT:
     redirect to whatever stdout was redirected to.
     """
 
     redirect to whatever stdout was redirected to.
     """
 
-class RUNNING:
-    """
-    Process is still running
-    """
-
-class FINISHED:
+class ProcStatus:
     """
     """
-    Process is finished
+    Codes for status of remote spawned process
     """
     """
+    # Process is still running
+    RUNNING = 1
 
 
-class NOT_STARTED:
-    """
-    Process hasn't started running yet (this should be very rare)
-    """
+    # Process is finished
+    FINISHED = 2
+    
+    # Process hasn't started running yet (this should be very rare)
+    NOT_STARTED = 3
 
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
 
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
@@ -209,17 +210,15 @@ def rexec(command, host, user,
         port = None, 
         agent = True,
         sudo = False,
         port = None, 
         agent = True,
         sudo = False,
-        stdin = None,
         identity = None,
         server_key = None,
         env = None,
         tty = False,
         identity = None,
         server_key = None,
         env = None,
         tty = False,
-        timeout = None,
-        retry = 3,
-        err_on_timeout = True,
         connect_timeout = 30,
         connect_timeout = 30,
+        retry = 3,
         persistent = True,
         forward_x11 = False,
         persistent = True,
         forward_x11 = False,
+        blocking = True,
         strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
         strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
@@ -268,53 +267,25 @@ def rexec(command, host, user,
         tmp_known_hosts = make_server_key_args(server_key, host, port)
         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
         tmp_known_hosts = make_server_key_args(server_key, host, port)
         args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
-    args.append(command)
+    if sudo:
+        command = "sudo " + command
 
 
-    for x in xrange(retry):
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args,
-                env = env,
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-        
-        # attach tempfile object to the process, to make sure the file stays
-        # alive until the process is finished with it
-        proc._known_hosts = tmp_known_hosts
+    args.append(command)
     
     
-        try:
-            out, err = _communicate(proc, stdin, timeout, err_on_timeout)
-            msg = " rexec - host %s - command %s " % (host, " ".join(args))
-            log(msg, logging.DEBUG, out, err)
-
-            if proc.poll():
-                skip = False
+    log_msg = " rexec - host %s - command %s " % (host, " ".join(args))
 
 
-                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
-                    # SSH error, can safely retry
-                    skip = True 
-                elif retry:
-                    # Probably timed out or plain failed but can retry
-                    skip = True 
-                
-                if skip:
-                    t = x*2
-                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                            t, x, host, " ".join(args))
-                    log(msg, logging.DEBUG)
-
-                    time.sleep(t)
-                    continue
-            break
-        except RuntimeError, e:
-            msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-            log(msg, logging.DEBUG, out, err)
+    stdout = stderr = stdin = subprocess.PIPE
+    if forward_x11:
+        stdout = stderr = stdin = None
 
 
-            if retry <= 0:
-                raise
-            retry -= 1
-        
-    return ((out, err), proc)
+    return _retry_rexec(args, log_msg, 
+            stderr = stderr,
+            stdin = stdin,
+            stdout = stdout,
+            env = env, 
+            retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = blocking)
 
 def rcopy(source, dest,
         port = None, 
 
 def rcopy(source, dest,
         port = None, 
@@ -330,265 +301,73 @@ def rcopy(source, dest,
     Source and destination should have the user and host encoded
     as per scp specs.
     
     Source and destination should have the user and host encoded
     as per scp specs.
     
-    If source is a file object, a special mode will be used to
-    create the remote file with the same contents.
-    
-    If dest is a file object, the remote file (source) will be
-    read and written into dest.
-    
-    In these modes, recursive cannot be True.
-    
     Source can be a list of files to copy to a single destination,
     in which case it is advised that the destination be a folder.
     """
     
     Source can be a list of files to copy to a single destination,
     in which case it is advised that the destination be a folder.
     """
     
-    if isinstance(source, file) and source.tell() == 0:
-        source = source.name
-    elif hasattr(source, 'read'):
-        tmp = tempfile.NamedTemporaryFile()
-        while True:
-            buf = source.read(65536)
-            if buf:
-                tmp.write(buf)
-            else:
-                break
-        tmp.seek(0)
-        source = tmp.name
+    # Parse destination as <user>@<server>:<path>
+    if isinstance(dest, basestring) and ':' in dest:
+        remspec, path = dest.split(':',1)
+    elif isinstance(source, basestring) and ':' in source:
+        remspec, path = source.split(':',1)
+    else:
+        raise ValueError, "Both endpoints cannot be local"
+    user,host = remspec.rsplit('@',1)
     
     
-    if isinstance(source, file) or isinstance(dest, file) \
-            or hasattr(source, 'read')  or hasattr(dest, 'write'):
-        assert not recursive
-        
-        # Parse source/destination as <user>@<server>:<path>
-        if isinstance(dest, basestring) and ':' in dest:
-            remspec, path = dest.split(':',1)
-        elif isinstance(source, basestring) and ':' in source:
-            remspec, path = source.split(':',1)
-        else:
-            raise ValueError, "Both endpoints cannot be local"
-        user,host = remspec.rsplit('@',1)
-        
-        tmp_known_hosts = None
-        hostip = gethostbyname(host)
-        
-        args = ['ssh', '-l', user, '-C',
-                # Don't bother with localhost. Makes test easier
-                '-o', 'NoHostAuthenticationForLocalhost=yes',
-                '-o', 'ConnectTimeout=60',
-                '-o', 'ConnectionAttempts=3',
-                '-o', 'ServerAliveInterval=30',
-                '-o', 'TCPKeepAlive=yes',
-                hostip or host ]
-
-        if openssh_has_persist():
-            args.extend([
-                '-o', 'ControlMaster=auto',
-                '-o', 'ControlPath=%s' % (make_control_path(agent, False),),
-                '-o', 'ControlPersist=60' ])
-
-        if port:
-            args.append('-P%d' % port)
-
-        if identity:
-            args.extend(('-i', identity))
+    # plain scp
+    tmp_known_hosts = None
 
 
-        if server_key:
-            # Create a temporary server key file
-            tmp_known_hosts = make_server_key_args(server_key, host, port)
-            args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
-        
-        if isinstance(source, file) or hasattr(source, 'read'):
-            args.append('cat > %s' % (shell_escape(path),))
-        elif isinstance(dest, file) or hasattr(dest, 'write'):
-            args.append('cat %s' % (shell_escape(path),))
-        else:
-            raise AssertionError, "Unreachable code reached! :-Q"
-        
-        # connects to the remote host and starts a remote connection
-        if isinstance(source, file):
-            proc = subprocess.Popen(args, 
-                    stdout = open('/dev/null','w'),
-                    stderr = subprocess.PIPE,
-                    stdin = source)
-            err = proc.stderr.read()
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,err), proc)
-        elif isinstance(dest, file):
-            proc = subprocess.Popen(args, 
-                    stdout = open('/dev/null','w'),
-                    stderr = subprocess.PIPE,
-                    stdin = source)
-            err = proc.stderr.read()
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,err), proc)
-        elif hasattr(source, 'read'):
-            # file-like (but not file) source
-            proc = subprocess.Popen(args, 
-                    stdout = open('/dev/null','w'),
-                    stderr = subprocess.PIPE,
-                    stdin = subprocess.PIPE)
-            
-            buf = None
-            err = []
-            while True:
-                if not buf:
-                    buf = source.read(4096)
-                if not buf:
-                    #EOF
-                    break
-                
-                rdrdy, wrdy, broken = select.select(
-                    [proc.stderr],
-                    [proc.stdin],
-                    [proc.stderr,proc.stdin])
-                
-                if proc.stderr in rdrdy:
-                    # use os.read for fully unbuffered behavior
-                    err.append(os.read(proc.stderr.fileno(), 4096))
-                
-                if proc.stdin in wrdy:
-                    proc.stdin.write(buf)
-                    buf = None
-                
-                if broken:
-                    break
-            proc.stdin.close()
-            err.append(proc.stderr.read())
-                
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,''.join(err)), proc)
-        elif hasattr(dest, 'write'):
-            # file-like (but not file) dest
-            proc = subprocess.Popen(args, 
-                    stdout = subprocess.PIPE,
-                    stderr = subprocess.PIPE,
-                    stdin = open('/dev/null','w'))
+    args = ['scp', '-q', '-p', '-C',
+            # Speed up transfer using blowfish cypher specification which is 
+            # faster than the default one (3des)
+            '-c', 'blowfish',
+            # Don't bother with localhost. Makes test easier
+            '-o', 'NoHostAuthenticationForLocalhost=yes',
+            '-o', 'ConnectTimeout=60',
+            '-o', 'ConnectionAttempts=3',
+            '-o', 'ServerAliveInterval=30',
+            '-o', 'TCPKeepAlive=yes' ]
             
             
-            buf = None
-            err = []
-            while True:
-                rdrdy, wrdy, broken = select.select(
-                    [proc.stderr, proc.stdout],
-                    [],
-                    [proc.stderr, proc.stdout])
-                
-                if proc.stderr in rdrdy:
-                    # use os.read for fully unbuffered behavior
-                    err.append(os.read(proc.stderr.fileno(), 4096))
-                
-                if proc.stdout in rdrdy:
-                    # use os.read for fully unbuffered behavior
-                    buf = os.read(proc.stdout.fileno(), 4096)
-                    dest.write(buf)
-                    
-                    if not buf:
-                        #EOF
-                        break
-                
-                if broken:
-                    break
-            err.append(proc.stderr.read())
-                
-            proc._known_hosts = tmp_known_hosts
-            eintr_retry(proc.wait)()
-            return ((None,''.join(err)), proc)
-        else:
-            raise AssertionError, "Unreachable code reached! :-Q"
-    else:
-        # Parse destination as <user>@<server>:<path>
-        if isinstance(dest, basestring) and ':' in dest:
-            remspec, path = dest.split(':',1)
-        elif isinstance(source, basestring) and ':' in source:
-            remspec, path = source.split(':',1)
-        else:
-            raise ValueError, "Both endpoints cannot be local"
-        user,host = remspec.rsplit('@',1)
-        
-        # plain scp
-        tmp_known_hosts = None
-
-        args = ['scp', '-q', '-p', '-C',
-                # Don't bother with localhost. Makes test easier
-                '-o', 'NoHostAuthenticationForLocalhost=yes',
-                '-o', 'ConnectTimeout=60',
-                '-o', 'ConnectionAttempts=3',
-                '-o', 'ServerAliveInterval=30',
-                '-o', 'TCPKeepAlive=yes' ]
-                
-        if port:
-            args.append('-P%d' % port)
-
-        if recursive:
-            args.append('-r')
-
-        if identity:
-            args.extend(('-i', identity))
+    if port:
+        args.append('-P%d' % port)
 
 
-        if server_key:
-            # Create a temporary server key file
-            tmp_known_hosts = make_server_key_args(server_key, host, port)
-            args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
+    if recursive:
+        args.append('-r')
 
 
-        if not strict_host_checking:
-            # Do not check for Host key. Unsafe.
-            args.extend(['-o', 'StrictHostKeyChecking=no'])
+    if identity:
+        args.extend(('-i', identity))
 
 
-        if isinstance(source,list):
-            args.extend(source)
-        else:
-            if openssh_has_persist():
-                args.extend([
-                    '-o', 'ControlMaster=auto',
-                    '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
-                    ])
-            args.append(source)
-
-        args.append(dest)
-
-        for x in xrange(retry):
-            # connects to the remote host and starts a remote connection
-            proc = subprocess.Popen(args,
-                    stdout = subprocess.PIPE,
-                    stdin = subprocess.PIPE, 
-                    stderr = subprocess.PIPE)
-            
-            # attach tempfile object to the process, to make sure the file stays
-            # alive until the process is finished with it
-            proc._known_hosts = tmp_known_hosts
-        
-            try:
-                (out, err) = proc.communicate()
-                eintr_retry(proc.wait)()
-                msg = " rcopy - host %s - command %s " % (host, " ".join(args))
-                log(msg, logging.DEBUG, out, err)
+    if server_key:
+        # Create a temporary server key file
+        tmp_known_hosts = make_server_key_args(server_key, host, port)
+        args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)])
 
 
-                if proc.poll():
-                    t = x*2
-                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                            t, x, host, " ".join(args))
-                    log(msg, logging.DEBUG)
+    if not strict_host_checking:
+        # Do not check for Host key. Unsafe.
+        args.extend(['-o', 'StrictHostKeyChecking=no'])
 
 
-                    time.sleep(t)
-                    continue
+    if isinstance(source, list):
+        args.extend(source)
+    else:
+        if openssh_has_persist():
+            args.extend([
+                '-o', 'ControlMaster=auto',
+                '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
+                ])
+        args.append(source)
 
 
-                break
-            except RuntimeError, e:
-                msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-                log(msg, logging.DEBUG, out, err)
+    args.append(dest)
 
 
-                if retry <= 0:
-                    raise
-                retry -= 1
-            
-        return ((out, err), proc)
+    log_msg = " rcopy - host %s - command %s " % (host, " ".join(args))
+    
+    return _retry_rexec(args, log_msg, env = None, retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = True)
 
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
         stderr = STDOUT, 
 
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
         stderr = STDOUT, 
-        stdin = '/dev/null', 
+        stdin = '/dev/null',
         home = None, 
         create_home = False, 
         sudo = False,
         home = None, 
         create_home = False, 
         sudo = False,
@@ -600,28 +379,41 @@ def rspawn(command, pidfile,
         server_key = None,
         tty = False):
     """
         server_key = None,
         tty = False):
     """
-    Spawn a remote command such that it will continue working asynchronously.
-    
-    Parameters:
-        command: the command to run - it should be a single line.
-        
-        pidfile: path of a (ideally unique to this task) pidfile for tracking the process.
-        
-        stdout: path of a file to redirect standard output to - must be a string.
-            Defaults to /dev/null
-        stderr: path of a file to redirect standard error to - string or the special STDOUT value
-            to redirect to the same file stdout was redirected to. Defaults to STDOUT.
-        stdin: path of a file with input to be piped into the command's standard input
-        
-        home: path of a folder to use as working directory - should exist, unless you specify create_home
-        
-        create_home: if True, the home folder will be created first with mkdir -p
-        
-        sudo: whether the command needs to be executed as root
+    Spawn a remote command such that it will continue working asynchronously in 
+    background. 
+
+        :param command: The command to run, it should be a single line.
+        :type command: str
+
+        :param pidfile: Path to a file where to store the pid and ppid of the 
+                        spawned process
+        :type pidfile: str
+
+        :param stdout: Path to file to redirect standard output. 
+                       The default value is /dev/null
+        :type stdout: str
+
+        :param stderr: Path to file to redirect standard error.
+                       If the special STDOUT value is used, stderr will 
+                       be redirected to the same file as stdout
+        :type stderr: str
+
+        :param stdin: Path to a file with input to be piped into the command's standard input
+        :type stdin: str
+
+        :param home: Path to working directory folder. 
+                    It is assumed to exist unless the create_home flag is set.
+        :type home: str
+
+        :param create_home: Flag to force creation of the home folder before 
+                            running the command
+        :type create_home: bool
+        :param sudo: Flag forcing execution with sudo user
+        :type sudo: bool
         
         
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: touple
+
         (stdout, stderr), process
         
         Of the spawning process, which only captures errors at spawning time.
         (stdout, stderr), process
         
         Of the spawning process, which only captures errors at spawning time.
@@ -634,7 +426,7 @@ def rspawn(command, pidfile,
     else:
         stderr = ' ' + stderr
     
     else:
         stderr = ' ' + stderr
     
-    daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+    daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
         'command' : command,
         'pidfile' : shell_escape(pidfile),
         'stdout' : stdout,
         'command' : command,
         'pidfile' : shell_escape(pidfile),
         'stdout' : stdout,
@@ -667,7 +459,7 @@ def rspawn(command, pidfile,
     return ((out, err), proc)
 
 @eintr_retry
     return ((out, err), proc)
 
 @eintr_retry
-def rcheckpid(pidfile,
+def rgetpid(pidfile,
         host = None, 
         port = None, 
         user = None, 
         host = None, 
         port = None, 
         user = None, 
@@ -675,19 +467,21 @@ def rcheckpid(pidfile,
         identity = None,
         server_key = None):
     """
         identity = None,
         server_key = None):
     """
-    Check the pidfile of a process spawned with remote_spawn.
-    
-    Parameters:
-        pidfile: the pidfile passed to remote_span
+    Returns the pid and ppid of a process from a remote file where the 
+    information was stored.
+
+        :param home: Path to directory where the pidfile is located
+        :type home: str
+
+        :param pidfile: Name of file containing the pid information
+        :type pidfile: str
         
         
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :rtype: int
         
         
-        A (pid, ppid) tuple useful for calling remote_status and remote_kill,
-        or None if the pidfile isn't valid yet (maybe the process is still starting).
-    """
+        A (pid, ppid) tuple useful for calling rstatus and rkill,
+        or None if the pidfile isn't valid yet (can happen when process is staring up)
 
 
+    """
     (out,err),proc = rexec(
         "cat %(pidfile)s" % {
             'pidfile' : pidfile,
     (out,err),proc = rexec(
         "cat %(pidfile)s" % {
             'pidfile' : pidfile,
@@ -719,18 +513,17 @@ def rstatus(pid, ppid,
         identity = None,
         server_key = None):
     """
         identity = None,
         server_key = None):
     """
-    Check the status of a process spawned with remote_spawn.
+    Returns a code representing the the status of a remote process
+
+        :param pid: Process id of the process
+        :type pid: int
+
+        :param ppid: Parent process id of process
+        :type ppid: int
     
     
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        host/port/user/agent/identity: see rexec
+        :rtype: int (One of NOT_STARTED, RUNNING, FINISHED)
     
     
-    Returns:
-        
-        One of NOT_STARTED, RUNNING, FINISHED
     """
     """
-
     (out,err),proc = rexec(
         # Check only by pid. pid+ppid does not always work (especially with sudo) 
         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
     (out,err),proc = rexec(
         # Check only by pid. pid+ppid does not always work (especially with sudo) 
         " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait')  || echo 'done' ) | tail -n 1" % {
@@ -746,7 +539,7 @@ def rstatus(pid, ppid,
         )
     
     if proc.wait():
         )
     
     if proc.wait():
-        return NOT_STARTED
+        return ProcStatus.NOT_STARTED
     
     status = False
     if err:
     
     status = False
     if err:
@@ -755,8 +548,8 @@ def rstatus(pid, ppid,
     elif out:
         status = (out.strip() == 'wait')
     else:
     elif out:
         status = (out.strip() == 'wait')
     else:
-        return NOT_STARTED
-    return RUNNING if status else FINISHED
+        return ProcStatus.NOT_STARTED
+    return ProcStatus.RUNNING if status else ProcStatus.FINISHED
 
 @eintr_retry
 def rkill(pid, ppid,
 
 @eintr_retry
 def rkill(pid, ppid,
@@ -769,23 +562,21 @@ def rkill(pid, ppid,
         server_key = None, 
         nowait = False):
     """
         server_key = None, 
         nowait = False):
     """
-    Kill a process spawned with remote_spawn.
-    
+    Sends a kill signal to a remote process.
+
     First tries a SIGTERM, and if the process does not end in 10 seconds,
     it sends a SIGKILL.
     First tries a SIGTERM, and if the process does not end in 10 seconds,
     it sends a SIGKILL.
-    
-    Parameters:
-        pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid
-        
-        sudo: whether the command was run with sudo - careful killing like this.
-        
-        host/port/user/agent/identity: see rexec
-    
-    Returns:
+        :param pid: Process id of process to be killed
+        :type pid: int
+
+        :param ppid: Parent process id of process to be killed
+        :type ppid: int
+
+        :param sudo: Flag indicating if sudo should be used to kill the process
+        :type sudo: bool
         
         
-        Nothing, should have killed the process
     """
     """
-    
     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
     cmd = """
 SUBKILL="%(subkill)s" ;
     subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid }
     cmd = """
 SUBKILL="%(subkill)s" ;
@@ -829,116 +620,68 @@ fi
 
     return (out, err), proc
 
 
     return (out, err), proc
 
-# POSIX
-def _communicate(self, input, timeout=None, err_on_timeout=True):
-    read_set = []
-    write_set = []
-    stdout = None # Return
-    stderr = None # Return
-    
-    killed = False
-    
-    if timeout is not None:
-        timelimit = time.time() + timeout
-        killtime = timelimit + 4
-        bailtime = timelimit + 4
-
-    if self.stdin:
-        # Flush stdio buffer.  This might block, if the user has
-        # been writing to .stdin in an uncontrolled fashion.
-        self.stdin.flush()
-        if input:
-            write_set.append(self.stdin)
-        else:
-            self.stdin.close()
-    if self.stdout:
-        read_set.append(self.stdout)
-        stdout = []
-    if self.stderr:
-        read_set.append(self.stderr)
-        stderr = []
-
-    input_offset = 0
-    while read_set or write_set:
-        if timeout is not None:
-            curtime = time.time()
-            if timeout is None or curtime > timelimit:
-                if curtime > bailtime:
-                    break
-                elif curtime > killtime:
-                    signum = signal.SIGKILL
-                else:
-                    signum = signal.SIGTERM
-                # Lets kill it
-                os.kill(self.pid, signum)
-                select_timeout = 0.5
-            else:
-                select_timeout = timelimit - curtime + 0.1
-        else:
-            select_timeout = 1.0
+def _retry_rexec(args,
+        log_msg,
+        stdout = subprocess.PIPE,
+        stdin = subprocess.PIPE, 
+        stderr = subprocess.PIPE,
+        env = None,
+        retry = 3,
+        tmp_known_hosts = None,
+        blocking = True):
+
+    for x in xrange(retry):
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args,
+                env = env,
+                stdout = stdout,
+                stdin = stdin, 
+                stderr = stderr)
         
         
-        if select_timeout > 1.0:
-            select_timeout = 1.0
-            
+        # attach tempfile object to the process, to make sure the file stays
+        # alive until the process is finished with it
+        proc._known_hosts = tmp_known_hosts
+    
+        # The argument block == False forces to rexec to return immediately, 
+        # without blocking 
         try:
         try:
-            rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
-        except select.error,e:
-            if e[0] != 4:
+            err = out = " "
+            if blocking:
+                (out, err) = proc.communicate()
+            elif stdout:
+                out = proc.stdout.read()
+                if proc.poll() and stderr:
+                    err = proc.stderr.read()
+
+            log(log_msg, logging.DEBUG, out, err)
+
+            if proc.poll():
+                skip = False
+
+                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+                    # SSH error, can safely retry
+                    skip = True 
+                elif retry:
+                    # Probably timed out or plain failed but can retry
+                    skip = True 
+                
+                if skip:
+                    t = x*2
+                    msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
+                            t, x, " ".join(args))
+                    log(msg, logging.DEBUG)
+
+                    time.sleep(t)
+                    continue
+            break
+        except RuntimeError, e:
+            msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
+            log(msg, logging.DEBUG, out, err)
+
+            if retry <= 0:
                 raise
                 raise
-            else:
-                continue
+            retry -= 1
         
         
-        if not rlist and not wlist and not xlist and self.poll() is not None:
-            # timeout and process exited, say bye
-            break
+    return ((out, err), proc)
 
 
-        if self.stdin in wlist:
-            # When select has indicated that the file is writable,
-            # we can write up to PIPE_BUF bytes without risk
-            # blocking.  POSIX defines PIPE_BUF >= 512
-            bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
-            input_offset += bytes_written
-            if input_offset >= len(input):
-                self.stdin.close()
-                write_set.remove(self.stdin)
-
-        if self.stdout in rlist:
-            data = os.read(self.stdout.fileno(), 1024)
-            if data == "":
-                self.stdout.close()
-                read_set.remove(self.stdout)
-            stdout.append(data)
-
-        if self.stderr in rlist:
-            data = os.read(self.stderr.fileno(), 1024)
-            if data == "":
-                self.stderr.close()
-                read_set.remove(self.stderr)
-            stderr.append(data)
-    
-    # All data exchanged.  Translate lists into strings.
-    if stdout is not None:
-        stdout = ''.join(stdout)
-    if stderr is not None:
-        stderr = ''.join(stderr)
-
-    # Translate newlines, if requested.  We cannot let the file
-    # object do the translation: It is based on stdio, which is
-    # impossible to combine with select (unless forcing no
-    # buffering).
-    if self.universal_newlines and hasattr(file, 'newlines'):
-        if stdout:
-            stdout = self._translate_newlines(stdout)
-        if stderr:
-            stderr = self._translate_newlines(stderr)
-
-    if killed and err_on_timeout:
-        errcode = self.poll()
-        raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
-    else:
-        if killed:
-            self.poll()
-        else:
-            self.wait()
-        return (stdout, stderr)