Merging ns-3 into nepi-3-dev
[nepi.git] / src / nepi / util / sshfuncs.py
index 1df46b9..b9b5342 100644 (file)
@@ -18,6 +18,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Claudio Freire <claudio-daniel.freire@inria.fr>
 
+## TODO: This code needs reviewing !!!
+
 import base64
 import errno
 import hashlib
@@ -210,15 +212,12 @@ def rexec(command, host, user,
         gw = None, 
         agent = True,
         sudo = False,
-        stdin = None,
         identity = None,
         server_key = None,
         env = None,
         tty = False,
-        timeout = None,
-        retry = 3,
-        err_on_timeout = True,
         connect_timeout = 30,
+        retry = 3,
         persistent = True,
         forward_x11 = False,
         blocking = True,
@@ -285,60 +284,21 @@ def rexec(command, host, user,
         command = "sudo " + command
 
     args.append(command)
-
-    for x in xrange(retry):
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args,
-                env = env,
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-       
-        # attach tempfile object to the process, to make sure the file stays
-        # alive until the process is finished with it
-        proc._known_hosts = tmp_known_hosts
     
-        # by default, rexec calls _communicate which will block 
-        # until the process has exit. The argument block == False 
-        # forces to rexec to return immediately, without blocking 
-        try:
-            if blocking:
-                out, err = _communicate(proc, stdin, timeout, err_on_timeout)
-            else:
-                err = proc.stderr.read()
-                out = proc.stdout.read()
-
-            msg = " rexec - host %s - command %s " % (host, " ".join(args))
-            log(msg, logging.DEBUG, out, err)
-
-            if proc.poll():
-                skip = False
-
-                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
-                    # SSH error, can safely retry
-                    skip = True 
-                elif retry:
-                    # Probably timed out or plain failed but can retry
-                    skip = True 
-                
-                if skip:
-                    t = x*2
-                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                            t, x, host, " ".join(args))
-                    log(msg, logging.DEBUG)
+    log_msg = " rexec - host %s - command %s " % (host, " ".join(args))
 
-                    time.sleep(t)
-                    continue
-            break
-        except RuntimeError, e:
-            msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-            log(msg, logging.DEBUG, out, err)
+    stdout = stderr = stdin = subprocess.PIPE
+    if forward_x11:
+        stdout = stderr = stdin = None
 
-            if retry <= 0:
-                raise
-            retry -= 1
-        
-    return ((out, err), proc)
+    return _retry_rexec(args, log_msg, 
+            stderr = stderr,
+            stdin = stdin,
+            stdout = stdout,
+            env = env, 
+            retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = blocking)
 
 def rcopy(source, dest,
         port = None,
@@ -410,59 +370,24 @@ def rcopy(source, dest,
     if not strict_host_checking:
         # Do not check for Host key. Unsafe.
         args.extend(['-o', 'StrictHostKeyChecking=no'])
-
-    if openssh_has_persist():
-        args.extend([
-            '-o', 'ControlMaster=auto',
-            '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
-            ])
-
-    if isinstance(dest, str):
-        dest = map(str.strip, dest.split(";"))
-
-    if isinstance(source, str):
-        source = map(str.strip, source.split(";"))
-
-    args.extend(source)
-
-    args.extend(dest)
-
-    for x in xrange(retry):
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args,
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-        
-        # attach tempfile object to the process, to make sure the file stays
-        # alive until the process is finished with it
-        proc._known_hosts = tmp_known_hosts
     
-        try:
-            (out, err) = proc.communicate()
-            eintr_retry(proc.wait)()
-            msg = " rcopy - host %s - command %s " % (host, " ".join(args))
-            log(msg, logging.DEBUG, out, err)
-
-            if proc.poll():
-                t = x*2
-                msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                        t, x, host, " ".join(args))
-                log(msg, logging.DEBUG)
-
-                time.sleep(t)
-                continue
+    if isinstance(source, list):
+        args.extend(source)
+    else:
+        if openssh_has_persist():
+            args.extend([
+                '-o', 'ControlMaster=auto',
+                '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
+                ])
+        args.append(source)
 
-            break
-        except RuntimeError, e:
-            msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-            log(msg, logging.DEBUG, out, err)
+    args.append(dest)
 
-            if retry <= 0:
-                raise
-            retry -= 1
-        
-    return ((out, err), proc)
+    log_msg = " rcopy - host %s - command %s " % (host, " ".join(args))
+    
+    return _retry_rexec(args, log_msg, env = None, retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = True)
 
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
@@ -736,120 +661,67 @@ fi
 
     return (out, err), proc
 
-# POSIX
-def _communicate(proc, input, timeout=None, err_on_timeout=True):
-    read_set = []
-    write_set = []
-    stdout = None # Return
-    stderr = None # Return
-    
-    killed = False
-    
-    if timeout is not None:
-        timelimit = time.time() + timeout
-        killtime = timelimit + 4
-        bailtime = timelimit + 4
-
-    if proc.stdin:
-        # Flush stdio buffer.  This might block, if the user has
-        # been writing to .stdin in an uncontrolled fashion.
-        proc.stdin.flush()
-        if input:
-            write_set.append(proc.stdin)
-        else:
-            proc.stdin.close()
-
-    if proc.stdout:
-        read_set.append(proc.stdout)
-        stdout = []
-
-    if proc.stderr:
-        read_set.append(proc.stderr)
-        stderr = []
-
-    input_offset = 0
-    while read_set or write_set:
-        if timeout is not None:
-            curtime = time.time()
-            if timeout is None or curtime > timelimit:
-                if curtime > bailtime:
-                    break
-                elif curtime > killtime:
-                    signum = signal.SIGKILL
-                else:
-                    signum = signal.SIGTERM
-                # Lets kill it
-                os.kill(proc.pid, signum)
-                select_timeout = 0.5
-            else:
-                select_timeout = timelimit - curtime + 0.1
-        else:
-            select_timeout = 1.0
+def _retry_rexec(args,
+        log_msg,
+        stdout = subprocess.PIPE,
+        stdin = subprocess.PIPE, 
+        stderr = subprocess.PIPE,
+        env = None,
+        retry = 3,
+        tmp_known_hosts = None,
+        blocking = True):
+
+    for x in xrange(retry):
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args,
+                env = env,
+                stdout = stdout,
+                stdin = stdin, 
+                stderr = stderr)
         
-        if select_timeout > 1.0:
-            select_timeout = 1.0
-            
+        # attach tempfile object to the process, to make sure the file stays
+        # alive until the process is finished with it
+        proc._known_hosts = tmp_known_hosts
+    
+        # The argument block == False forces to rexec to return immediately, 
+        # without blocking 
         try:
-            rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
-        except select.error,e:
-            if e[0] != 4:
-                raise
-            else:
-                continue
-        
-        if not rlist and not wlist and not xlist and proc.poll() is not None:
-            # timeout and process exited, say bye
+            err = out = " "
+            if blocking:
+                (out, err) = proc.communicate()
+            elif stdout:
+                out = proc.stdout.read()
+                if proc.poll() and stderr:
+                    err = proc.stderr.read()
+
+            log(log_msg, logging.DEBUG, out, err)
+
+            if proc.poll():
+                skip = False
+
+                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+                    # SSH error, can safely retry
+                    skip = True 
+                elif retry:
+                    # Probably timed out or plain failed but can retry
+                    skip = True 
+                
+                if skip:
+                    t = x*2
+                    msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
+                            t, x, " ".join(args))
+                    log(msg, logging.DEBUG)
+
+                    time.sleep(t)
+                    continue
             break
+        except RuntimeError, e:
+            msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
+            log(msg, logging.DEBUG, out, err)
 
-        if proc.stdin in wlist:
-            # When select has indicated that the file is writable,
-            # we can write up to PIPE_BUF bytes without risk
-            # blocking.  POSIX defines PIPE_BUF >= 512
-            bytes_written = os.write(proc.stdin.fileno(),
-                    buffer(input, input_offset, 512))
-            input_offset += bytes_written
-
-            if input_offset >= len(input):
-                proc.stdin.close()
-                write_set.remove(proc.stdin)
-
-        if proc.stdout in rlist:
-            data = os.read(proc.stdout.fileno(), 1024)
-            if data == "":
-                proc.stdout.close()
-                read_set.remove(proc.stdout)
-            stdout.append(data)
-
-        if proc.stderr in rlist:
-            data = os.read(proc.stderr.fileno(), 1024)
-            if data == "":
-                proc.stderr.close()
-                read_set.remove(proc.stderr)
-            stderr.append(data)
-    
-    # All data exchanged.  Translate lists into strings.
-    if stdout is not None:
-        stdout = ''.join(stdout)
-    if stderr is not None:
-        stderr = ''.join(stderr)
-
-    # Translate newlines, if requested.  We cannot let the file
-    # object do the translation: It is based on stdio, which is
-    # impossible to combine with select (unless forcing no
-    # buffering).
-    if proc.universal_newlines and hasattr(file, 'newlines'):
-        if stdout:
-            stdout = proc._translate_newlines(stdout)
-        if stderr:
-            stderr = proc._translate_newlines(stderr)
-
-    if killed and err_on_timeout:
-        errcode = proc.poll()
-        raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
-    else:
-        if killed:
-            proc.poll()
-        else:
-            proc.wait()
-        return (stdout, stderr)
+            if retry <= 0:
+                raise
+            retry -= 1
+        
+    return ((out, err), proc)