X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fsshfuncs.py;h=a01bdedad8d9b37b11daad973298337af7157852;hb=219ac627505b0eaf438a099d44a48f12e78a3671;hp=458af49581af85a6b87969c3cf00f6bea0871742;hpb=68adac66099b08e3daae7a84b29af0f7c69ee955;p=nepi.git diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index 458af495..a01bdeda 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -35,6 +34,8 @@ import threading import time import tempfile +_re_inet = re.compile("\d+:\s+(?P[a-z0-9_-]+)\s+inet6?\s+(?P[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") + logger = logging.getLogger("sshfuncs") def log(msg, level, out = None, err = None): @@ -46,7 +47,6 @@ def log(msg, level, out = None, err = None): logger.log(level, msg) - if hasattr(os, "devnull"): DEV_NULL = os.devnull else: @@ -76,6 +76,20 @@ class ProcStatus: hostbyname_cache = dict() hostbyname_cache_lock = threading.Lock() +def resolve_hostname(host): + ip = None + + if host in ["localhost", "127.0.0.1", "::1"]: + p = subprocess.Popen("ip -o addr list", shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + m = _re_inet.findall(stdout) + ip = m[0][1].split("/")[0] + else: + ip = socket.gethostbyname(host) + + return ip + def gethostbyname(host): global hostbyname_cache global hostbyname_cache_lock @@ -83,7 +97,7 @@ def gethostbyname(host): hostbyname = hostbyname_cache.get(host) if not hostbyname: with hostbyname_cache_lock: - hostbyname = socket.gethostbyname(host) + hostbyname = resolve_hostname(host) hostbyname_cache[host] = hostbyname msg = " Added hostbyname %s - %s " % (host, hostbyname) @@ -206,80 +220,10 @@ def eintr_retry(func): return func(*p, **kw) return rv -def socat(local_socket_name, remote_socket_name, - host, user, - port = None, - agent = True, - sudo = False, - identity = None, - server_key = None, - env = None, - tty = False, - connect_timeout = 30, - retry = 3, - strict_host_checking = True): - """ - Executes a remote command, returns ((stdout,stderr),process) - """ - - tmp_known_hosts = None - hostip = gethostbyname(host) - - - args = ["socat"] - args.append("UNIX-LISTEN:%s,unlink-early,fork" % local_socket_name) - - ssh_args = ['ssh', '-C', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - '-o', 'ConnectTimeout=%d' % (int(connect_timeout),), - '-o', 'ConnectionAttempts=3', - '-o', 'ServerAliveInterval=30', - '-o', 'TCPKeepAlive=yes', - '-l', user, hostip or host] - - if not strict_host_checking: - # Do not check for Host key. Unsafe. - ssh_args.extend(['-o', 'StrictHostKeyChecking=no']) - - if agent: - ssh_args.append('-A') - - if port: - ssh_args.append('-p%d' % port) - - if identity: - ssh_args.extend(('-i', identity)) - - if tty: - ssh_args.append('-t') - ssh_args.append('-t') - - if server_key: - # Create a temporary server key file - tmp_known_hosts = make_server_key_args(server_key, host, port) - ssh_args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - - ssh_cmd = " ".join(ssh_args) - - exec_cmd = "EXEC:'%s socat STDIO UNIX-CONNECT\:%s'" % (ssh_cmd, - remote_socket_name) - - args.append(exec_cmd) - - log_msg = " socat - host %s - command %s " % (host, " ".join(args)) - - return _retry_rexec(args, log_msg, - stdout = None, - stdin = None, - stderr = None, - env = env, - retry = retry, - tmp_known_hosts = tmp_known_hosts, - blocking = False) - def rexec(command, host, user, - port = None, + port = None, + gwuser = None, + gw = None, agent = True, sudo = False, identity = None, @@ -295,9 +239,11 @@ def rexec(command, host, user, """ Executes a remote command, returns ((stdout,stderr),process) """ - + tmp_known_hosts = None - hostip = gethostbyname(host) + if not gw: + hostip = gethostbyname(host) + else: hostip = None args = ['ssh', '-C', # Don't bother with localhost. Makes test easier @@ -306,6 +252,7 @@ def rexec(command, host, user, '-o', 'ConnectionAttempts=3', '-o', 'ServerAliveInterval=30', '-o', 'TCPKeepAlive=yes', + '-o', 'Batchmode=yes', '-l', user, hostip or host] if persistent and openssh_has_persist(): @@ -318,6 +265,10 @@ def rexec(command, host, user, # Do not check for Host key. Unsafe. args.extend(['-o', 'StrictHostKeyChecking=no']) + if gw: + proxycommand = _proxy_command(gw, gwuser, identity) + args.extend(['-o', proxycommand]) + if agent: args.append('-A') @@ -325,6 +276,7 @@ def rexec(command, host, user, args.append('-p%d' % port) if identity: + identity = os.path.expanduser(identity) args.extend(('-i', identity)) if tty: @@ -343,8 +295,8 @@ def rexec(command, host, user, command = "sudo " + command args.append(command) - - log_msg = " rexec - host %s - command %s " % (host, " ".join(args)) + + log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) stdout = stderr = stdin = subprocess.PIPE if forward_x11: @@ -360,8 +312,9 @@ def rexec(command, host, user, blocking = blocking) def rcopy(source, dest, - port = None, - agent = True, + port = None, + gwuser = None, + gw = None, recursive = False, identity = None, server_key = None, @@ -373,14 +326,15 @@ def rcopy(source, dest, Source and destination should have the user and host encoded as per scp specs. - Source can be a list of files to copy to a single destination, - in which case it is advised that the destination be a folder. + Source can be a list of files to copy to a single destination, + (in which case it is advised that the destination be a folder), + or a single file in a string. """ - + # Parse destination as @: - if isinstance(dest, basestring) and ':' in dest: + if isinstance(dest, str) and ':' in dest: remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: + elif isinstance(source, str) and ':' in source: remspec, path = source.split(':',1) else: raise ValueError, "Both endpoints cannot be local" @@ -390,9 +344,13 @@ def rcopy(source, dest, tmp_known_hosts = None args = ['scp', '-q', '-p', '-C', + # 2015-06-01 Thierry: I am commenting off blowfish + # as this is not available on a plain ubuntu 15.04 install + # this IMHO is too fragile, shoud be something the user + # decides explicitly (so he is at least aware of that dependency) # Speed up transfer using blowfish cypher specification which is # faster than the default one (3des) - '-c', 'blowfish', + # '-c', 'blowfish', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', '-o', 'ConnectTimeout=60', @@ -403,10 +361,15 @@ def rcopy(source, dest, if port: args.append('-P%d' % port) + if gw: + proxycommand = _proxy_command(gw, gwuser, identity) + args.extend(['-o', proxycommand]) + if recursive: args.append('-r') if identity: + identity = os.path.expanduser(identity) args.extend(('-i', identity)) if server_key: @@ -417,20 +380,23 @@ def rcopy(source, dest, if not strict_host_checking: # Do not check for Host key. Unsafe. args.extend(['-o', 'StrictHostKeyChecking=no']) - + if isinstance(source, list): args.extend(source) else: if openssh_has_persist(): args.extend([ '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),) + '-o', 'ControlPath=%s' % (make_control_path(False, False),) ]) args.append(source) - args.append(dest) + if isinstance(dest, list): + args.extend(dest) + else: + args.append(dest) - log_msg = " rcopy - host %s - command %s " % (host, " ".join(args)) + log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args))) return _retry_rexec(args, log_msg, env = None, retry = retry, tmp_known_hosts = tmp_known_hosts, @@ -446,10 +412,13 @@ def rspawn(command, pidfile, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, identity = None, server_key = None, - tty = False): + tty = False, + strict_host_checking = True): """ Spawn a remote command such that it will continue working asynchronously in background. @@ -484,7 +453,7 @@ def rspawn(command, pidfile, :param sudo: Flag forcing execution with sudo user :type sudo: bool - :rtype: touple + :rtype: tuple (stdout, stderr), process @@ -519,10 +488,13 @@ def rspawn(command, pidfile, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, server_key = server_key, - tty = tty , + tty = tty, + strict_host_checking = strict_host_checking , ) if proc.wait(): @@ -535,9 +507,12 @@ def rgetpid(pidfile, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns the pid and ppid of a process from a remote file where the information was stored. @@ -561,9 +536,12 @@ def rgetpid(pidfile, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -581,9 +559,12 @@ def rstatus(pid, ppid, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns a code representing the the status of a remote process @@ -605,9 +586,12 @@ def rstatus(pid, ppid, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -628,11 +612,14 @@ def rkill(pid, ppid, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, sudo = False, identity = None, server_key = None, - nowait = False): + nowait = False, + strict_host_checking = True): """ Sends a kill signal to a remote process. @@ -682,9 +669,12 @@ fi host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) # wait, don't leave zombies around @@ -719,7 +709,12 @@ def _retry_rexec(args, try: err = out = " " if blocking: - (out, err) = proc.communicate() + #(out, err) = proc.communicate() + # The method communicate was re implemented for performance issues + # when using python subprocess communicate method the ssh commands + # last one minute each + out, err = _communicate(proc, input=None) + elif stdout: out = proc.stdout.read() if proc.poll() and stderr: @@ -753,7 +748,154 @@ def _retry_rexec(args, if retry <= 0: raise retry -= 1 - + return ((out, err), proc) +# POSIX +# Don't remove. The method communicate was re implemented for performance issues +def _communicate(proc, input, timeout=None, err_on_timeout=True): + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + killed = False + + if timeout is not None: + timelimit = time.time() + timeout + killtime = timelimit + 4 + bailtime = timelimit + 4 + + if proc.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + proc.stdin.flush() + if input: + write_set.append(proc.stdin) + else: + proc.stdin.close() + + if proc.stdout: + read_set.append(proc.stdout) + stdout = [] + + if proc.stderr: + read_set.append(proc.stderr) + stderr = [] + + input_offset = 0 + while read_set or write_set: + if timeout is not None: + curtime = time.time() + if timeout is None or curtime > timelimit: + if curtime > bailtime: + break + elif curtime > killtime: + signum = signal.SIGKILL + else: + signum = signal.SIGTERM + # Lets kill it + os.kill(proc.pid, signum) + select_timeout = 0.5 + else: + select_timeout = timelimit - curtime + 0.1 + else: + select_timeout = 1.0 + + if select_timeout > 1.0: + select_timeout = 1.0 + + try: + rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) + except select.error,e: + if e[0] != 4: + raise + else: + continue + + if not rlist and not wlist and not xlist and proc.poll() is not None: + # timeout and process exited, say bye + break + + if proc.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + bytes_written = os.write(proc.stdin.fileno(), + buffer(input, input_offset, 512)) + input_offset += bytes_written + + if input_offset >= len(input): + proc.stdin.close() + write_set.remove(proc.stdin) + + if proc.stdout in rlist: + data = os.read(proc.stdout.fileno(), 1024) + if data == "": + proc.stdout.close() + read_set.remove(proc.stdout) + stdout.append(data) + + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) + if data == "": + proc.stderr.close() + read_set.remove(proc.stderr) + stderr.append(data) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if proc.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = proc._translate_newlines(stdout) + if stderr: + stderr = proc._translate_newlines(stderr) + + if killed and err_on_timeout: + errcode = proc.poll() + raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) + else: + if killed: + proc.poll() + else: + proc.wait() + return (stdout, stderr) + +def _proxy_command(gw, gwuser, gwidentity): + """ + Constructs the SSH ProxyCommand option to add to the SSH command to connect + via a proxy + :param gw: SSH proxy hostname + :type gw: str + + :param gwuser: SSH proxy username + :type gwuser: str + + :param gwidentity: SSH proxy identity file + :type gwidentity: str + + + :rtype: str + + returns the SSH ProxyCommand option. + """ + + proxycommand = 'ProxyCommand=ssh -q ' + if gwidentity: + proxycommand += '-i %s ' % os.path.expanduser(gwidentity) + if gwuser: + proxycommand += '%s' % gwuser + else: + proxycommand += '%r' + proxycommand += '@%s -W %%h:%%p' % gw + + return proxycommand