X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fsshfuncs.py;h=a01bdedad8d9b37b11daad973298337af7157852;hb=219ac627505b0eaf438a099d44a48f12e78a3671;hp=ac43099b4200554b62ee4a2ee6c2909e9579192c;hpb=5857fa2d26b3aa135391cf936ccde907b9ff8c68;p=nepi.git diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index ac43099b..a01bdeda 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -35,6 +34,8 @@ import threading import time import tempfile +_re_inet = re.compile("\d+:\s+(?P[a-z0-9_-]+)\s+inet6?\s+(?P[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") + logger = logging.getLogger("sshfuncs") def log(msg, level, out = None, err = None): @@ -75,6 +76,20 @@ class ProcStatus: hostbyname_cache = dict() hostbyname_cache_lock = threading.Lock() +def resolve_hostname(host): + ip = None + + if host in ["localhost", "127.0.0.1", "::1"]: + p = subprocess.Popen("ip -o addr list", shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + m = _re_inet.findall(stdout) + ip = m[0][1].split("/")[0] + else: + ip = socket.gethostbyname(host) + + return ip + def gethostbyname(host): global hostbyname_cache global hostbyname_cache_lock @@ -82,7 +97,7 @@ def gethostbyname(host): hostbyname = hostbyname_cache.get(host) if not hostbyname: with hostbyname_cache_lock: - hostbyname = socket.gethostbyname(host) + hostbyname = resolve_hostname(host) hostbyname_cache[host] = hostbyname msg = " Added hostbyname %s - %s " % (host, hostbyname) @@ -224,7 +239,7 @@ def rexec(command, host, user, """ Executes a remote command, returns ((stdout,stderr),process) """ - + tmp_known_hosts = None if not gw: hostip = gethostbyname(host) @@ -251,10 +266,7 @@ def rexec(command, host, user, args.extend(['-o', 'StrictHostKeyChecking=no']) if gw: - if gwuser: - proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw) - else: - proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw + proxycommand = _proxy_command(gw, gwuser, identity) args.extend(['-o', proxycommand]) if agent: @@ -283,8 +295,8 @@ def rexec(command, host, user, command = "sudo " + command args.append(command) - - log_msg = " rexec - host %s - command %s " % (host, " ".join(args)) + + log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) stdout = stderr = stdin = subprocess.PIPE if forward_x11: @@ -332,9 +344,13 @@ def rcopy(source, dest, tmp_known_hosts = None args = ['scp', '-q', '-p', '-C', + # 2015-06-01 Thierry: I am commenting off blowfish + # as this is not available on a plain ubuntu 15.04 install + # this IMHO is too fragile, shoud be something the user + # decides explicitly (so he is at least aware of that dependency) # Speed up transfer using blowfish cypher specification which is # faster than the default one (3des) - '-c', 'blowfish', + # '-c', 'blowfish', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', '-o', 'ConnectTimeout=60', @@ -346,10 +362,7 @@ def rcopy(source, dest, args.append('-P%d' % port) if gw: - if gwuser: - proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw) - else: - proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw + proxycommand = _proxy_command(gw, gwuser, identity) args.extend(['-o', proxycommand]) if recursive: @@ -383,7 +396,7 @@ def rcopy(source, dest, else: args.append(dest) - log_msg = " rcopy - host %s - command %s " % (host, " ".join(args)) + log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args))) return _retry_rexec(args, log_msg, env = None, retry = retry, tmp_known_hosts = tmp_known_hosts, @@ -404,7 +417,8 @@ def rspawn(command, pidfile, agent = None, identity = None, server_key = None, - tty = False): + tty = False, + strict_host_checking = True): """ Spawn a remote command such that it will continue working asynchronously in background. @@ -439,7 +453,7 @@ def rspawn(command, pidfile, :param sudo: Flag forcing execution with sudo user :type sudo: bool - :rtype: touple + :rtype: tuple (stdout, stderr), process @@ -479,7 +493,8 @@ def rspawn(command, pidfile, agent = agent, identity = identity, server_key = server_key, - tty = tty , + tty = tty, + strict_host_checking = strict_host_checking , ) if proc.wait(): @@ -496,7 +511,8 @@ def rgetpid(pidfile, gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns the pid and ppid of a process from a remote file where the information was stored. @@ -524,7 +540,8 @@ def rgetpid(pidfile, gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -546,7 +563,8 @@ def rstatus(pid, ppid, gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns a code representing the the status of a remote process @@ -572,7 +590,8 @@ def rstatus(pid, ppid, gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -599,7 +618,8 @@ def rkill(pid, ppid, sudo = False, identity = None, server_key = None, - nowait = False): + nowait = False, + strict_host_checking = True): """ Sends a kill signal to a remote process. @@ -653,7 +673,8 @@ fi gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) # wait, don't leave zombies around @@ -688,7 +709,12 @@ def _retry_rexec(args, try: err = out = " " if blocking: - (out, err) = proc.communicate() + #(out, err) = proc.communicate() + # The method communicate was re implemented for performance issues + # when using python subprocess communicate method the ssh commands + # last one minute each + out, err = _communicate(proc, input=None) + elif stdout: out = proc.stdout.read() if proc.poll() and stderr: @@ -722,6 +748,154 @@ def _retry_rexec(args, if retry <= 0: raise retry -= 1 - + return ((out, err), proc) +# POSIX +# Don't remove. The method communicate was re implemented for performance issues +def _communicate(proc, input, timeout=None, err_on_timeout=True): + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + killed = False + + if timeout is not None: + timelimit = time.time() + timeout + killtime = timelimit + 4 + bailtime = timelimit + 4 + + if proc.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + proc.stdin.flush() + if input: + write_set.append(proc.stdin) + else: + proc.stdin.close() + + if proc.stdout: + read_set.append(proc.stdout) + stdout = [] + + if proc.stderr: + read_set.append(proc.stderr) + stderr = [] + + input_offset = 0 + while read_set or write_set: + if timeout is not None: + curtime = time.time() + if timeout is None or curtime > timelimit: + if curtime > bailtime: + break + elif curtime > killtime: + signum = signal.SIGKILL + else: + signum = signal.SIGTERM + # Lets kill it + os.kill(proc.pid, signum) + select_timeout = 0.5 + else: + select_timeout = timelimit - curtime + 0.1 + else: + select_timeout = 1.0 + + if select_timeout > 1.0: + select_timeout = 1.0 + + try: + rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) + except select.error,e: + if e[0] != 4: + raise + else: + continue + + if not rlist and not wlist and not xlist and proc.poll() is not None: + # timeout and process exited, say bye + break + + if proc.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + bytes_written = os.write(proc.stdin.fileno(), + buffer(input, input_offset, 512)) + input_offset += bytes_written + + if input_offset >= len(input): + proc.stdin.close() + write_set.remove(proc.stdin) + + if proc.stdout in rlist: + data = os.read(proc.stdout.fileno(), 1024) + if data == "": + proc.stdout.close() + read_set.remove(proc.stdout) + stdout.append(data) + + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) + if data == "": + proc.stderr.close() + read_set.remove(proc.stderr) + stderr.append(data) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if proc.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = proc._translate_newlines(stdout) + if stderr: + stderr = proc._translate_newlines(stderr) + + if killed and err_on_timeout: + errcode = proc.poll() + raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) + else: + if killed: + proc.poll() + else: + proc.wait() + return (stdout, stderr) + +def _proxy_command(gw, gwuser, gwidentity): + """ + Constructs the SSH ProxyCommand option to add to the SSH command to connect + via a proxy + :param gw: SSH proxy hostname + :type gw: str + + :param gwuser: SSH proxy username + :type gwuser: str + + :param gwidentity: SSH proxy identity file + :type gwidentity: str + + + :rtype: str + + returns the SSH ProxyCommand option. + """ + + proxycommand = 'ProxyCommand=ssh -q ' + if gwidentity: + proxycommand += '-i %s ' % os.path.expanduser(gwidentity) + if gwuser: + proxycommand += '%s' % gwuser + else: + proxycommand += '%r' + proxycommand += '@%s -W %%h:%%p' % gw + + return proxycommand +