turn off using blowfish for scp transfers - not available on a stock ubuntu-15.04...
[nepi.git] / src / nepi / util / sshfuncs.py
index e8ff213..a01bded 100644 (file)
@@ -3,9 +3,8 @@
 #    Copyright (C) 2013 INRIA
 #
 #    This program is free software: you can redistribute it and/or modify
-#    it under the terms of the GNU General Public License as published by
-#    the Free Software Foundation, either version 3 of the License, or
-#    (at your option) any later version.
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
 #
 #    This program is distributed in the hope that it will be useful,
 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -35,6 +34,8 @@ import threading
 import time
 import tempfile
 
+_re_inet = re.compile("\d+:\s+(?P<name>[a-z0-9_-]+)\s+inet6?\s+(?P<inet>[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") 
+
 logger = logging.getLogger("sshfuncs")
 
 def log(msg, level, out = None, err = None):
@@ -75,6 +76,20 @@ class ProcStatus:
 hostbyname_cache = dict()
 hostbyname_cache_lock = threading.Lock()
 
+def resolve_hostname(host):
+    ip = None
+
+    if host in ["localhost", "127.0.0.1", "::1"]:
+        p = subprocess.Popen("ip -o addr list", shell=True,
+                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        m = _re_inet.findall(stdout)
+        ip = m[0][1].split("/")[0]
+    else:
+        ip = socket.gethostbyname(host)
+
+    return ip
+
 def gethostbyname(host):
     global hostbyname_cache
     global hostbyname_cache_lock
@@ -82,7 +97,7 @@ def gethostbyname(host):
     hostbyname = hostbyname_cache.get(host)
     if not hostbyname:
         with hostbyname_cache_lock:
-            hostbyname = socket.gethostbyname(host)
+            hostbyname = resolve_hostname(host)
             hostbyname_cache[host] = hostbyname
 
             msg = " Added hostbyname %s - %s " % (host, hostbyname)
@@ -224,7 +239,7 @@ def rexec(command, host, user,
     """
     Executes a remote command, returns ((stdout,stderr),process)
     """
-    
+
     tmp_known_hosts = None
     if not gw:
         hostip = gethostbyname(host)
@@ -251,10 +266,7 @@ def rexec(command, host, user,
         args.extend(['-o', 'StrictHostKeyChecking=no'])
 
     if gw:
-        if gwuser:
-            proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
-        else:
-            proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
+        proxycommand = _proxy_command(gw, gwuser, identity)
         args.extend(['-o', proxycommand])
 
     if agent:
@@ -283,7 +295,7 @@ def rexec(command, host, user,
         command = "sudo " + command
 
     args.append(command)
-   
+
     log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) 
 
     stdout = stderr = stdin = subprocess.PIPE
@@ -332,9 +344,13 @@ def rcopy(source, dest,
     tmp_known_hosts = None
 
     args = ['scp', '-q', '-p', '-C',
+            # 2015-06-01 Thierry: I am commenting off blowfish
+            # as this is not available on a plain ubuntu 15.04 install
+            # this IMHO is too fragile, shoud be something the user
+            # decides explicitly (so he is at least aware of that dependency)
             # Speed up transfer using blowfish cypher specification which is 
             # faster than the default one (3des)
-            '-c', 'blowfish',
+            '-c', 'blowfish',
             # Don't bother with localhost. Makes test easier
             '-o', 'NoHostAuthenticationForLocalhost=yes',
             '-o', 'ConnectTimeout=60',
@@ -346,10 +362,7 @@ def rcopy(source, dest,
         args.append('-P%d' % port)
 
     if gw:
-        if gwuser:
-            proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw)
-        else:
-            proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw
+        proxycommand = _proxy_command(gw, gwuser, identity)
         args.extend(['-o', proxycommand])
 
     if recursive:
@@ -404,7 +417,8 @@ def rspawn(command, pidfile,
         agent = None, 
         identity = None, 
         server_key = None,
-        tty = False):
+        tty = False,
+        strict_host_checking = True):
     """
     Spawn a remote command such that it will continue working asynchronously in 
     background. 
@@ -439,7 +453,7 @@ def rspawn(command, pidfile,
         :param sudo: Flag forcing execution with sudo user
         :type sudo: bool
         
-        :rtype: touple
+        :rtype: tuple
 
         (stdout, stderr), process
         
@@ -479,7 +493,8 @@ def rspawn(command, pidfile,
         agent = agent,
         identity = identity,
         server_key = server_key,
-        tty = tty ,
+        tty = tty,
+        strict_host_checking = strict_host_checking ,
         )
     
     if proc.wait():
@@ -496,7 +511,8 @@ def rgetpid(pidfile,
         gw = None,
         agent = None, 
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
     Returns the pid and ppid of a process from a remote file where the 
     information was stored.
@@ -524,7 +540,8 @@ def rgetpid(pidfile,
         gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
         
     if proc.wait():
@@ -546,7 +563,8 @@ def rstatus(pid, ppid,
         gw = None,
         agent = None, 
         identity = None,
-        server_key = None):
+        server_key = None,
+        strict_host_checking = True):
     """
     Returns a code representing the the status of a remote process
 
@@ -572,7 +590,8 @@ def rstatus(pid, ppid,
         gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
     
     if proc.wait():
@@ -599,7 +618,8 @@ def rkill(pid, ppid,
         sudo = False,
         identity = None, 
         server_key = None, 
-        nowait = False):
+        nowait = False,
+        strict_host_checking = True):
     """
     Sends a kill signal to a remote process.
 
@@ -653,7 +673,8 @@ fi
         gw = gw,
         agent = agent,
         identity = identity,
-        server_key = server_key
+        server_key = server_key,
+        strict_host_checking = strict_host_checking
         )
     
     # wait, don't leave zombies around
@@ -688,7 +709,12 @@ def _retry_rexec(args,
         try:
             err = out = " "
             if blocking:
-                (out, err) = proc.communicate()
+                #(out, err) = proc.communicate()
+                # The method communicate was re implemented for performance issues
+                # when using python subprocess communicate method the ssh commands 
+                # last one minute each
+                out, err = _communicate(proc, input=None)
+
             elif stdout:
                 out = proc.stdout.read()
                 if proc.poll() and stderr:
@@ -722,6 +748,154 @@ def _retry_rexec(args,
             if retry <= 0:
                 raise
             retry -= 1
-        
+
     return ((out, err), proc)
 
+# POSIX
+# Don't remove. The method communicate was re implemented for performance issues
+def _communicate(proc, input, timeout=None, err_on_timeout=True):
+    read_set = []
+    write_set = []
+    stdout = None # Return
+    stderr = None # Return
+
+    killed = False
+
+    if timeout is not None:
+        timelimit = time.time() + timeout
+        killtime = timelimit + 4
+        bailtime = timelimit + 4
+
+    if proc.stdin:
+        # Flush stdio buffer.  This might block, if the user has
+        # been writing to .stdin in an uncontrolled fashion.
+        proc.stdin.flush()
+        if input:
+            write_set.append(proc.stdin)
+        else:
+            proc.stdin.close()
+
+    if proc.stdout:
+        read_set.append(proc.stdout)
+        stdout = []
+
+    if proc.stderr:
+        read_set.append(proc.stderr)
+        stderr = []
+
+    input_offset = 0
+    while read_set or write_set:
+        if timeout is not None:
+            curtime = time.time()
+            if timeout is None or curtime > timelimit:
+                if curtime > bailtime:
+                    break
+                elif curtime > killtime:
+                    signum = signal.SIGKILL
+                else:
+                    signum = signal.SIGTERM
+                # Lets kill it
+                os.kill(proc.pid, signum)
+                select_timeout = 0.5
+            else:
+                select_timeout = timelimit - curtime + 0.1
+        else:
+            select_timeout = 1.0
+
+        if select_timeout > 1.0:
+            select_timeout = 1.0
+
+        try:
+            rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
+        except select.error,e:
+            if e[0] != 4:
+                raise
+            else:
+                continue
+
+        if not rlist and not wlist and not xlist and proc.poll() is not None:
+            # timeout and process exited, say bye
+            break
+
+        if proc.stdin in wlist:
+            # When select has indicated that the file is writable,
+            # we can write up to PIPE_BUF bytes without risk
+            # blocking.  POSIX defines PIPE_BUF >= 512
+            bytes_written = os.write(proc.stdin.fileno(),
+                    buffer(input, input_offset, 512))
+            input_offset += bytes_written
+
+            if input_offset >= len(input):
+                proc.stdin.close()
+                write_set.remove(proc.stdin)
+
+        if proc.stdout in rlist:
+            data = os.read(proc.stdout.fileno(), 1024)
+            if data == "":
+                proc.stdout.close()
+                read_set.remove(proc.stdout)
+            stdout.append(data)
+
+        if proc.stderr in rlist:
+            data = os.read(proc.stderr.fileno(), 1024)
+            if data == "":
+                proc.stderr.close()
+                read_set.remove(proc.stderr)
+            stderr.append(data)
+
+    # All data exchanged.  Translate lists into strings.
+    if stdout is not None:
+        stdout = ''.join(stdout)
+    if stderr is not None:
+        stderr = ''.join(stderr)
+
+    # Translate newlines, if requested.  We cannot let the file
+    # object do the translation: It is based on stdio, which is
+    # impossible to combine with select (unless forcing no
+    # buffering).
+    if proc.universal_newlines and hasattr(file, 'newlines'):
+        if stdout:
+            stdout = proc._translate_newlines(stdout)
+        if stderr:
+            stderr = proc._translate_newlines(stderr)
+
+    if killed and err_on_timeout:
+        errcode = proc.poll()
+        raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
+    else:
+        if killed:
+            proc.poll()
+        else:
+            proc.wait()
+        return (stdout, stderr)
+
+def _proxy_command(gw, gwuser, gwidentity):
+    """
+    Constructs the SSH ProxyCommand option to add to the SSH command to connect
+    via a proxy
+        :param gw: SSH proxy hostname
+        :type gw:  str 
+       
+        :param gwuser: SSH proxy username
+        :type gwuser:  str
+
+        :param gwidentity: SSH proxy identity file 
+        :type gwidentity:  str
+
+  
+        :rtype: str 
+        
+        returns the SSH ProxyCommand option.
+    """
+
+    proxycommand = 'ProxyCommand=ssh -q '
+    if gwidentity:
+        proxycommand += '-i %s ' % os.path.expanduser(gwidentity)
+    if gwuser:
+        proxycommand += '%s' % gwuser
+    else:
+        proxycommand += '%r'
+    proxycommand += '@%s -W %%h:%%p' % gw
+
+    return proxycommand
+