From 98cdcbd598dfc197252b0464654a8c7dd3c9c13a Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Thu, 6 Feb 2014 12:53:04 +0100 Subject: [PATCH] Simplifying code of sshfuncs.py. Removing not used functionality --- src/nepi/resources/linux/node.py | 6 - src/nepi/util/sshfuncs.py | 518 +++++++------------------------ 2 files changed, 109 insertions(+), 415 deletions(-) diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 4e162ce4..ee6475ce 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -428,9 +428,7 @@ class LinuxNode(ResourceManager): env = None, tty = False, forward_x11 = False, - timeout = None, retry = 3, - err_on_timeout = True, connect_timeout = 30, strict_host_checking = False, persistent = True, @@ -463,9 +461,7 @@ class LinuxNode(ResourceManager): env = env, tty = tty, forward_x11 = forward_x11, - timeout = timeout, retry = retry, - err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, blocking = blocking, @@ -485,9 +481,7 @@ class LinuxNode(ResourceManager): env = env, tty = tty, forward_x11 = forward_x11, - timeout = timeout, retry = retry, - err_on_timeout = err_on_timeout, connect_timeout = connect_timeout, persistent = persistent, blocking = blocking, diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index 43ca61c0..8c8ccf11 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -18,6 +18,8 @@ # Author: Alina Quereilhac # Claudio Freire +## TODO: This code needs reviewing !!! + import base64 import errno import hashlib @@ -213,10 +215,8 @@ def rexec(command, host, user, server_key = None, env = None, tty = False, - timeout = None, - retry = 3, - err_on_timeout = True, connect_timeout = 30, + retry = 3, persistent = True, forward_x11 = False, blocking = True, @@ -272,60 +272,12 @@ def rexec(command, host, user, command = "sudo " + command args.append(command) - - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - env = env, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - # by default, rexec calls _communicate which will block - # until the process has exit. The argument block == False - # forces to rexec to return immediately, without blocking - try: - if blocking: - out, err = _communicate(proc, stdin, timeout, err_on_timeout) - else: - err = proc.stderr.read() - out = proc.stdout.read() + log_msg = " rexec - host %s - command %s " % (host, " ".join(args)) - msg = " rexec - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) - - if proc.poll(): - skip = False - - if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): - # SSH error, can safely retry - skip = True - elif retry: - # Probably timed out or plain failed but can retry - skip = True - - if skip: - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) - - time.sleep(t) - continue - break - except RuntimeError, e: - msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) - - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + return _retry_rexec(args, log_msg, env = env, retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = blocking) def rcopy(source, dest, port = None, @@ -341,263 +293,68 @@ def rcopy(source, dest, Source and destination should have the user and host encoded as per scp specs. - If source is a file object, a special mode will be used to - create the remote file with the same contents. - - If dest is a file object, the remote file (source) will be - read and written into dest. - - In these modes, recursive cannot be True. - Source can be a list of files to copy to a single destination, in which case it is advised that the destination be a folder. """ - if isinstance(source, file) and source.tell() == 0: - source = source.name - elif hasattr(source, 'read'): - tmp = tempfile.NamedTemporaryFile() - while True: - buf = source.read(65536) - if buf: - tmp.write(buf) - else: - break - tmp.seek(0) - source = tmp.name + # Parse destination as @: + if isinstance(dest, basestring) and ':' in dest: + remspec, path = dest.split(':',1) + elif isinstance(source, basestring) and ':' in source: + remspec, path = source.split(':',1) + else: + raise ValueError, "Both endpoints cannot be local" + user,host = remspec.rsplit('@',1) - if isinstance(source, file) or isinstance(dest, file) \ - or hasattr(source, 'read') or hasattr(dest, 'write'): - assert not recursive - - # Parse source/destination as @: - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) - else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - - tmp_known_hosts = None - hostip = gethostbyname(host) - - args = ['ssh', '-l', user, '-C', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - '-o', 'ConnectTimeout=60', - '-o', 'ConnectionAttempts=3', - '-o', 'ServerAliveInterval=30', - '-o', 'TCPKeepAlive=yes', - hostip or host ] - - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),), - '-o', 'ControlPersist=60' ]) - - if port: - args.append('-P%d' % port) - - if identity: - args.extend(('-i', identity)) + # plain scp + tmp_known_hosts = None - if server_key: - # Create a temporary server key file - tmp_known_hosts = make_server_key_args(server_key, host, port) - args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - - if isinstance(source, file) or hasattr(source, 'read'): - args.append('cat > %s' % (shell_escape(path),)) - elif isinstance(dest, file) or hasattr(dest, 'write'): - args.append('cat %s' % (shell_escape(path),)) - else: - raise AssertionError, "Unreachable code reached! :-Q" - - # connects to the remote host and starts a remote connection - if isinstance(source, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif isinstance(dest, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif hasattr(source, 'read'): - # file-like (but not file) source - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = subprocess.PIPE) - - buf = None - err = [] - while True: - if not buf: - buf = source.read(4096) - if not buf: - #EOF - break - - rdrdy, wrdy, broken = select.select( - [proc.stderr], - [proc.stdin], - [proc.stderr,proc.stdin]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdin in wrdy: - proc.stdin.write(buf) - buf = None - - if broken: - break - proc.stdin.close() - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - elif hasattr(dest, 'write'): - # file-like (but not file) dest - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - stdin = open('/dev/null','w')) + args = ['scp', '-q', '-p', '-C', + # Speed up transfer using blowfish cypher specification which is + # faster than the default one (3des) + '-c', 'blowfish', + # Don't bother with localhost. Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes', + '-o', 'ConnectTimeout=60', + '-o', 'ConnectionAttempts=3', + '-o', 'ServerAliveInterval=30', + '-o', 'TCPKeepAlive=yes' ] - buf = None - err = [] - while True: - rdrdy, wrdy, broken = select.select( - [proc.stderr, proc.stdout], - [], - [proc.stderr, proc.stdout]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdout in rdrdy: - # use os.read for fully unbuffered behavior - buf = os.read(proc.stdout.fileno(), 4096) - dest.write(buf) - - if not buf: - #EOF - break - - if broken: - break - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - else: - raise AssertionError, "Unreachable code reached! :-Q" - else: - # Parse destination as @: - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) - else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - - # plain scp - tmp_known_hosts = None - - args = ['scp', '-q', '-p', '-C', - # Speed up transfer using blowfish cypher specification which is - # faster than the default one (3des) - '-c', 'blowfish', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - '-o', 'ConnectTimeout=60', - '-o', 'ConnectionAttempts=3', - '-o', 'ServerAliveInterval=30', - '-o', 'TCPKeepAlive=yes' ] - - if port: - args.append('-P%d' % port) - - if recursive: - args.append('-r') - - if identity: - args.extend(('-i', identity)) + if port: + args.append('-P%d' % port) - if server_key: - # Create a temporary server key file - tmp_known_hosts = make_server_key_args(server_key, host, port) - args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) + if recursive: + args.append('-r') - if not strict_host_checking: - # Do not check for Host key. Unsafe. - args.extend(['-o', 'StrictHostKeyChecking=no']) + if identity: + args.extend(('-i', identity)) - if isinstance(source,list): - args.extend(source) - else: - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),) - ]) - args.append(source) - - args.append(dest) - - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - - try: - (out, err) = proc.communicate() - eintr_retry(proc.wait)() - msg = " rcopy - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) + if server_key: + # Create a temporary server key file + tmp_known_hosts = make_server_key_args(server_key, host, port) + args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - if proc.poll(): - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) + if not strict_host_checking: + # Do not check for Host key. Unsafe. + args.extend(['-o', 'StrictHostKeyChecking=no']) - time.sleep(t) - continue + if isinstance(source,list): + args.extend(source) + else: + if openssh_has_persist(): + args.extend([ + '-o', 'ControlMaster=auto', + '-o', 'ControlPath=%s' % (make_control_path(agent, False),) + ]) + args.append(source) - break - except RuntimeError, e: - msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) + args.append(dest) - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + log_msg = " rcopy - host %s - command %s " % (host, " ".join(args)) + + return _retry_rexec(args, log_msg, env = None, retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = True) def rspawn(command, pidfile, stdout = '/dev/null', @@ -855,120 +612,63 @@ fi return (out, err), proc -# POSIX -def _communicate(proc, input, timeout=None, err_on_timeout=True): - read_set = [] - write_set = [] - stdout = None # Return - stderr = None # Return - - killed = False - - if timeout is not None: - timelimit = time.time() + timeout - killtime = timelimit + 4 - bailtime = timelimit + 4 - - if proc.stdin: - # Flush stdio buffer. This might block, if the user has - # been writing to .stdin in an uncontrolled fashion. - proc.stdin.flush() - if input: - write_set.append(proc.stdin) - else: - proc.stdin.close() - - if proc.stdout: - read_set.append(proc.stdout) - stdout = [] - - if proc.stderr: - read_set.append(proc.stderr) - stderr = [] - - input_offset = 0 - while read_set or write_set: - if timeout is not None: - curtime = time.time() - if timeout is None or curtime > timelimit: - if curtime > bailtime: - break - elif curtime > killtime: - signum = signal.SIGKILL - else: - signum = signal.SIGTERM - # Lets kill it - os.kill(proc.pid, signum) - select_timeout = 0.5 - else: - select_timeout = timelimit - curtime + 0.1 - else: - select_timeout = 1.0 +def _retry_rexec(args, + log_msg, + env = None, + retry = 3, + tmp_known_hosts = None, + blocking = True): + + for x in xrange(retry): + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + env = env, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) - if select_timeout > 1.0: - select_timeout = 1.0 - + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + + # The argument block == False forces to rexec to return immediately, + # without blocking try: - rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) - except select.error,e: - if e[0] != 4: - raise + if blocking: + (out, err) = proc.communicate() else: - continue - - if not rlist and not wlist and not xlist and proc.poll() is not None: - # timeout and process exited, say bye + err = proc.stderr.read() + out = proc.stdout.read() + + log(log_msg, logging.DEBUG, out, err) + + if proc.poll(): + skip = False + + if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): + # SSH error, can safely retry + skip = True + elif retry: + # Probably timed out or plain failed but can retry + skip = True + + if skip: + t = x*2 + msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( + t, x, " ".join(args)) + log(msg, logging.DEBUG) + + time.sleep(t) + continue break + except RuntimeError, e: + msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg ) + log(msg, logging.DEBUG, out, err) + + if retry <= 0: + raise + retry -= 1 + + return ((out, err), proc) - if proc.stdin in wlist: - # When select has indicated that the file is writable, - # we can write up to PIPE_BUF bytes without risk - # blocking. POSIX defines PIPE_BUF >= 512 - bytes_written = os.write(proc.stdin.fileno(), - buffer(input, input_offset, 512)) - input_offset += bytes_written - - if input_offset >= len(input): - proc.stdin.close() - write_set.remove(proc.stdin) - - if proc.stdout in rlist: - data = os.read(proc.stdout.fileno(), 1024) - if data == "": - proc.stdout.close() - read_set.remove(proc.stdout) - stdout.append(data) - - if proc.stderr in rlist: - data = os.read(proc.stderr.fileno(), 1024) - if data == "": - proc.stderr.close() - read_set.remove(proc.stderr) - stderr.append(data) - - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = ''.join(stdout) - if stderr is not None: - stderr = ''.join(stderr) - - # Translate newlines, if requested. We cannot let the file - # object do the translation: It is based on stdio, which is - # impossible to combine with select (unless forcing no - # buffering). - if proc.universal_newlines and hasattr(file, 'newlines'): - if stdout: - stdout = proc._translate_newlines(stdout) - if stderr: - stderr = proc._translate_newlines(stderr) - - if killed and err_on_timeout: - errcode = proc.poll() - raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) - else: - if killed: - proc.poll() - else: - proc.wait() - return (stdout, stderr) -- 2.43.0