SSH timeout. It tends to... hang. Whatevah...
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 22 Aug 2011 16:35:53 +0000 (18:35 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 22 Aug 2011 16:35:53 +0000 (18:35 +0200)
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/server.py

index 0cde967..9a189ba 100644 (file)
@@ -191,7 +191,9 @@ class Dependency(object):
         try:
             self._popen_ssh_command(
                 "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \
-                    % { 'home' : server.shell_escape(self.home_path) }
+                    % { 'home' : server.shell_escape(self.home_path) },
+                timeout = 120,
+                retry = 3
                 )
         except RuntimeError, e:
             raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],)
@@ -396,6 +398,7 @@ class Dependency(object):
                 "cat %(token_path)s" % {
                     'token_path' : os.path.join(self.home_path, 'build.token'),
                 },
+                timeout = 120,
                 noerrors = True)
             slave_token = ""
             if not proc.wait() and out:
@@ -409,6 +412,7 @@ class Dependency(object):
                         'buildlog' : os.path.join(self.home_path, 'buildlog'),
                         'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
                     },
+                    timeout = 120,
                     noerrors = True)
                 
                 proc.wait()
@@ -547,23 +551,30 @@ class Dependency(object):
         self._do_kill_build()
 
     @server.eintr_retry
-    def _popen_scp(self, src, dst, retry = True):
-        (out,err),proc = server.popen_scp(
-            src,
-            dst, 
-            port = None,
-            agent = None,
-            ident_key = self.node.ident_path,
-            server_key = self.node.server_key
-            )
+    def _popen_scp(self, src, dst, retry = 3):
+        while 1:
+            try:
+                (out,err),proc = server.popen_scp(
+                    src,
+                    dst, 
+                    port = None,
+                    agent = None,
+                    ident_key = self.node.ident_path,
+                    server_key = self.node.server_key
+                    )
 
-        if server.eintr_retry(proc.wait)():
-            raise RuntimeError, (out, err)
-        return (out, err), proc
+                if server.eintr_retry(proc.wait)():
+                    raise RuntimeError, (out, err)
+                return (out, err), proc
+            except:
+                if retry <= 0:
+                    raise
+                else:
+                    retry -= 1
   
 
     @server.eintr_retry
-    def _popen_ssh_command(self, command, retry = True, noerrors=False):
+    def _popen_ssh_command(self, command, retry = 0, noerrors=False, timeout=None):
         (out,err),proc = server.popen_ssh_command(
             command,
             host = self.node.hostname,
@@ -571,7 +582,9 @@ class Dependency(object):
             user = self.node.slicename,
             agent = None,
             ident_key = self.node.ident_path,
-            server_key = self.node.server_key
+            server_key = self.node.server_key,
+            timeout = timeout,
+            retry = retry
             )
 
         if server.eintr_retry(proc.wait)():
index c357462..5d9320c 100644 (file)
@@ -404,7 +404,8 @@ class Node(object):
                     user = self.slicename,
                     agent = None,
                     ident_key = self.ident_path,
-                    server_key = self.server_key
+                    server_key = self.server_key,
+                    timeout = 600,
                     )
                 
                 if proc.wait():
@@ -448,7 +449,9 @@ class Node(object):
             user = self.slicename,
             agent = None,
             ident_key = self.ident_path,
-            server_key = self.server_key
+            server_key = self.server_key,
+            timeout = 60,
+            err_on_timeout = False
             )
         
         if proc.wait():
@@ -492,6 +495,8 @@ class Node(object):
                 ident_key = self.ident_path,
                 server_key = self.server_key,
                 tty = True, # so that ps -N -T works as advertised...
+                timeout = 60,
+                retry = 3
                 )
             proc.wait()
     
@@ -670,7 +675,8 @@ class Node(object):
             agent = None,
             ident_key = self.ident_path,
             server_key = self.server_key,
-            stdin = '\n'.join(rules)
+            stdin = '\n'.join(rules),
+            timeout = 300
             )
         
         if proc.wait() or err:
index 4c5c3f1..0cc7265 100644 (file)
@@ -69,7 +69,9 @@ class TunProtoBase(object):
             user = local.node.slicename,
             agent = None,
             ident_key = local.node.ident_path,
-            server_key = local.node.server_key
+            server_key = local.node.server_key,
+            timeout = 60,
+            retry = 3
             )
         
         if proc.wait():
@@ -171,7 +173,8 @@ class TunProtoBase(object):
             user = local.node.slicename,
             agent = None,
             ident_key = local.node.ident_path,
-            server_key = local.node.server_key
+            server_key = local.node.server_key,
+            timeout = 300
             )
         
         if proc.wait():
@@ -350,7 +353,9 @@ class TunProtoBase(object):
                 user = local.node.slicename,
                 agent = None,
                 ident_key = local.node.ident_path,
-                server_key = local.node.server_key
+                server_key = local.node.server_key,
+                timeout = 60,
+                err_on_timeout = False
                 )
             proc.wait()
 
@@ -366,7 +371,9 @@ class TunProtoBase(object):
                 user = local.node.slicename,
                 agent = None,
                 ident_key = local.node.ident_path,
-                server_key = local.node.server_key
+                server_key = local.node.server_key,
+                timeout = 60,
+                err_on_timeout = False
                 )
             proc.wait()
 
@@ -384,7 +391,10 @@ class TunProtoBase(object):
                 user = local.node.slicename,
                 agent = None,
                 ident_key = local.node.ident_path,
-                server_key = local.node.server_key
+                server_key = local.node.server_key,
+                timeout = 60,
+                retry = 3,
+                err_on_timeout = False
                 )
             proc.wait()
 
@@ -406,7 +416,9 @@ class TunProtoBase(object):
                         user = local.node.slicename,
                         agent = None,
                         ident_key = local.node.ident_path,
-                        server_key = local.node.server_key
+                        server_key = local.node.server_key,
+                        timeout = 60,
+                        err_on_timeout = False
                         )
                     
                     if proc.wait():
@@ -441,7 +453,9 @@ class TunProtoBase(object):
                     user = local.node.slicename,
                     agent = None,
                     ident_key = local.node.ident_path,
-                    server_key = local.node.server_key
+                    server_key = local.node.server_key,
+                    timeout = 60,
+                    err_on_timeout = False
                     )
                 
                 if proc.wait():
index 27930ad..9808b05 100644 (file)
@@ -8,6 +8,7 @@ import os.path
 import resource
 import select
 import socket
+import signal
 import sys
 import subprocess
 import threading
@@ -522,7 +523,10 @@ def popen_ssh_command(command, host, port, user, agent,
             stdin="", 
             ident_key = None,
             server_key = None,
-            tty = False):
+            tty = False,
+            timeout = None,
+            retry = 0,
+            err_on_timeout = True):
         """
         Executes a remote commands, returns ((stdout,stderr),process)
         """
@@ -548,17 +552,27 @@ def popen_ssh_command(command, host, port, user, agent,
                 server_key, host, port, args)
         args.append(command)
 
-        # connects to the remote host and starts a remote connection
-        proc = subprocess.Popen(args, 
-                stdout = subprocess.PIPE,
-                stdin = subprocess.PIPE, 
-                stderr = subprocess.PIPE)
-        
-        # attach tempfile object to the process, to make sure the file stays
-        # alive until the process is finished with it
-        proc._known_hosts = tmp_known_hosts
-        
-        out, err = proc.communicate(stdin)
+        while 1:
+            # connects to the remote host and starts a remote connection
+            proc = subprocess.Popen(args, 
+                    stdout = subprocess.PIPE,
+                    stdin = subprocess.PIPE, 
+                    stderr = subprocess.PIPE)
+            
+            # attach tempfile object to the process, to make sure the file stays
+            # alive until the process is finished with it
+            proc._known_hosts = tmp_known_hosts
+            
+            try:
+                out, err = _communicate(proc, stdin, timeout, err_on_timeout)
+                break
+            except RuntimeError,e:
+                if retry <= 0:
+                    raise
+                if TRACE:
+                    print " timedout -> ", e.args
+                retry -= 1
+            
         if TRACE:
             print " -> ", out, err
 
@@ -843,4 +857,111 @@ def popen_ssh_subprocess(python_code, host, port, user, agent,
             raise RuntimeError, "Failed to start remote python interpreter: \nout:\n%s%s\nerr:\n%s" % (
                 msg, proc.stdout.read(), proc.stderr.read())
         return proc
+
+
+# POSIX
+def _communicate(self, input, timeout=None, err_on_timeout=True):
+    read_set = []
+    write_set = []
+    stdout = None # Return
+    stderr = None # Return
+    
+    killed = False
+    
+    if timeout is not None:
+        timelimit = time.time() + timeout
+        killtime = timelimit + 4
+        bailtime = timelimit + 4
+
+    if self.stdin:
+        # Flush stdio buffer.  This might block, if the user has
+        # been writing to .stdin in an uncontrolled fashion.
+        self.stdin.flush()
+        if input:
+            write_set.append(self.stdin)
+        else:
+            self.stdin.close()
+    if self.stdout:
+        read_set.append(self.stdout)
+        stdout = []
+    if self.stderr:
+        read_set.append(self.stderr)
+        stderr = []
+
+    input_offset = 0
+    while read_set or write_set:
+        if timeout is not None:
+            curtime = time.time()
+            if timeout is None or curtime > timelimit:
+                if curtime > bailtime:
+                    break
+                elif curtime > killtime:
+                    signum = signal.SIGKILL
+                else:
+                    signum = signal.SIGTERM
+                # Lets kill it
+                os.kill(self.pid, signum)
+                select_timeout = 0.5
+            else:
+                select_timeout = timelimit - curtime + 0.1
+        else:
+            select_timeout = None
+            
+        try:
+            rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
+        except select.error,e:
+            if e[0] != 4:
+                raise
+            else:
+                continue
+
+        if self.stdin in wlist:
+            # When select has indicated that the file is writable,
+            # we can write up to PIPE_BUF bytes without risk
+            # blocking.  POSIX defines PIPE_BUF >= 512
+            bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
+            input_offset += bytes_written
+            if input_offset >= len(input):
+                self.stdin.close()
+                write_set.remove(self.stdin)
+
+        if self.stdout in rlist:
+            data = os.read(self.stdout.fileno(), 1024)
+            if data == "":
+                self.stdout.close()
+                read_set.remove(self.stdout)
+            stdout.append(data)
+
+        if self.stderr in rlist:
+            data = os.read(self.stderr.fileno(), 1024)
+            if data == "":
+                self.stderr.close()
+                read_set.remove(self.stderr)
+            stderr.append(data)
+    
+    # All data exchanged.  Translate lists into strings.
+    if stdout is not None:
+        stdout = ''.join(stdout)
+    if stderr is not None:
+        stderr = ''.join(stderr)
+
+    # Translate newlines, if requested.  We cannot let the file
+    # object do the translation: It is based on stdio, which is
+    # impossible to combine with select (unless forcing no
+    # buffering).
+    if self.universal_newlines and hasattr(file, 'newlines'):
+        if stdout:
+            stdout = self._translate_newlines(stdout)
+        if stderr:
+            stderr = self._translate_newlines(stderr)
+
+    if killed and err_on_timeout:
+        errcode = self.poll()
+        raise RuntimeError, ("Operation timed out", errcode, stdout, stderr)
+    else:
+        if killed:
+            self.poll()
+        else:
+            self.wait()
+        return (stdout, stderr)
+