From 7bc75937719b1512bc0515d544b1dc5f6903f537 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Mon, 22 Aug 2011 18:35:53 +0200 Subject: [PATCH] SSH timeout. It tends to... hang. Whatevah... --- src/nepi/testbeds/planetlab/application.py | 43 +++--- src/nepi/testbeds/planetlab/node.py | 12 +- src/nepi/testbeds/planetlab/tunproto.py | 28 +++- src/nepi/util/server.py | 147 +++++++++++++++++++-- 4 files changed, 192 insertions(+), 38 deletions(-) diff --git a/src/nepi/testbeds/planetlab/application.py b/src/nepi/testbeds/planetlab/application.py index 0cde9674..9a189ba4 100644 --- a/src/nepi/testbeds/planetlab/application.py +++ b/src/nepi/testbeds/planetlab/application.py @@ -191,7 +191,9 @@ class Dependency(object): try: self._popen_ssh_command( "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \ - % { 'home' : server.shell_escape(self.home_path) } + % { 'home' : server.shell_escape(self.home_path) }, + timeout = 120, + retry = 3 ) except RuntimeError, e: raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],) @@ -396,6 +398,7 @@ class Dependency(object): "cat %(token_path)s" % { 'token_path' : os.path.join(self.home_path, 'build.token'), }, + timeout = 120, noerrors = True) slave_token = "" if not proc.wait() and out: @@ -409,6 +412,7 @@ class Dependency(object): 'buildlog' : os.path.join(self.home_path, 'buildlog'), 'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'), }, + timeout = 120, noerrors = True) proc.wait() @@ -547,23 +551,30 @@ class Dependency(object): self._do_kill_build() @server.eintr_retry - def _popen_scp(self, src, dst, retry = True): - (out,err),proc = server.popen_scp( - src, - dst, - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) + def _popen_scp(self, src, dst, retry = 3): + while 1: + try: + (out,err),proc = server.popen_scp( + src, + dst, + port = None, + agent = None, + ident_key = self.node.ident_path, + server_key = self.node.server_key + ) - if server.eintr_retry(proc.wait)(): - raise RuntimeError, (out, err) - return (out, err), proc + if server.eintr_retry(proc.wait)(): + raise RuntimeError, (out, err) + return (out, err), proc + except: + if retry <= 0: + raise + else: + retry -= 1 @server.eintr_retry - def _popen_ssh_command(self, command, retry = True, noerrors=False): + def _popen_ssh_command(self, command, retry = 0, noerrors=False, timeout=None): (out,err),proc = server.popen_ssh_command( command, host = self.node.hostname, @@ -571,7 +582,9 @@ class Dependency(object): user = self.node.slicename, agent = None, ident_key = self.node.ident_path, - server_key = self.node.server_key + server_key = self.node.server_key, + timeout = timeout, + retry = retry ) if server.eintr_retry(proc.wait)(): diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index c3574625..5d9320c2 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -404,7 +404,8 @@ class Node(object): user = self.slicename, agent = None, ident_key = self.ident_path, - server_key = self.server_key + server_key = self.server_key, + timeout = 600, ) if proc.wait(): @@ -448,7 +449,9 @@ class Node(object): user = self.slicename, agent = None, ident_key = self.ident_path, - server_key = self.server_key + server_key = self.server_key, + timeout = 60, + err_on_timeout = False ) if proc.wait(): @@ -492,6 +495,8 @@ class Node(object): ident_key = self.ident_path, server_key = self.server_key, tty = True, # so that ps -N -T works as advertised... + timeout = 60, + retry = 3 ) proc.wait() @@ -670,7 +675,8 @@ class Node(object): agent = None, ident_key = self.ident_path, server_key = self.server_key, - stdin = '\n'.join(rules) + stdin = '\n'.join(rules), + timeout = 300 ) if proc.wait() or err: diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 4c5c3f1a..0cc72650 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -69,7 +69,9 @@ class TunProtoBase(object): user = local.node.slicename, agent = None, ident_key = local.node.ident_path, - server_key = local.node.server_key + server_key = local.node.server_key, + timeout = 60, + retry = 3 ) if proc.wait(): @@ -171,7 +173,8 @@ class TunProtoBase(object): user = local.node.slicename, agent = None, ident_key = local.node.ident_path, - server_key = local.node.server_key + server_key = local.node.server_key, + timeout = 300 ) if proc.wait(): @@ -350,7 +353,9 @@ class TunProtoBase(object): user = local.node.slicename, agent = None, ident_key = local.node.ident_path, - server_key = local.node.server_key + server_key = local.node.server_key, + timeout = 60, + err_on_timeout = False ) proc.wait() @@ -366,7 +371,9 @@ class TunProtoBase(object): user = local.node.slicename, agent = None, ident_key = local.node.ident_path, - server_key = local.node.server_key + server_key = local.node.server_key, + timeout = 60, + err_on_timeout = False ) proc.wait() @@ -384,7 +391,10 @@ class TunProtoBase(object): user = local.node.slicename, agent = None, ident_key = local.node.ident_path, - server_key = local.node.server_key + server_key = local.node.server_key, + timeout = 60, + retry = 3, + err_on_timeout = False ) proc.wait() @@ -406,7 +416,9 @@ class TunProtoBase(object): user = local.node.slicename, agent = None, ident_key = local.node.ident_path, - server_key = local.node.server_key + server_key = local.node.server_key, + timeout = 60, + err_on_timeout = False ) if proc.wait(): @@ -441,7 +453,9 @@ class TunProtoBase(object): user = local.node.slicename, agent = None, ident_key = local.node.ident_path, - server_key = local.node.server_key + server_key = local.node.server_key, + timeout = 60, + err_on_timeout = False ) if proc.wait(): diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 27930adb..9808b05e 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -8,6 +8,7 @@ import os.path import resource import select import socket +import signal import sys import subprocess import threading @@ -522,7 +523,10 @@ def popen_ssh_command(command, host, port, user, agent, stdin="", ident_key = None, server_key = None, - tty = False): + tty = False, + timeout = None, + retry = 0, + err_on_timeout = True): """ Executes a remote commands, returns ((stdout,stderr),process) """ @@ -548,17 +552,27 @@ def popen_ssh_command(command, host, port, user, agent, server_key, host, port, args) args.append(command) - # connects to the remote host and starts a remote connection - proc = subprocess.Popen(args, - stdout = subprocess.PIPE, - stdin = subprocess.PIPE, - stderr = subprocess.PIPE) - - # attach tempfile object to the process, to make sure the file stays - # alive until the process is finished with it - proc._known_hosts = tmp_known_hosts - - out, err = proc.communicate(stdin) + while 1: + # connects to the remote host and starts a remote connection + proc = subprocess.Popen(args, + stdout = subprocess.PIPE, + stdin = subprocess.PIPE, + stderr = subprocess.PIPE) + + # attach tempfile object to the process, to make sure the file stays + # alive until the process is finished with it + proc._known_hosts = tmp_known_hosts + + try: + out, err = _communicate(proc, stdin, timeout, err_on_timeout) + break + except RuntimeError,e: + if retry <= 0: + raise + if TRACE: + print " timedout -> ", e.args + retry -= 1 + if TRACE: print " -> ", out, err @@ -843,4 +857,111 @@ def popen_ssh_subprocess(python_code, host, port, user, agent, raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % ( msg, proc.stdout.read(), proc.stderr.read()) return proc - + + +# POSIX +def _communicate(self, input, timeout=None, err_on_timeout=True): + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + killed = False + + if timeout is not None: + timelimit = time.time() + timeout + killtime = timelimit + 4 + bailtime = timelimit + 4 + + if self.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + self.stdin.flush() + if input: + write_set.append(self.stdin) + else: + self.stdin.close() + if self.stdout: + read_set.append(self.stdout) + stdout = [] + if self.stderr: + read_set.append(self.stderr) + stderr = [] + + input_offset = 0 + while read_set or write_set: + if timeout is not None: + curtime = time.time() + if timeout is None or curtime > timelimit: + if curtime > bailtime: + break + elif curtime > killtime: + signum = signal.SIGKILL + else: + signum = signal.SIGTERM + # Lets kill it + os.kill(self.pid, signum) + select_timeout = 0.5 + else: + select_timeout = timelimit - curtime + 0.1 + else: + select_timeout = None + + try: + rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout) + except select.error,e: + if e[0] != 4: + raise + else: + continue + + if self.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512)) + input_offset += bytes_written + if input_offset >= len(input): + self.stdin.close() + write_set.remove(self.stdin) + + if self.stdout in rlist: + data = os.read(self.stdout.fileno(), 1024) + if data == "": + self.stdout.close() + read_set.remove(self.stdout) + stdout.append(data) + + if self.stderr in rlist: + data = os.read(self.stderr.fileno(), 1024) + if data == "": + self.stderr.close() + read_set.remove(self.stderr) + stderr.append(data) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if self.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = self._translate_newlines(stdout) + if stderr: + stderr = self._translate_newlines(stderr) + + if killed and err_on_timeout: + errcode = self.poll() + raise RuntimeError, ("Operation timed out", errcode, stdout, stderr) + else: + if killed: + self.poll() + else: + self.wait() + return (stdout, stderr) + -- 2.47.0