try:
self._popen_ssh_command(
"mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \
- % { 'home' : server.shell_escape(self.home_path) }
+ % { 'home' : server.shell_escape(self.home_path) },
+ timeout = 120,
+ retry = 3
)
except RuntimeError, e:
raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],)
"cat %(token_path)s" % {
'token_path' : os.path.join(self.home_path, 'build.token'),
},
+ timeout = 120,
noerrors = True)
slave_token = ""
if not proc.wait() and out:
'buildlog' : os.path.join(self.home_path, 'buildlog'),
'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
},
+ timeout = 120,
noerrors = True)
proc.wait()
self._do_kill_build()
@server.eintr_retry
- def _popen_scp(self, src, dst, retry = True):
- (out,err),proc = server.popen_scp(
- src,
- dst,
- port = None,
- agent = None,
- ident_key = self.node.ident_path,
- server_key = self.node.server_key
- )
+ def _popen_scp(self, src, dst, retry = 3):
+ while 1:
+ try:
+ (out,err),proc = server.popen_scp(
+ src,
+ dst,
+ port = None,
+ agent = None,
+ ident_key = self.node.ident_path,
+ server_key = self.node.server_key
+ )
- if server.eintr_retry(proc.wait)():
- raise RuntimeError, (out, err)
- return (out, err), proc
+ if server.eintr_retry(proc.wait)():
+ raise RuntimeError, (out, err)
+ return (out, err), proc
+ except:
+ if retry <= 0:
+ raise
+ else:
+ retry -= 1
@server.eintr_retry
- def _popen_ssh_command(self, command, retry = True, noerrors=False):
+ def _popen_ssh_command(self, command, retry = 0, noerrors=False, timeout=None):
(out,err),proc = server.popen_ssh_command(
command,
host = self.node.hostname,
user = self.node.slicename,
agent = None,
ident_key = self.node.ident_path,
- server_key = self.node.server_key
+ server_key = self.node.server_key,
+ timeout = timeout,
+ retry = retry
)
if server.eintr_retry(proc.wait)():
user = self.slicename,
agent = None,
ident_key = self.ident_path,
- server_key = self.server_key
+ server_key = self.server_key,
+ timeout = 600,
)
if proc.wait():
user = self.slicename,
agent = None,
ident_key = self.ident_path,
- server_key = self.server_key
+ server_key = self.server_key,
+ timeout = 60,
+ err_on_timeout = False
)
if proc.wait():
ident_key = self.ident_path,
server_key = self.server_key,
tty = True, # so that ps -N -T works as advertised...
+ timeout = 60,
+ retry = 3
)
proc.wait()
agent = None,
ident_key = self.ident_path,
server_key = self.server_key,
- stdin = '\n'.join(rules)
+ stdin = '\n'.join(rules),
+ timeout = 300
)
if proc.wait() or err:
user = local.node.slicename,
agent = None,
ident_key = local.node.ident_path,
- server_key = local.node.server_key
+ server_key = local.node.server_key,
+ timeout = 60,
+ retry = 3
)
if proc.wait():
user = local.node.slicename,
agent = None,
ident_key = local.node.ident_path,
- server_key = local.node.server_key
+ server_key = local.node.server_key,
+ timeout = 300
)
if proc.wait():
user = local.node.slicename,
agent = None,
ident_key = local.node.ident_path,
- server_key = local.node.server_key
+ server_key = local.node.server_key,
+ timeout = 60,
+ err_on_timeout = False
)
proc.wait()
user = local.node.slicename,
agent = None,
ident_key = local.node.ident_path,
- server_key = local.node.server_key
+ server_key = local.node.server_key,
+ timeout = 60,
+ err_on_timeout = False
)
proc.wait()
user = local.node.slicename,
agent = None,
ident_key = local.node.ident_path,
- server_key = local.node.server_key
+ server_key = local.node.server_key,
+ timeout = 60,
+ retry = 3,
+ err_on_timeout = False
)
proc.wait()
user = local.node.slicename,
agent = None,
ident_key = local.node.ident_path,
- server_key = local.node.server_key
+ server_key = local.node.server_key,
+ timeout = 60,
+ err_on_timeout = False
)
if proc.wait():
user = local.node.slicename,
agent = None,
ident_key = local.node.ident_path,
- server_key = local.node.server_key
+ server_key = local.node.server_key,
+ timeout = 60,
+ err_on_timeout = False
)
if proc.wait():
import resource
import select
import socket
+import signal
import sys
import subprocess
import threading
stdin="",
ident_key = None,
server_key = None,
- tty = False):
+ tty = False,
+ timeout = None,
+ retry = 0,
+ err_on_timeout = True):
"""
Executes a remote commands, returns ((stdout,stderr),process)
"""
server_key, host, port, args)
args.append(command)
- # connects to the remote host and starts a remote connection
- proc = subprocess.Popen(args,
- stdout = subprocess.PIPE,
- stdin = subprocess.PIPE,
- stderr = subprocess.PIPE)
-
- # attach tempfile object to the process, to make sure the file stays
- # alive until the process is finished with it
- proc._known_hosts = tmp_known_hosts
-
- out, err = proc.communicate(stdin)
+ while 1:
+ # connects to the remote host and starts a remote connection
+ proc = subprocess.Popen(args,
+ stdout = subprocess.PIPE,
+ stdin = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+
+ # attach tempfile object to the process, to make sure the file stays
+ # alive until the process is finished with it
+ proc._known_hosts = tmp_known_hosts
+
+ try:
+ out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+ break
+ except RuntimeError,e:
+ if retry <= 0:
+ raise
+ if TRACE:
+ print " timedout -> ", e.args
+ retry -= 1
+
if TRACE:
print " -> ", out, err
raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
msg, proc.stdout.read(), proc.stderr.read())
return proc
-
+
+
+# POSIX
+def _communicate(self, input, timeout=None, err_on_timeout=True):
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ killed = False
+
+ if timeout is not None:
+ timelimit = time.time() + timeout
+ killtime = timelimit + 4
+ bailtime = timelimit + 4
+
+ if self.stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ self.stdin.flush()
+ if input:
+ write_set.append(self.stdin)
+ else:
+ self.stdin.close()
+ if self.stdout:
+ read_set.append(self.stdout)
+ stdout = []
+ if self.stderr:
+ read_set.append(self.stderr)
+ stderr = []
+
+ input_offset = 0
+ while read_set or write_set:
+ if timeout is not None:
+ curtime = time.time()
+ if timeout is None or curtime > timelimit:
+ if curtime > bailtime:
+ break
+ elif curtime > killtime:
+ signum = signal.SIGKILL
+ else:
+ signum = signal.SIGTERM
+ # Lets kill it
+ os.kill(self.pid, signum)
+ select_timeout = 0.5
+ else:
+ select_timeout = timelimit - curtime + 0.1
+ else:
+ select_timeout = None
+
+ try:
+ rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
+ except select.error,e:
+ if e[0] != 4:
+ raise
+ else:
+ continue
+
+ if self.stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+ input_offset += bytes_written
+ if input_offset >= len(input):
+ self.stdin.close()
+ write_set.remove(self.stdin)
+
+ if self.stdout in rlist:
+ data = os.read(self.stdout.fileno(), 1024)
+ if data == "":
+ self.stdout.close()
+ read_set.remove(self.stdout)
+ stdout.append(data)
+
+ if self.stderr in rlist:
+ data = os.read(self.stderr.fileno(), 1024)
+ if data == "":
+ self.stderr.close()
+ read_set.remove(self.stderr)
+ stderr.append(data)
+
+ # All data exchanged. Translate lists into strings.
+ if stdout is not None:
+ stdout = ''.join(stdout)
+ if stderr is not None:
+ stderr = ''.join(stderr)
+
+ # Translate newlines, if requested. We cannot let the file
+ # object do the translation: It is based on stdio, which is
+ # impossible to combine with select (unless forcing no
+ # buffering).
+ if self.universal_newlines and hasattr(file, 'newlines'):
+ if stdout:
+ stdout = self._translate_newlines(stdout)
+ if stderr:
+ stderr = self._translate_newlines(stderr)
+
+ if killed and err_on_timeout:
+ errcode = self.poll()
+ raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
+ else:
+ if killed:
+ self.poll()
+ else:
+ self.wait()
+ return (stdout, stderr)
+