From: Lucia Guevgeozian Odizzio Date: Tue, 18 Mar 2014 13:11:13 +0000 (+0100) Subject: Replacing communicate method X-Git-Tag: nepi-3.1.0~105 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=c1da6150255f59ef30b2eacd3d8ca495f399fd22;p=nepi.git Replacing communicate method --- diff --git a/src/nepi/util/sshfuncs.py b/src/nepi/util/sshfuncs.py index e8ff2130..9a416aee 100644 --- a/src/nepi/util/sshfuncs.py +++ b/src/nepi/util/sshfuncs.py @@ -224,7 +224,7 @@ def rexec(command, host, user, """ Executes a remote command, returns ((stdout,stderr),process) """ - + tmp_known_hosts = None if not gw: hostip = gethostbyname(host) @@ -283,7 +283,7 @@ def rexec(command, host, user, command = "sudo " + command args.append(command) - + log_msg = " rexec - host %s - command %s " % (str(host), " ".join(map(str, args))) stdout = stderr = stdin = subprocess.PIPE @@ -688,7 +688,12 @@ def _retry_rexec(args, try: err = out = " " if blocking: - (out, err) = proc.communicate() + #(out, err) = proc.communicate() + # The method communicate was re implemented for performance issues + # when using python subprocess communicate method the ssh commands + # last one minute each + out, err = _communicate(proc, input=None) + elif stdout: out = proc.stdout.read() if proc.poll() and stderr: @@ -722,6 +727,125 @@ def _retry_rexec(args, if retry <= 0: raise retry -= 1 - + return ((out, err), proc) +# POSIX +# Don't remove. The method communicate was re implemented for performance issues +def _communicate(proc, input, timeout=None, err_on_timeout=True): + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + killed = False + + if timeout is not None: + timelimit = time.time() + timeout + killtime = timelimit + 4 + bailtime = timelimit + 4 + + if proc.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + proc.stdin.flush() + if input: + write_set.append(proc.stdin) + else: + proc.stdin.close() + + if proc.stdout: + read_set.append(proc.stdout) + stdout = [] + + if proc.stderr: + read_set.append(proc.stderr) + stderr = [] + + input_offset = 0 + while read_set or write_set: + if timeout is not None: + curtime = time.time() + if timeout is None or curtime > timelimit: + if curtime > bailtime: + break + elif curtime > killtime: + signum = signal.SIGKILL + else: + signum = signal.SIGTERM + # Lets kill it + os.kill(proc.pid, signum) + select_timeout = 0.5 + else: + select_timeout = timelimit - curtime + 0.1 + else: + select_timeout = 1.0 + + if select_timeout > 1.0: + select_timeout = 1.0 + + try: + rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) + except select.error,e: + if e[0] != 4: + raise + else: + continue + + if not rlist and not wlist and not xlist and proc.poll() is not None: + # timeout and process exited, say bye + break + + if proc.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + bytes_written = os.write(proc.stdin.fileno(), + buffer(input, input_offset, 512)) + input_offset += bytes_written + + if input_offset >= len(input): + proc.stdin.close() + write_set.remove(proc.stdin) + + if proc.stdout in rlist: + data = os.read(proc.stdout.fileno(), 1024) + if data == "": + proc.stdout.close() + read_set.remove(proc.stdout) + stdout.append(data) + + if proc.stderr in rlist: + data = os.read(proc.stderr.fileno(), 1024) + if data == "": + proc.stderr.close() + read_set.remove(proc.stderr) + stderr.append(data) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if proc.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = proc._translate_newlines(stdout) + if stderr: + stderr = proc._translate_newlines(stderr) + + if killed and err_on_timeout: + errcode = proc.poll() + raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) + else: + if killed: + proc.poll() + else: + proc.wait() + return (stdout, stderr) + +