X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fsshfuncs.py;h=a01bdedad8d9b37b11daad973298337af7157852;hb=219ac627505b0eaf438a099d44a48f12e78a3671;hp=8c8ccf1166dea3e3c2c298fc3a4794d83bea5962;hpb=98cdcbd598dfc197252b0464654a8c7dd3c9c13a;p=nepi.git diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index 8c8ccf11..a01bdeda 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -35,6 +34,8 @@ import threading import time import tempfile +_re_inet = re.compile("\d+:\s+(?P[a-z0-9_-]+)\s+inet6?\s+(?P[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") + logger = logging.getLogger("sshfuncs") def log(msg, level, out = None, err = None): @@ -46,7 +47,6 @@ def log(msg, level, out = None, err = None): logger.log(level, msg) - if hasattr(os, "devnull"): DEV_NULL = os.devnull else: @@ -76,6 +76,20 @@ class ProcStatus: hostbyname_cache = dict() hostbyname_cache_lock = threading.Lock() +def resolve_hostname(host): + ip = None + + if host in ["localhost", "127.0.0.1", "::1"]: + p = subprocess.Popen("ip -o addr list", shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + m = _re_inet.findall(stdout) + ip = m[0][1].split("/")[0] + else: + ip = socket.gethostbyname(host) + + return ip + def gethostbyname(host): global hostbyname_cache global hostbyname_cache_lock @@ -83,7 +97,7 @@ def gethostbyname(host): hostbyname = hostbyname_cache.get(host) if not hostbyname: with hostbyname_cache_lock: - hostbyname = socket.gethostbyname(host) + hostbyname = resolve_hostname(host) hostbyname_cache[host] = hostbyname msg = " Added hostbyname %s - %s " % (host, hostbyname) @@ -207,10 +221,11 @@ def eintr_retry(func): return rv def rexec(command, host, user, - port = None, + port = None, + gwuser = None, + gw = None, agent = True, sudo = False, - stdin = None, identity = None, server_key = None, env = None, @@ -224,9 +239,11 @@ def rexec(command, host, user, """ Executes a remote command, returns ((stdout,stderr),process) """ - + tmp_known_hosts = None - hostip = gethostbyname(host) + if not gw: + hostip = gethostbyname(host) + else: hostip = None args = ['ssh', '-C', # Don't bother with localhost. Makes test easier @@ -235,6 +252,7 @@ def rexec(command, host, user, '-o', 'ConnectionAttempts=3', '-o', 'ServerAliveInterval=30', '-o', 'TCPKeepAlive=yes', + '-o', 'Batchmode=yes', '-l', user, hostip or host] if persistent and openssh_has_persist(): @@ -247,6 +265,10 @@ def rexec(command, host, user, # Do not check for Host key. Unsafe. args.extend(['-o', 'StrictHostKeyChecking=no']) + if gw: + proxycommand = _proxy_command(gw, gwuser, identity) + args.extend(['-o', proxycommand]) + if agent: args.append('-A') @@ -254,6 +276,7 @@ def rexec(command, host, user, args.append('-p%d' % port) if identity: + identity = os.path.expanduser(identity) args.extend(('-i', identity)) if tty: @@ -272,16 +295,26 @@ def rexec(command, host, user, command = "sudo " + command args.append(command) - - log_msg = " rexec - host %s - command %s " % (host, " ".join(args)) - return _retry_rexec(args, log_msg, env = env, retry = retry, + log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) + + stdout = stderr = stdin = subprocess.PIPE + if forward_x11: + stdout = stderr = stdin = None + + return _retry_rexec(args, log_msg, + stderr = stderr, + stdin = stdin, + stdout = stdout, + env = env, + retry = retry, tmp_known_hosts = tmp_known_hosts, blocking = blocking) def rcopy(source, dest, - port = None, - agent = True, + port = None, + gwuser = None, + gw = None, recursive = False, identity = None, server_key = None, @@ -293,14 +326,15 @@ def rcopy(source, dest, Source and destination should have the user and host encoded as per scp specs. - Source can be a list of files to copy to a single destination, - in which case it is advised that the destination be a folder. + Source can be a list of files to copy to a single destination, + (in which case it is advised that the destination be a folder), + or a single file in a string. """ - + # Parse destination as @: - if isinstance(dest, basestring) and ':' in dest: + if isinstance(dest, str) and ':' in dest: remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: + elif isinstance(source, str) and ':' in source: remspec, path = source.split(':',1) else: raise ValueError, "Both endpoints cannot be local" @@ -310,9 +344,13 @@ def rcopy(source, dest, tmp_known_hosts = None args = ['scp', '-q', '-p', '-C', + # 2015-06-01 Thierry: I am commenting off blowfish + # as this is not available on a plain ubuntu 15.04 install + # this IMHO is too fragile, shoud be something the user + # decides explicitly (so he is at least aware of that dependency) # Speed up transfer using blowfish cypher specification which is # faster than the default one (3des) - '-c', 'blowfish', + # '-c', 'blowfish', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', '-o', 'ConnectTimeout=60', @@ -323,10 +361,15 @@ def rcopy(source, dest, if port: args.append('-P%d' % port) + if gw: + proxycommand = _proxy_command(gw, gwuser, identity) + args.extend(['-o', proxycommand]) + if recursive: args.append('-r') if identity: + identity = os.path.expanduser(identity) args.extend(('-i', identity)) if server_key: @@ -337,20 +380,23 @@ def rcopy(source, dest, if not strict_host_checking: # Do not check for Host key. Unsafe. args.extend(['-o', 'StrictHostKeyChecking=no']) - - if isinstance(source,list): + + if isinstance(source, list): args.extend(source) else: if openssh_has_persist(): args.extend([ '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),) + '-o', 'ControlPath=%s' % (make_control_path(False, False),) ]) args.append(source) - args.append(dest) + if isinstance(dest, list): + args.extend(dest) + else: + args.append(dest) - log_msg = " rcopy - host %s - command %s " % (host, " ".join(args)) + log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args))) return _retry_rexec(args, log_msg, env = None, retry = retry, tmp_known_hosts = tmp_known_hosts, @@ -366,10 +412,13 @@ def rspawn(command, pidfile, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, identity = None, server_key = None, - tty = False): + tty = False, + strict_host_checking = True): """ Spawn a remote command such that it will continue working asynchronously in background. @@ -404,7 +453,7 @@ def rspawn(command, pidfile, :param sudo: Flag forcing execution with sudo user :type sudo: bool - :rtype: touple + :rtype: tuple (stdout, stderr), process @@ -439,10 +488,13 @@ def rspawn(command, pidfile, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, server_key = server_key, - tty = tty , + tty = tty, + strict_host_checking = strict_host_checking , ) if proc.wait(): @@ -455,9 +507,12 @@ def rgetpid(pidfile, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns the pid and ppid of a process from a remote file where the information was stored. @@ -481,9 +536,12 @@ def rgetpid(pidfile, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -501,9 +559,12 @@ def rstatus(pid, ppid, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns a code representing the the status of a remote process @@ -525,9 +586,12 @@ def rstatus(pid, ppid, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -548,11 +612,14 @@ def rkill(pid, ppid, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, sudo = False, identity = None, server_key = None, - nowait = False): + nowait = False, + strict_host_checking = True): """ Sends a kill signal to a remote process. @@ -602,9 +669,12 @@ fi host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) # wait, don't leave zombies around @@ -614,6 +684,9 @@ fi def _retry_rexec(args, log_msg, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE, env = None, retry = 3, tmp_known_hosts = None, @@ -623,9 +696,9 @@ def _retry_rexec(args, # connects to the remote host and starts a remote connection proc = subprocess.Popen(args, env = env, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) + stdout = stdout, + stdin = stdin, + stderr = stderr) # attach tempfile object to the process, to make sure the file stays # alive until the process is finished with it @@ -634,11 +707,18 @@ def _retry_rexec(args, # The argument block == False forces to rexec to return immediately, # without blocking try: + err = out = " " if blocking: - (out, err) = proc.communicate() - else: - err = proc.stderr.read() + #(out, err) = proc.communicate() + # The method communicate was re implemented for performance issues + # when using python subprocess communicate method the ssh commands + # last one minute each + out, err = _communicate(proc, input=None) + + elif stdout: out = proc.stdout.read() + if proc.poll() and stderr: + err = proc.stderr.read() log(log_msg, logging.DEBUG, out, err) @@ -668,7 +748,154 @@ def _retry_rexec(args, if retry <= 0: raise retry -= 1 - + return ((out, err), proc) +# POSIX +# Don't remove. The method communicate was re implemented for performance issues +def _communicate(proc, input, timeout=None, err_on_timeout=True): + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + killed = False + + if timeout is not None: + timelimit = time.time() + timeout + killtime = timelimit + 4 + bailtime = timelimit + 4 + + if proc.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + proc.stdin.flush() + if input: + write_set.append(proc.stdin) + else: + proc.stdin.close() + + if proc.stdout: + read_set.append(proc.stdout) + stdout = [] + + if proc.stderr: + read_set.append(proc.stderr) + stderr = [] + + input_offset = 0 + while read_set or write_set: + if timeout is not None: + curtime = time.time() + if timeout is None or curtime > timelimit: + if curtime > bailtime: + break + elif curtime > killtime: + signum = signal.SIGKILL + else: + signum = signal.SIGTERM + # Lets kill it + os.kill(proc.pid, signum) + select_timeout = 0.5 + else: + select_timeout = timelimit - curtime + 0.1 + else: + select_timeout = 1.0 + + if select_timeout > 1.0: + select_timeout = 1.0 + + try: + rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) + except select.error,e: + if e[0] != 4: + raise + else: + continue + + if not rlist and not wlist and not xlist and proc.poll() is not None: + # timeout and process exited, say bye + break + + if proc.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + bytes_written = os.write(proc.stdin.fileno(), + buffer(input, input_offset, 512)) + input_offset += bytes_written + + if input_offset >= len(input): + proc.stdin.close() + write_set.remove(proc.stdin) + + if proc.stdout in rlist: + data = os.read(proc.stdout.fileno(), 1024) + if data == "": + proc.stdout.close() + read_set.remove(proc.stdout) + stdout.append(data) + + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) + if data == "": + proc.stderr.close() + read_set.remove(proc.stderr) + stderr.append(data) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if proc.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = proc._translate_newlines(stdout) + if stderr: + stderr = proc._translate_newlines(stderr) + + if killed and err_on_timeout: + errcode = proc.poll() + raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) + else: + if killed: + proc.poll() + else: + proc.wait() + return (stdout, stderr) + +def _proxy_command(gw, gwuser, gwidentity): + """ + Constructs the SSH ProxyCommand option to add to the SSH command to connect + via a proxy + :param gw: SSH proxy hostname + :type gw: str + + :param gwuser: SSH proxy username + :type gwuser: str + + :param gwidentity: SSH proxy identity file + :type gwidentity: str + + + :rtype: str + + returns the SSH ProxyCommand option. + """ + + proxycommand = 'ProxyCommand=ssh -q ' + if gwidentity: + proxycommand += '-i %s ' % os.path.expanduser(gwidentity) + if gwuser: + proxycommand += '%s' % gwuser + else: + proxycommand += '%r' + proxycommand += '@%s -W %%h:%%p' % gw + + return proxycommand