X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fsshfuncs.py;h=1899de23ed6002e04bb337c149165b53512da0d2;hb=6285ca51026efb69642eea9dfc7c480e722d84a9;hp=c5f5cd0a33a9625dc630f982054f900e920afcf0;hpb=4896d77f40a611a22f9f1f8f2ae0e63e9008fee1;p=nepi.git diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index c5f5cd0a..1899de23 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -1,21 +1,23 @@ -""" - NEPI, a framework to manage network experiments - Copyright (C) 2013 INRIA - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . - -""" +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation; +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac +# Claudio Freire + +## TODO: This code needs reviewing !!! import base64 import errno @@ -32,18 +34,17 @@ import threading import time import tempfile +_re_inet = re.compile("\d+:\s+(?P[a-z0-9_-]+)\s+inet6?\s+(?P[a-f0-9.:/]+)\s+(brd\s+[0-9.]+)?.*scope\s+global.*") + logger = logging.getLogger("sshfuncs") -def log(msg, level, out = None, err = None): +def log(msg, level = logging.DEBUG, out = None, err = None): if out: msg += " - OUT: %s " % out - if err: msg += " - ERROR: %s " % err - logger.log(level, msg) - if hasattr(os, "devnull"): DEV_NULL = os.devnull else: @@ -56,25 +57,42 @@ class STDOUT: Special value that when given to rspawn in stderr causes stderr to redirect to whatever stdout was redirected to. """ + pass -class RUNNING: - """ - Process is still running - """ - -class FINISHED: +class ProcStatus: """ - Process is finished + Codes for status of remote spawned process """ + # Process is still running + RUNNING = 1 -class NOT_STARTED: - """ - Process hasn't started running yet (this should be very rare) - """ + # Process is finished + FINISHED = 2 + + # Process hasn't started running yet (this should be very rare) + NOT_STARTED = 3 hostbyname_cache = dict() hostbyname_cache_lock = threading.Lock() +def resolve_hostname(host): + ip = None + + if host in ["localhost", "127.0.0.1", "::1"]: + p = subprocess.Popen( + "ip -o addr list", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p.communicate() + m = _re_inet.findall(stdout) + ip = m[0][1].split("/")[0] + else: + ip = socket.gethostbyname(host) + + return ip + def gethostbyname(host): global hostbyname_cache global hostbyname_cache_lock @@ -82,7 +100,7 @@ def gethostbyname(host): hostbyname = hostbyname_cache.get(host) if not hostbyname: with hostbyname_cache_lock: - hostbyname = socket.gethostbyname(host) + hostbyname = resolve_hostname(host) hostbyname_cache[host] = hostbyname msg = " Added hostbyname %s - %s " % (host, hostbyname) @@ -102,12 +120,15 @@ def openssh_has_persist(): """ global OPENSSH_HAS_PERSIST if OPENSSH_HAS_PERSIST is None: - proc = subprocess.Popen(["ssh","-v"], - stdout = subprocess.PIPE, - stderr = subprocess.STDOUT, - stdin = open("/dev/null","r") ) - out,err = proc.communicate() - proc.wait() + with open("/dev/null") as null: + proc = subprocess.Popen( + ["ssh", "-v"], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = null, + ) + out,err = proc.communicate() + proc.wait() vre = re.compile(r'OpenSSH_(?:[6-9]|5[.][8-9]|5[.][1-9][0-9]|[1-9][0-9]).*', re.I) OPENSSH_HAS_PERSIST = bool(vre.match(out)) @@ -145,9 +166,8 @@ def make_server_key_args(server_key, host, port): if os.environ.get('NEPI_STRICT_AUTH_MODE',"").lower() not in ('1','true','on'): user_hosts_path = '%s/.ssh/known_hosts' % (os.environ.get('HOME',""),) if os.access(user_hosts_path, os.R_OK): - f = open(user_hosts_path, "r") - tmp_known_hosts.write(f.read()) - f.close() + with open(user_hosts_path, "r") as f: + tmp_known_hosts.write(f.read()) tmp_known_hosts.flush() @@ -174,12 +194,12 @@ def shell_escape(s): return s else: # unsafe string - escape - def escp(c): + def escape(c): if (32 <= ord(c) < 127 or c in ('\r','\n','\t')) and c not in ("'",'"'): return c else: return "'$'\\x%02x''" % (ord(c),) - s = ''.join(map(escp,s)) + s = ''.join(map(escape, s)) return "'%s'" % (s,) def eintr_retry(func): @@ -191,12 +211,12 @@ def eintr_retry(func): for i in xrange(0 if retry else 4): try: return func(*p, **kw) - except (select.error, socket.error), args: + except (select.error, socket.error) as args: if args[0] == errno.EINTR: continue else: raise - except OSError, e: + except OSError as e: if e.errno == errno.EINTR: continue else: @@ -206,27 +226,29 @@ def eintr_retry(func): return rv def rexec(command, host, user, - port = None, + port = None, + gwuser = None, + gw = None, agent = True, sudo = False, - stdin = None, identity = None, server_key = None, env = None, tty = False, - timeout = None, - retry = 3, - err_on_timeout = True, connect_timeout = 30, + retry = 3, persistent = True, forward_x11 = False, + blocking = True, strict_host_checking = True): """ Executes a remote command, returns ((stdout,stderr),process) """ - + tmp_known_hosts = None - hostip = gethostbyname(host) + if not gw: + hostip = gethostbyname(host) + else: hostip = None args = ['ssh', '-C', # Don't bother with localhost. Makes test easier @@ -235,6 +257,7 @@ def rexec(command, host, user, '-o', 'ConnectionAttempts=3', '-o', 'ServerAliveInterval=30', '-o', 'TCPKeepAlive=yes', + '-o', 'Batchmode=yes', '-l', user, hostip or host] if persistent and openssh_has_persist(): @@ -247,6 +270,10 @@ def rexec(command, host, user, # Do not check for Host key. Unsafe. args.extend(['-o', 'StrictHostKeyChecking=no']) + if gw: + proxycommand = _proxy_command(gw, gwuser, identity) + args.extend(['-o', proxycommand]) + if agent: args.append('-A') @@ -254,6 +281,7 @@ def rexec(command, host, user, args.append('-p%d' % port) if identity: + identity = os.path.expanduser(identity) args.extend(('-i', identity)) if tty: @@ -268,57 +296,30 @@ def rexec(command, host, user, tmp_known_hosts = make_server_key_args(server_key, host, port) args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - args.append(command) - - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - env = env, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - - try: - out, err = _communicate(proc, stdin, timeout, err_on_timeout) - msg = " rexec - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) + if sudo: + command = "sudo " + command - if proc.poll(): - skip = False + args.append(command) - if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): - # SSH error, can safely retry - skip = True - elif retry: - # Probably timed out or plain failed but can retry - skip = True - - if skip: - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) + log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) - time.sleep(t) - continue - break - except RuntimeError, e: - msg = " rexec EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) + stdout = stderr = stdin = subprocess.PIPE + if forward_x11: + stdout = stderr = stdin = None - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + return _retry_rexec(args, log_msg, + stderr = stderr, + stdin = stdin, + stdout = stdout, + env = env, + retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = blocking) def rcopy(source, dest, - port = None, - agent = True, + port = None, + gwuser = None, + gw = None, recursive = False, identity = None, server_key = None, @@ -330,298 +331,135 @@ def rcopy(source, dest, Source and destination should have the user and host encoded as per scp specs. - If source is a file object, a special mode will be used to - create the remote file with the same contents. - - If dest is a file object, the remote file (source) will be - read and written into dest. - - In these modes, recursive cannot be True. - - Source can be a list of files to copy to a single destination, - in which case it is advised that the destination be a folder. + Source can be a list of files to copy to a single destination, + (in which case it is advised that the destination be a folder), + or a single file in a string. """ - - if isinstance(source, file) and source.tell() == 0: - source = source.name - elif hasattr(source, 'read'): - tmp = tempfile.NamedTemporaryFile() - while True: - buf = source.read(65536) - if buf: - tmp.write(buf) - else: - break - tmp.seek(0) - source = tmp.name - - if isinstance(source, file) or isinstance(dest, file) \ - or hasattr(source, 'read') or hasattr(dest, 'write'): - assert not recursive - - # Parse source/destination as @: - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) - else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - - tmp_known_hosts = None - hostip = gethostbyname(host) - - args = ['ssh', '-l', user, '-C', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - '-o', 'ConnectTimeout=60', - '-o', 'ConnectionAttempts=3', - '-o', 'ServerAliveInterval=30', - '-o', 'TCPKeepAlive=yes', - hostip or host ] - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),), - '-o', 'ControlPersist=60' ]) - - if port: - args.append('-P%d' % port) - - if identity: - args.extend(('-i', identity)) + # Parse destination as @: + if isinstance(dest, str) and ':' in dest: + remspec, path = dest.split(':',1) + elif isinstance(source, str) and ':' in source: + remspec, path = source.split(':',1) + else: + raise ValueError("Both endpoints cannot be local") + user,host = remspec.rsplit('@',1) + + # plain scp + tmp_known_hosts = None - if server_key: - # Create a temporary server key file - tmp_known_hosts = make_server_key_args(server_key, host, port) - args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - - if isinstance(source, file) or hasattr(source, 'read'): - args.append('cat > %s' % (shell_escape(path),)) - elif isinstance(dest, file) or hasattr(dest, 'write'): - args.append('cat %s' % (shell_escape(path),)) - else: - raise AssertionError, "Unreachable code reached! :-Q" - - # connects to the remote host and starts a remote connection - if isinstance(source, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif isinstance(dest, file): - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = source) - err = proc.stderr.read() - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,err), proc) - elif hasattr(source, 'read'): - # file-like (but not file) source - proc = subprocess.Popen(args, - stdout = open('/dev/null','w'), - stderr = subprocess.PIPE, - stdin = subprocess.PIPE) - - buf = None - err = [] - while True: - if not buf: - buf = source.read(4096) - if not buf: - #EOF - break - - rdrdy, wrdy, broken = select.select( - [proc.stderr], - [proc.stdin], - [proc.stderr,proc.stdin]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdin in wrdy: - proc.stdin.write(buf) - buf = None - - if broken: - break - proc.stdin.close() - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - elif hasattr(dest, 'write'): - # file-like (but not file) dest - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - stdin = open('/dev/null','w')) + args = ['scp', '-q', '-p', '-C', + # 2015-06-01 Thierry: I am commenting off blowfish + # as this is not available on a plain ubuntu 15.04 install + # this IMHO is too fragile, shoud be something the user + # decides explicitly (so he is at least aware of that dependency) + # Speed up transfer using blowfish cypher specification which is + # faster than the default one (3des) + # '-c', 'blowfish', + # Don't bother with localhost. Makes test easier + '-o', 'NoHostAuthenticationForLocalhost=yes', + '-o', 'ConnectTimeout=60', + '-o', 'ConnectionAttempts=3', + '-o', 'ServerAliveInterval=30', + '-o', 'TCPKeepAlive=yes' ] - buf = None - err = [] - while True: - rdrdy, wrdy, broken = select.select( - [proc.stderr, proc.stdout], - [], - [proc.stderr, proc.stdout]) - - if proc.stderr in rdrdy: - # use os.read for fully unbuffered behavior - err.append(os.read(proc.stderr.fileno(), 4096)) - - if proc.stdout in rdrdy: - # use os.read for fully unbuffered behavior - buf = os.read(proc.stdout.fileno(), 4096) - dest.write(buf) - - if not buf: - #EOF - break - - if broken: - break - err.append(proc.stderr.read()) - - proc._known_hosts = tmp_known_hosts - eintr_retry(proc.wait)() - return ((None,''.join(err)), proc) - else: - raise AssertionError, "Unreachable code reached! :-Q" - else: - # Parse destination as @: - if isinstance(dest, basestring) and ':' in dest: - remspec, path = dest.split(':',1) - elif isinstance(source, basestring) and ':' in source: - remspec, path = source.split(':',1) - else: - raise ValueError, "Both endpoints cannot be local" - user,host = remspec.rsplit('@',1) - - # plain scp - tmp_known_hosts = None - - args = ['scp', '-q', '-p', '-C', - # Don't bother with localhost. Makes test easier - '-o', 'NoHostAuthenticationForLocalhost=yes', - '-o', 'ConnectTimeout=60', - '-o', 'ConnectionAttempts=3', - '-o', 'ServerAliveInterval=30', - '-o', 'TCPKeepAlive=yes' ] - - if port: - args.append('-P%d' % port) + if port: + args.append('-P%d' % port) - if recursive: - args.append('-r') + if gw: + proxycommand = _proxy_command(gw, gwuser, identity) + args.extend(['-o', proxycommand]) - if identity: - args.extend(('-i', identity)) + if recursive: + args.append('-r') - if server_key: - # Create a temporary server key file - tmp_known_hosts = make_server_key_args(server_key, host, port) - args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) + if identity: + identity = os.path.expanduser(identity) + args.extend(('-i', identity)) - if not strict_host_checking: - # Do not check for Host key. Unsafe. - args.extend(['-o', 'StrictHostKeyChecking=no']) + if server_key: + # Create a temporary server key file + tmp_known_hosts = make_server_key_args(server_key, host, port) + args.extend(['-o', 'UserKnownHostsFile=%s' % (tmp_known_hosts.name,)]) - if isinstance(source,list): - args.extend(source) - else: - if openssh_has_persist(): - args.extend([ - '-o', 'ControlMaster=auto', - '-o', 'ControlPath=%s' % (make_control_path(agent, False),) - ]) - args.append(source) + if not strict_host_checking: + # Do not check for Host key. Unsafe. + args.extend(['-o', 'StrictHostKeyChecking=no']) + + if isinstance(source, list): + args.extend(source) + else: + if openssh_has_persist(): + args.extend([ + '-o', 'ControlMaster=auto', + '-o', 'ControlPath=%s' % (make_control_path(False, False),) + ]) + args.append(source) + if isinstance(dest, list): + args.extend(dest) + else: args.append(dest) - for x in xrange(retry): - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - - try: - (out, err) = proc.communicate() - eintr_retry(proc.wait)() - msg = " rcopy - host %s - command %s " % (host, " ".join(args)) - log(msg, logging.DEBUG, out, err) - - if proc.poll(): - t = x*2 - msg = "SLEEPING %d ... ATEMPT %d - host %s - command %s " % ( - t, x, host, " ".join(args)) - log(msg, logging.DEBUG) - - time.sleep(t) - continue - - break - except RuntimeError, e: - msg = " rcopy EXCEPTION - host %s - command %s - TIMEOUT -> %s" % (host, " ".join(args), e.args) - log(msg, logging.DEBUG, out, err) - - if retry <= 0: - raise - retry -= 1 - - return ((out, err), proc) + log_msg = " rcopy - host %s - command %s " % (str(host), " ".join(map(str, args))) + + return _retry_rexec(args, log_msg, env = None, retry = retry, + tmp_known_hosts = tmp_known_hosts, + blocking = True) def rspawn(command, pidfile, - stdout = '/dev/null', - stderr = STDOUT, - stdin = '/dev/null', - home = None, - create_home = False, - sudo = False, - host = None, - port = None, - user = None, - agent = None, - identity = None, - server_key = None, - tty = False): + stdout = '/dev/null', + stderr = STDOUT, + stdin = '/dev/null', + home = None, + create_home = False, + sudo = False, + host = None, + port = None, + user = None, + gwuser = None, + gw = None, + agent = None, + identity = None, + server_key = None, + tty = False, + strict_host_checking = True): """ - Spawn a remote command such that it will continue working asynchronously. - - Parameters: - command: the command to run - it should be a single line. - - pidfile: path of a (ideally unique to this task) pidfile for tracking the process. - - stdout: path of a file to redirect standard output to - must be a string. - Defaults to /dev/null - stderr: path of a file to redirect standard error to - string or the special STDOUT value - to redirect to the same file stdout was redirected to. Defaults to STDOUT. - stdin: path of a file with input to be piped into the command's standard input - - home: path of a folder to use as working directory - should exist, unless you specify create_home - - create_home: if True, the home folder will be created first with mkdir -p - - sudo: whether the command needs to be executed as root + Spawn a remote command such that it will continue working asynchronously in + background. + + :param command: The command to run, it should be a single line. + :type command: str + + :param pidfile: Path to a file where to store the pid and ppid of the + spawned process + :type pidfile: str + + :param stdout: Path to file to redirect standard output. + The default value is /dev/null + :type stdout: str + + :param stderr: Path to file to redirect standard error. + If the special STDOUT value is used, stderr will + be redirected to the same file as stdout + :type stderr: str + + :param stdin: Path to a file with input to be piped into the command's standard input + :type stdin: str + + :param home: Path to working directory folder. + It is assumed to exist unless the create_home flag is set. + :type home: str + + :param create_home: Flag to force creation of the home folder before + running the command + :type create_home: bool + + :param sudo: Flag forcing execution with sudo user + :type sudo: bool - host/port/user/agent/identity: see rexec - - Returns: + :rtype: tuple + (stdout, stderr), process Of the spawning process, which only captures errors at spawning time. @@ -634,7 +472,7 @@ def rspawn(command, pidfile, else: stderr = ' ' + stderr - daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % { + daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % { 'command' : command, 'pidfile' : shell_escape(pidfile), 'stdout' : stdout, @@ -655,39 +493,47 @@ def rspawn(command, pidfile, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, server_key = server_key, - tty = tty , + tty = tty, + strict_host_checking = strict_host_checking , ) if proc.wait(): - raise RuntimeError, "Failed to set up application on host %s: %s %s" % (host, out,err,) + raise RuntimeError("Failed to set up application on host %s: %s %s" % (host, out,err,)) return ((out, err), proc) @eintr_retry -def rcheckpid(pidfile, - host = None, - port = None, - user = None, - agent = None, - identity = None, - server_key = None): +def rgetpid(pidfile, + host = None, + port = None, + user = None, + gwuser = None, + gw = None, + agent = None, + identity = None, + server_key = None, + strict_host_checking = True): """ - Check the pidfile of a process spawned with remote_spawn. - - Parameters: - pidfile: the pidfile passed to remote_span + Returns the pid and ppid of a process from a remote file where the + information was stored. + + :param home: Path to directory where the pidfile is located + :type home: str + + :param pidfile: Name of file containing the pid information + :type pidfile: str - host/port/user/agent/identity: see rexec - - Returns: + :rtype: int - A (pid, ppid) tuple useful for calling remote_status and remote_kill, - or None if the pidfile isn't valid yet (maybe the process is still starting). - """ + A (pid, ppid) tuple useful for calling rstatus and rkill, + or None if the pidfile isn't valid yet (can happen when process is staring up) + """ (out,err),proc = rexec( "cat %(pidfile)s" % { 'pidfile' : pidfile, @@ -695,9 +541,12 @@ def rcheckpid(pidfile, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): @@ -715,22 +564,24 @@ def rstatus(pid, ppid, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, identity = None, - server_key = None): + server_key = None, + strict_host_checking = True): """ - Check the status of a process spawned with remote_spawn. + Returns a code representing the the status of a remote process + + :param pid: Process id of the process + :type pid: int + + :param ppid: Parent process id of process + :type ppid: int - Parameters: - pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid - - host/port/user/agent/identity: see rexec + :rtype: int (One of NOT_STARTED, RUNNING, FINISHED) - Returns: - - One of NOT_STARTED, RUNNING, FINISHED """ - (out,err),proc = rexec( # Check only by pid. pid+ppid does not always work (especially with sudo) " (( ps --pid %(pid)d -o pid | grep -c %(pid)d && echo 'wait') || echo 'done' ) | tail -n 1" % { @@ -740,13 +591,16 @@ def rstatus(pid, ppid, host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) if proc.wait(): - return NOT_STARTED + return ProcStatus.NOT_STARTED status = False if err: @@ -755,37 +609,38 @@ def rstatus(pid, ppid, elif out: status = (out.strip() == 'wait') else: - return NOT_STARTED - return RUNNING if status else FINISHED + return ProcStatus.NOT_STARTED + return ProcStatus.RUNNING if status else ProcStatus.FINISHED @eintr_retry def rkill(pid, ppid, host = None, port = None, user = None, + gwuser = None, + gw = None, agent = None, sudo = False, identity = None, server_key = None, - nowait = False): + nowait = False, + strict_host_checking = True): """ - Kill a process spawned with remote_spawn. - + Sends a kill signal to a remote process. + First tries a SIGTERM, and if the process does not end in 10 seconds, it sends a SIGKILL. - - Parameters: - pid/ppid: pid and parent-pid of the spawned process. See remote_check_pid - - sudo: whether the command was run with sudo - careful killing like this. - - host/port/user/agent/identity: see rexec - - Returns: + + :param pid: Process id of process to be killed + :type pid: int + + :param ppid: Parent process id of process to be killed + :type ppid: int + + :param sudo: Flag indicating if sudo should be used to kill the process + :type sudo: bool - Nothing, should have killed the process """ - subkill = "$(ps --ppid %(pid)d -o pid h)" % { 'pid' : pid } cmd = """ SUBKILL="%(subkill)s" ; @@ -819,9 +674,12 @@ fi host = host, port = port, user = user, + gwuser = gwuser, + gw = gw, agent = agent, identity = identity, - server_key = server_key + server_key = server_key, + strict_host_checking = strict_host_checking ) # wait, don't leave zombies around @@ -829,33 +687,109 @@ fi return (out, err), proc +def _retry_rexec(args, + log_msg, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE, + env = None, + retry = 3, + tmp_known_hosts = None, + blocking = True): + + for x in xrange(retry): + # display command actually invoked when debug is turned on + message = " ".join( [ "'{}'".format(arg) for arg in args ] ) + log("sshfuncs: invoking {}".format(message), logging.DEBUG) + # connects to the remote host and starts a remote connection + proc = subprocess.Popen( + args, + env = env, + stdout = stdout, + stdin = stdin, + stderr = stderr, + ) + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + + # The argument block == False forces to rexec to return immediately, + # without blocking + try: + err = out = " " + if blocking: + #(out, err) = proc.communicate() + # The method communicate was re implemented for performance issues + # when using python subprocess communicate method the ssh commands + # last one minute each + out, err = _communicate(proc, input=None) + + elif stdout: + out = proc.stdout.read() + if proc.poll() and stderr: + err = proc.stderr.read() + + log(log_msg, logging.DEBUG, out, err) + + if proc.poll(): + skip = False + + if err.strip().startswith('ssh: ') or err.strip().startswith('mux_client_hello_exchange: '): + # SSH error, can safely retry + skip = True + elif retry: + # Probably timed out or plain failed but can retry + skip = True + + if skip: + t = x*2 + msg = "SLEEPING %d ... ATEMPT %d - command %s " % ( + t, x, " ".join(args)) + log(msg, logging.DEBUG) + + time.sleep(t) + continue + break + except RuntimeError as e: + msg = " rexec EXCEPTION - TIMEOUT -> %s \n %s" % ( e.args, log_msg ) + log(msg, logging.DEBUG, out, err) + + if retry <= 0: + raise + retry -= 1 + + return ((out, err), proc) + # POSIX -def _communicate(self, input, timeout=None, err_on_timeout=True): +# Don't remove. The method communicate was re implemented for performance issues +def _communicate(proc, input, timeout=None, err_on_timeout=True): read_set = [] write_set = [] stdout = None # Return stderr = None # Return - + killed = False - + if timeout is not None: timelimit = time.time() + timeout killtime = timelimit + 4 bailtime = timelimit + 4 - if self.stdin: + if proc.stdin: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. - self.stdin.flush() + proc.stdin.flush() if input: - write_set.append(self.stdin) + write_set.append(proc.stdin) else: - self.stdin.close() - if self.stdout: - read_set.append(self.stdout) + proc.stdin.close() + + if proc.stdout: + read_set.append(proc.stdout) stdout = [] - if self.stderr: - read_set.append(self.stderr) + + if proc.stderr: + read_set.append(proc.stderr) stderr = [] input_offset = 0 @@ -870,52 +804,54 @@ def _communicate(self, input, timeout=None, err_on_timeout=True): else: signum = signal.SIGTERM # Lets kill it - os.kill(self.pid, signum) + os.kill(proc.pid, signum) select_timeout = 0.5 else: select_timeout = timelimit - curtime + 0.1 else: select_timeout = 1.0 - + if select_timeout > 1.0: select_timeout = 1.0 - + try: rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) - except select.error,e: + except select.error as e: if e[0] != 4: raise else: continue - - if not rlist and not wlist and not xlist and self.poll() is not None: + + if not rlist and not wlist and not xlist and proc.poll() is not None: # timeout and process exited, say bye break - if self.stdin in wlist: + if proc.stdin in wlist: # When select has indicated that the file is writable, # we can write up to PIPE_BUF bytes without risk # blocking. POSIX defines PIPE_BUF >= 512 - bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512)) + bytes_written = os.write(proc.stdin.fileno(), + buffer(input, input_offset, 512)) input_offset += bytes_written + if input_offset >= len(input): - self.stdin.close() - write_set.remove(self.stdin) + proc.stdin.close() + write_set.remove(proc.stdin) - if self.stdout in rlist: - data = os.read(self.stdout.fileno(), 1024) + if proc.stdout in rlist: + data = os.read(proc.stdout.fileno(), 1024) if data == "": - self.stdout.close() - read_set.remove(self.stdout) + proc.stdout.close() + read_set.remove(proc.stdout) stdout.append(data) - if self.stderr in rlist: - data = os.read(self.stderr.fileno(), 1024) + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) if data == "": - self.stderr.close() - read_set.remove(self.stderr) + proc.stderr.close() + read_set.remove(proc.stderr) stderr.append(data) - + # All data exchanged. Translate lists into strings. if stdout is not None: stdout = ''.join(stdout) @@ -926,19 +862,49 @@ def _communicate(self, input, timeout=None, err_on_timeout=True): # object do the translation: It is based on stdio, which is # impossible to combine with select (unless forcing no # buffering). - if self.universal_newlines and hasattr(file, 'newlines'): + if proc.universal_newlines and hasattr(file, 'newlines'): if stdout: - stdout = self._translate_newlines(stdout) + stdout = proc._translate_newlines(stdout) if stderr: - stderr = self._translate_newlines(stderr) + stderr = proc._translate_newlines(stderr) if killed and err_on_timeout: - errcode = self.poll() - raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) + errcode = proc.poll() + raise RuntimeError("Operation timed out", errcode, stdout, stderr) else: if killed: - self.poll() + proc.poll() else: - self.wait() + proc.wait() return (stdout, stderr) +def _proxy_command(gw, gwuser, gwidentity): + """ + Constructs the SSH ProxyCommand option to add to the SSH command to connect + via a proxy + :param gw: SSH proxy hostname + :type gw: str + + :param gwuser: SSH proxy username + :type gwuser: str + + :param gwidentity: SSH proxy identity file + :type gwidentity: str + + + :rtype: str + + returns the SSH ProxyCommand option. + """ + + proxycommand = 'ProxyCommand=ssh -q ' + if gwidentity: + proxycommand += '-i %s ' % os.path.expanduser(gwidentity) + if gwuser: + proxycommand += '%s' % gwuser + else: + proxycommand += '%r' + proxycommand += '@%s -W %%h:%%p' % gw + + return proxycommand +