X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fsshfuncs.py;h=a01bdedad8d9b37b11daad973298337af7157852;hb=219ac627505b0eaf438a099d44a48f12e78a3671;hp=28539419682aa81e246d6bf652b4b1c13bff4ea1;hpb=9a1ef15a5791b8e8b2f3b57db475697f77233a86;p=nepi.git diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index 28539419..a01bdeda 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -3,9 +3,8 @@ # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -18,6 +17,8 @@ # Author: Alina Quereilhac # Claudio Freire +## TODO: This code needs reviewing !!! + import base64 import errno import hashlib @@ -33,6 +34,8 @@ import threading import time import tempfile +_re_inet = re.compile("\d+:\s+(?P[a-z0-9_-]+)\s+inet6?\s+(?P[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") + logger = logging.getLogger("sshfuncs") def log(msg, level, out = None, err = None): @@ -44,7 +47,6 @@ def log(msg, level, out = None, err = None): logger.log(level, msg) - if hasattr(os, "devnull"): DEV_NULL = os.devnull else: @@ -74,6 +76,20 @@ class ProcStatus: hostbyname_cache = dict() hostbyname_cache_lock = threading.Lock() +def resolve_hostname(host): + ip = None + + if host in ["localhost", "127.0.0.1", "::1"]: + p = subprocess.Popen("ip -o addr list", shell=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + m = _re_inet.findall(stdout) + ip = m[0][1].split("/")[0] + else: + ip = socket.gethostbyname(host) + + return ip + def gethostbyname(host): global hostbyname_cache global hostbyname_cache_lock @@ -81,7 +97,7 @@ def gethostbyname(host): hostbyname = hostbyname_cache.get(host) if not hostbyname: with hostbyname_cache_lock: - hostbyname = socket.gethostbyname(host) + hostbyname = resolve_hostname(host) hostbyname_cache[host] = hostbyname msg = " Added hostbyname %s - %s " % (host, hostbyname) @@ -210,15 +226,12 @@ def rexec(command, host, user, gw = None, agent = True, sudo = False, - stdin = None, identity = None, server_key = None, env = None, tty = False, - timeout = None, - retry = 3, - err_on_timeout = True, connect_timeout = 30, + retry = 3, persistent = True, forward_x11 = False, blocking = True, @@ -226,7 +239,7 @@ def rexec(command, host, user, """ Executes a remote command, returns ((stdout,stderr),process) """ - + tmp_known_hosts = None if not gw: hostip = gethostbyname(host) @@ -253,10 +266,7 @@ def rexec(command, host, user, args.extend(['-o', 'StrictHostKeyChecking=no']) if gw: - if gwuser: - proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw) - else: - proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw + proxycommand = _proxy_command(gw, gwuser, identity) args.extend(['-o', proxycommand]) if agent: @@ -266,6 +276,7 @@ def rexec(command, host, user, args.append('-p%d' % port) if identity: + identity = os.path.expanduser(identity) args.extend(('-i', identity)) if tty: @@ -285,65 +296,25 @@ def rexec(command, host, user, args.append(command) - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - env = env, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - - # by default, rexec calls _communicate which will block - # until the process has exit. The argument block == False - # forces to rexec to return immediately, without blocking - try: - if blocking: - out, err = _communicate(proc, stdin, timeout, err_on_timeout) - else: - err = proc.stderr.read() - out = proc.stdout.read() - - msg = " rexec - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) - - if proc.poll(): - skip = False - - if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): - # SSH error, can safely retry - skip = True - elif retry: - # Probably timed out or plain failed but can retry - skip = True - - if skip: - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) + log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) - time.sleep(t) - continue - break - except RuntimeError, e: - msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) + stdout = stderr = stdin = subprocess.PIPE + if forward_x11: + stdout = stderr = stdin = None - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + return _retry_rexec(args, log_msg, + stderr = stderr, + stdin = stdin, + stdout = stdout, + env = env, + retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = blocking) def rcopy(source, dest, port = None, gwuser = None, gw = None, - agent = True, recursive = False, identity = None, server_key = None, @@ -357,8 +328,7 @@ def rcopy(source, dest, Source can be a list of files to copy to a single destination, (in which case it is advised that the destination be a folder), - a single file in a string or a semi-colon separated list of files - in a string. + or a single file in a string. """ # Parse destination as @: @@ -374,9 +344,13 @@ def rcopy(source, dest, tmp_known_hosts = None args = ['scp', '-q', '-p', '-C', + # 2015-06-01 Thierry: I am commenting off blowfish + # as this is not available on a plain ubuntu 15.04 install + # this IMHO is too fragile, shoud be something the user + # decides explicitly (so he is at least aware of that dependency) # Speed up transfer using blowfish cypher specification which is # faster than the default one (3des) - '-c', 'blowfish', + # '-c', 'blowfish', # Don't bother with localhost. Makes test easier '-o', 'NoHostAuthenticationForLocalhost=yes', '-o', 'ConnectTimeout=60', @@ -388,16 +362,14 @@ def rcopy(source, dest, args.append('-P%d' % port) if gw: - if gwuser: - proxycommand = 'ProxyCommand=ssh %s@%s -W %%h:%%p' % (gwuser, gw) - else: - proxycommand = 'ProxyCommand=ssh %%r@%s -W %%h:%%p' % gw + proxycommand = _proxy_command(gw, gwuser, identity) args.extend(['-o', proxycommand]) if recursive: args.append('-r') if identity: + identity = os.path.expanduser(identity) args.extend(('-i', identity)) if server_key: @@ -408,59 +380,27 @@ def rcopy(source, dest, if not strict_host_checking: # Do not check for Host key. Unsafe. args.extend(['-o', 'StrictHostKeyChecking=no']) - - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),) - ]) - - if isinstance(dest, str): - dest = map(str.strip, dest.split(";")) - - if isinstance(source, str): - source = map(str.strip, source.split(";")) - - args.extend(source) - - args.extend(dest) - - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - try: - (out, err) = proc.communicate() - eintr_retry(proc.wait)() - msg = " rcopy - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) - - if proc.poll(): - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) - - time.sleep(t) - continue - - break - except RuntimeError, e: - msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) + if isinstance(source, list): + args.extend(source) + else: + if openssh_has_persist(): + args.extend([ + '-o', 'ControlMaster=auto', + '-o', 'ControlPath=%s' % (make_control_path(False, False),) + ]) + args.append(source) + + if isinstance(dest, list): + args.extend(dest) + else: + args.append(dest) - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args))) + + return _retry_rexec(args, log_msg, env = None, retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = True) def rspawn(command, pidfile, stdout = '/dev/null', @@ -477,7 +417,8 @@ def rspawn(command, pidfile, agent = None, identity = None, server_key = None, - tty = False): + tty = False, + strict_host_checking = True): """ Spawn a remote command such that it will continue working asynchronously in background. @@ -512,7 +453,7 @@ def rspawn(command, pidfile, :param sudo: Flag forcing execution with sudo user :type sudo: bool - :rtype: touple + :rtype: tuple (stdout, stderr), process @@ -552,7 +493,8 @@ def rspawn(command, pidfile, agent = agent, identity = identity, server_key = server_key, - tty = tty , + tty = tty, + strict_host_checking = strict_host_checking , ) if proc.wait(): @@ -569,7 +511,8 @@ def rgetpid(pidfile, gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns the pid and ppid of a process from a remote file where the information was stored. @@ -597,7 +540,8 @@ def rgetpid(pidfile, gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -619,7 +563,8 @@ def rstatus(pid, ppid, gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ Returns a code representing the the status of a remote process @@ -645,7 +590,8 @@ def rstatus(pid, ppid, gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -672,7 +618,8 @@ def rkill(pid, ppid, sudo = False, identity = None, server_key = None, - nowait = False): + nowait = False, + strict_host_checking = True): """ Sends a kill signal to a remote process. @@ -726,7 +673,8 @@ fi gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) # wait, don't leave zombies around @@ -734,15 +682,85 @@ fi return (out, err), proc +def _retry_rexec(args, + log_msg, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE, + env = None, + retry = 3, + tmp_known_hosts = None, + blocking = True): + + for x in xrange(retry): + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + env = env, + stdout = stdout, + stdin = stdin, + stderr = stderr) + + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + + # The argument block == False forces to rexec to return immediately, + # without blocking + try: + err = out = " " + if blocking: + #(out, err) = proc.communicate() + # The method communicate was re implemented for performance issues + # when using python subprocess communicate method the ssh commands + # last one minute each + out, err = _communicate(proc, input=None) + + elif stdout: + out = proc.stdout.read() + if proc.poll() and stderr: + err = proc.stderr.read() + + log(log_msg, logging.DEBUG, out, err) + + if proc.poll(): + skip = False + + if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): + # SSH error, can safely retry + skip = True + elif retry: + # Probably timed out or plain failed but can retry + skip = True + + if skip: + t = x*2 + msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( + t, x, " ".join(args)) + log(msg, logging.DEBUG) + + time.sleep(t) + continue + break + except RuntimeError, e: + msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg ) + log(msg, logging.DEBUG, out, err) + + if retry <= 0: + raise + retry -= 1 + + return ((out, err), proc) + # POSIX +# Don't remove. The method communicate was re implemented for performance issues def _communicate(proc, input, timeout=None, err_on_timeout=True): read_set = [] write_set = [] stdout = None # Return stderr = None # Return - + killed = False - + if timeout is not None: timelimit = time.time() + timeout killtime = timelimit + 4 @@ -783,10 +801,10 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True): select_timeout = timelimit - curtime + 0.1 else: select_timeout = 1.0 - + if select_timeout > 1.0: select_timeout = 1.0 - + try: rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) except select.error,e: @@ -794,7 +812,7 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True): raise else: continue - + if not rlist and not wlist and not xlist and proc.poll() is not None: # timeout and process exited, say bye break @@ -824,7 +842,7 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True): proc.stderr.close() read_set.remove(proc.stderr) stderr.append(data) - + # All data exchanged. Translate lists into strings. if stdout is not None: stdout = ''.join(stdout) @@ -851,3 +869,33 @@ def _communicate(proc, input, timeout=None, err_on_timeout=True): proc.wait() return (stdout, stderr) +def _proxy_command(gw, gwuser, gwidentity): + """ + Constructs the SSH ProxyCommand option to add to the SSH command to connect + via a proxy + :param gw: SSH proxy hostname + :type gw: str + + :param gwuser: SSH proxy username + :type gwuser: str + + :param gwidentity: SSH proxy identity file + :type gwidentity: str + + + :rtype: str + + returns the SSH ProxyCommand option. + """ + + proxycommand = 'ProxyCommand=ssh -q ' + if gwidentity: + proxycommand += '-i %s ' % os.path.expanduser(gwidentity) + if gwuser: + proxycommand += '%s' % gwuser + else: + proxycommand += '%r' + proxycommand += '@%s -W %%h:%%p' % gw + + return proxycommand +