turn off using blowfish for scp transfers - not available on a stock ubuntu-15.04...
[nepi.git] / src / nepi / util / sshfuncs.py
index c00b494..a01bded 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -18,6 +17,8 @@
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #         Claudio Freire <claudio-daniel.freire@inria.fr>
 
+## TODO: This code needs reviewing !!!
+
 import base64
 import errno
 import hashlib
@@ -33,6 +34,8 @@ import threading
 import time
 import tempfile
 
+_re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
+
 logger = logging.getLogger("sshfuncs")
 
 def log(msg, level, out = None, err = None):
@@ -44,7 +47,6 @@ def log(msg, level, out = None, err = None):
 
     logger.log(level, msg)
 
-
 if hasattr(os, "devnull"):
     DEV_NULL = os.devnull
 else:
@@ -74,6 +76,20 @@ class ProcStatus:
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
 
+def resolve_hostname(host):
+    ip = None
+
+    if host in ["localhost", "127.0.0.1", "::1"]:
+        p = subprocess.Popen("ip -o addr list", shell=True,
+                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        m = _re_inet.findall(stdout)
+        ip = m[0][1].split("/")[0]
+    else:
+        ip = socket.gethostbyname(host)
+
+    return ip
+
 def gethostbyname(host):
     global hostbyname_cache
     global hostbyname_cache_lock
@@ -81,7 +97,7 @@ def gethostbyname(host):
     hostbyname = hostbyname_cache.get(host)
     if not hostbyname:
         with hostbyname_cache_lock:
-            hostbyname = socket.gethostbyname(host)
+            hostbyname = resolve_hostname(host)
             hostbyname_cache[host] = hostbyname
 
             msg = " Added hostbyname %s - %s " % (host, hostbyname)
@@ -210,15 +226,12 @@ def rexec(command, host, user,
         gw = None, 
         agent = True,
         sudo = False,
-        stdin = None,
         identity = None,
         server_key = None,
         env = None,
         tty = False,
-        timeout = None,
-        retry = 3,
-        err_on_timeout = True,
         connect_timeout = 30,
+        retry = 3,
         persistent = True,
         forward_x11 = False,
         blocking = True,
@@ -226,7 +239,7 @@ def rexec(command, host, user,
     """
     Executes a remote command, returns ((stdout,stderr),process)
     """
-    
+
     tmp_known_hosts = None
     if not gw:
         hostip = gethostbyname(host)
@@ -253,10 +266,7 @@ def rexec(command, host, user,
         args.extend(['-o', 'StrictHostKeyChecking=no'])
 
     if gw:
-        if gwuser:
-            proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
-        else:
-            proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
+        proxycommand = _proxy_command(gw, gwuser, identity)
         args.extend(['-o', proxycommand])
 
     if agent:
@@ -266,6 +276,7 @@ def rexec(command, host, user,
         args.append('-p%d' % port)
 
     if identity:
+        identity = os.path.expanduser(identity)
         args.extend(('-i', identity))
 
     if tty:
@@ -285,65 +296,25 @@ def rexec(command, host, user,
 
     args.append(command)
 
-    for x in xrange(retry):
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args,
-                env = env,
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-       
-        # attach tempfile object to the process, to make sure the file stays
-        # alive until the process is finished with it
-        proc._known_hosts = tmp_known_hosts
-    
-        # by default, rexec calls _communicate which will block 
-        # until the process has exit. The argument block == False 
-        # forces to rexec to return immediately, without blocking 
-        try:
-            if blocking:
-                out, err = _communicate(proc, stdin, timeout, err_on_timeout)
-            else:
-                err = proc.stderr.read()
-                out = proc.stdout.read()
-
-            msg = " rexec - host %s - command %s " % (host, " ".join(args))
-            log(msg, logging.DEBUG, out, err)
-
-            if proc.poll():
-                skip = False
-
-                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
-                    # SSH error, can safely retry
-                    skip = True 
-                elif retry:
-                    # Probably timed out or plain failed but can retry
-                    skip = True 
-                
-                if skip:
-                    t = x*2
-                    msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                            t, x, host, " ".join(args))
-                    log(msg, logging.DEBUG)
+    log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
 
-                    time.sleep(t)
-                    continue
-            break
-        except RuntimeError, e:
-            msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-            log(msg, logging.DEBUG, out, err)
+    stdout = stderr = stdin = subprocess.PIPE
+    if forward_x11:
+        stdout = stderr = stdin = None
 
-            if retry <= 0:
-                raise
-            retry -= 1
-        
-    return ((out, err), proc)
+    return _retry_rexec(args, log_msg, 
+            stderr = stderr,
+            stdin = stdin,
+            stdout = stdout,
+            env = env, 
+            retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = blocking)
 
 def rcopy(source, dest,
         port = None,
         gwuser = None,
         gw = None,
-        agent = True, 
         recursive = False,
         identity = None,
         server_key = None,
@@ -357,7 +328,7 @@ def rcopy(source, dest,
     
     Source can be a list of files to copy to a single destination, 
     (in which case it is advised that the destination be a folder),
-    a single string or a string list of colon-separeted files.
+    or a single file in a string.
     """
 
     # Parse destination as <user>@<server>:<path>
@@ -373,9 +344,13 @@ def rcopy(source, dest,
     tmp_known_hosts = None
 
     args = ['scp', '-q', '-p', '-C',
+            # 2015-06-01 Thierry: I am commenting off blowfish
+            # as this is not available on a plain ubuntu 15.04 install
+            # this IMHO is too fragile, shoud be something the user
+            # decides explicitly (so he is at least aware of that dependency)
             # Speed up transfer using blowfish cypher specification which is 
             # faster than the default one (3des)
-            '-c', 'blowfish',
+            '-c', 'blowfish',
             # Don't bother with localhost. Makes test easier
             '-o', 'NoHostAuthenticationForLocalhost=yes',
             '-o', 'ConnectTimeout=60',
@@ -387,16 +362,14 @@ def rcopy(source, dest,
         args.append('-P%d' % port)
 
     if gw:
-        if gwuser:
-            proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
-        else:
-            proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
+        proxycommand = _proxy_command(gw, gwuser, identity)
         args.extend(['-o', proxycommand])
 
     if recursive:
         args.append('-r')
 
     if identity:
+        identity = os.path.expanduser(identity)
         args.extend(('-i', identity))
 
     if server_key:
@@ -407,59 +380,27 @@ def rcopy(source, dest,
     if not strict_host_checking:
         # Do not check for Host key. Unsafe.
         args.extend(['-o', 'StrictHostKeyChecking=no'])
-
-    if openssh_has_persist():
-        args.extend([
-            '-o', 'ControlMaster=auto',
-            '-o', 'ControlPath=%s' % (make_control_path(agent, False),)
-            ])
-
-    if isinstance(dest, str):
-        dest = map(str.strip, dest.split(";"))
-
-    if isinstance(source, str):
-        source = map(str.strip, source.split(";"))
-
-    args.extend(source)
-
-    args.extend(dest)
-
-    for x in xrange(retry):
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args,
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-        
-        # attach tempfile object to the process, to make sure the file stays
-        # alive until the process is finished with it
-        proc._known_hosts = tmp_known_hosts
     
-        try:
-            (out, err) = proc.communicate()
-            eintr_retry(proc.wait)()
-            msg = " rcopy - host %s - command %s " % (host, " ".join(args))
-            log(msg, logging.DEBUG, out, err)
-
-            if proc.poll():
-                t = x*2
-                msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( 
-                        t, x, host, " ".join(args))
-                log(msg, logging.DEBUG)
-
-                time.sleep(t)
-                continue
-
-            break
-        except RuntimeError, e:
-            msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args)
-            log(msg, logging.DEBUG, out, err)
+    if isinstance(source, list):
+        args.extend(source)
+    else:
+        if openssh_has_persist():
+            args.extend([
+                '-o', 'ControlMaster=auto',
+                '-o', 'ControlPath=%s' % (make_control_path(False, False),)
+                ])
+        args.append(source)
+
+    if isinstance(dest, list):
+        args.extend(dest)
+    else:
+        args.append(dest)
 
-            if retry <= 0:
-                raise
-            retry -= 1
-        
-    return ((out, err), proc)
+    log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args)))
+    
+    return _retry_rexec(args, log_msg, env = None, retry = retry, 
+            tmp_known_hosts = tmp_known_hosts,
+            blocking = True)
 
 def rspawn(command, pidfile, 
         stdout = '/dev/null', 
@@ -476,7 +417,8 @@ def rspawn(command, pidfile,
         agent = None, 
         identity = None, 
         server_key = None,
-        tty = False):
+        tty = False,
+        strict_host_checking = True):
     """
     Spawn a remote command such that it will continue working asynchronously in 
     background. 
@@ -511,7 +453,7 @@ def rspawn(command, pidfile,
         :param sudo: Flag forcing execution with sudo user
         :type sudo: bool
         
-        :rtype: touple
+        :rtype: tuple
 
         (stdout, stderr), process
         
@@ -551,7 +493,8 @@ def rspawn(command, pidfile,
         agent = agent,
         identity = identity,
         server_key = server_key,
-        tty = tty ,
+        tty = tty,
+        strict_host_checking = strict_host_checking ,
         )
     
     if proc.wait():
@@ -568,7 +511,8 @@ def rgetpid(pidfile,
         gw = None,
         agent = None, 
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
     Returns the pid and ppid of a process from a remote file where the 
     information was stored.
@@ -596,7 +540,8 @@ def rgetpid(pidfile,
         gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
         
     if proc.wait():
@@ -618,7 +563,8 @@ def rstatus(pid, ppid,
         gw = None,
         agent = None, 
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
     Returns a code representing the the status of a remote process
 
@@ -644,7 +590,8 @@ def rstatus(pid, ppid,
         gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
     
     if proc.wait():
@@ -671,7 +618,8 @@ def rkill(pid, ppid,
         sudo = False,
         identity = None, 
         server_key = None, 
-        nowait = False):
+        nowait = False,
+        strict_host_checking = True):
     """
     Sends a kill signal to a remote process.
 
@@ -725,7 +673,8 @@ fi
         gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
     
     # wait, don't leave zombies around
@@ -733,15 +682,85 @@ fi
 
     return (out, err), proc
 
+def _retry_rexec(args,
+        log_msg,
+        stdout = subprocess.PIPE,
+        stdin = subprocess.PIPE, 
+        stderr = subprocess.PIPE,
+        env = None,
+        retry = 3,
+        tmp_known_hosts = None,
+        blocking = True):
+
+    for x in xrange(retry):
+        # connects to the remote host and starts a remote connection
+        proc = subprocess.Popen(args,
+                env = env,
+                stdout = stdout,
+                stdin = stdin, 
+                stderr = stderr)
+        
+        # attach tempfile object to the process, to make sure the file stays
+        # alive until the process is finished with it
+        proc._known_hosts = tmp_known_hosts
+    
+        # The argument block == False forces to rexec to return immediately, 
+        # without blocking 
+        try:
+            err = out = " "
+            if blocking:
+                #(out, err) = proc.communicate()
+                # The method communicate was re implemented for performance issues
+                # when using python subprocess communicate method the ssh commands 
+                # last one minute each
+                out, err = _communicate(proc, input=None)
+
+            elif stdout:
+                out = proc.stdout.read()
+                if proc.poll() and stderr:
+                    err = proc.stderr.read()
+
+            log(log_msg, logging.DEBUG, out, err)
+
+            if proc.poll():
+                skip = False
+
+                if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '):
+                    # SSH error, can safely retry
+                    skip = True 
+                elif retry:
+                    # Probably timed out or plain failed but can retry
+                    skip = True 
+                
+                if skip:
+                    t = x*2
+                    msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( 
+                            t, x, " ".join(args))
+                    log(msg, logging.DEBUG)
+
+                    time.sleep(t)
+                    continue
+            break
+        except RuntimeError, e:
+            msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg )
+            log(msg, logging.DEBUG, out, err)
+
+            if retry <= 0:
+                raise
+            retry -= 1
+
+    return ((out, err), proc)
+
 # POSIX
+# Don't remove. The method communicate was re implemented for performance issues
 def _communicate(proc, input, timeout=None, err_on_timeout=True):
     read_set = []
     write_set = []
     stdout = None # Return
     stderr = None # Return
-    
+
     killed = False
-    
+
     if timeout is not None:
         timelimit = time.time() + timeout
         killtime = timelimit + 4
@@ -782,10 +801,10 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True):
                 select_timeout = timelimit - curtime + 0.1
         else:
             select_timeout = 1.0
-        
+
         if select_timeout > 1.0:
             select_timeout = 1.0
-            
+
         try:
             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
         except select.error,e:
@@ -793,7 +812,7 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True):
                 raise
             else:
                 continue
-        
+
         if not rlist and not wlist and not xlist and proc.poll() is not None:
             # timeout and process exited, say bye
             break
@@ -823,7 +842,7 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True):
                 proc.stderr.close()
                 read_set.remove(proc.stderr)
             stderr.append(data)
-    
+
     # All data exchanged.  Translate lists into strings.
     if stdout is not None:
         stdout = ''.join(stdout)
@@ -850,3 +869,33 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True):
             proc.wait()
         return (stdout, stderr)
 
+def _proxy_command(gw, gwuser, gwidentity):
+    """
+    Constructs the SSH ProxyCommand option to add to the SSH command to connect
+    via a proxy
+        :param gw: SSH proxy hostname
+        :type gw:  str 
+       
+        :param gwuser: SSH proxy username
+        :type gwuser:  str
+
+        :param gwidentity: SSH proxy identity file 
+        :type gwidentity:  str
+
+  
+        :rtype: str 
+        
+        returns the SSH ProxyCommand option.
+    """
+
+    proxycommand = 'ProxyCommand=ssh -q '
+    if gwidentity:
+        proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
+    if gwuser:
+        proxycommand += '%s' % gwuser
+    else:
+        proxycommand += '%r'
+    proxycommand += '@%s -W %%h:%%p' % gw
+
+    return proxycommand
+