Retry retriable operations when we get an EINTR
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 23 Jun 2011 15:09:34 +0000 (17:09 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Thu, 23 Jun 2011 15:09:34 +0000 (17:09 +0200)
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/rspawn.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/server.py

index 50167ac..227cadb 100644 (file)
@@ -125,21 +125,17 @@ class Dependency(object):
             raise RuntimeError, "Failed to synchronize trace"
         
         # sync files
-        (out,err),proc = server.popen_scp(
-            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
-                tracefile),
-            local_path,
-            port = None,
-            agent = None,
-            ident_key = self.node.ident_path,
-            server_key = self.node.server_key
-            )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,)
+        try:
+            self._popen_scp(
+                '%s@%s:%s' % (self.node.slicename, self.node.hostname,
+                    tracefile),
+                local_path
+                )
+        except RuntimeError, e:
+            raise RuntimeError, "Failed to synchronize trace: %s %s" \
+                    % (e.args[0], e.args[1],)
         
         return local_path
-    
 
     def setup(self):
         self._make_home()
@@ -175,33 +171,26 @@ class Dependency(object):
     def _make_home(self):
         # Make sure all the paths are created where 
         # they have to be created for deployment
-        (out,err),proc = server.popen_ssh_command(
-            "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" % { 'home' : server.shell_escape(self.home_path) },
-            host = self.node.hostname,
-            port = None,
-            user = self.node.slicename,
-            agent = None,
-            ident_key = self.node.ident_path,
-            server_key = self.node.server_key
-            )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
+        # sync files
+        try:
+            self._popen_ssh_command(
+                "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \
+                    % { 'home' : server.shell_escape(self.home_path) }
+                )
+        except RuntimeError, e:
+            raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],)
         
         if self.stdin:
             # Write program input
-            (out,err),proc = server.popen_scp(
-                cStringIO.StringIO(self.stdin),
-                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
-                    os.path.join(self.home_path, 'stdin') ),
-                port = None,
-                agent = None,
-                ident_key = self.node.ident_path,
-                server_key = self.node.server_key
-                )
-            
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
+            try:
+                self._popen_scp(
+                    cStringIO.StringIO(self.stdin),
+                    '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
+                        os.path.join(self.home_path, 'stdin') ),
+                    )
+            except RuntimeError, e:
+                raise RuntimeError, "Failed to set up application %s: %s %s" \
+                        % (self.home_path, e.args[0], e.args[1],)
 
     def _replace_paths(self, command):
         """
@@ -222,18 +211,15 @@ class Dependency(object):
             
         if buildscript is not None:
             # upload build script
-            (out,err),proc = server.popen_scp(
-                buildscript,
-                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
-                    os.path.join(self.home_path, 'nepi-build.sh') ),
-                port = None,
-                agent = None,
-                ident_key = self.node.ident_path,
-                server_key = self.node.server_key
-                )
-            
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,)
+            try:
+                self._popen_scp(
+                    buildscript,
+                    '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
+                        os.path.join(self.home_path, 'nepi-build.sh') )
+                    )
+            except RuntimeError, e:
+                raise RuntimeError, "Failed to set up application %s: %s %s" \
+                        % (self.home_path, e.args[0], e.args[1],)
             
             # launch build
             self._do_launch_build()
@@ -314,7 +300,6 @@ class Dependency(object):
             )
         (out,err),proc = rspawn.remote_spawn(
             script,
-            
             pidfile = 'build-pid',
             home = self.home_path,
             stdin = '/dev/null',
@@ -381,19 +366,10 @@ class Dependency(object):
                     delay = min(30,delay*1.2)
             
             # check build token
-
-            (out,err),proc = server.popen_ssh_command(
+            (out, err), proc = self._popen_ssh_command(
                 "cat %(token_path)s" % {
                     'token_path' : os.path.join(self.home_path, 'build.token'),
-                },
-                host = self.node.hostname,
-                port = None,
-                user = self.node.slicename,
-                agent = None,
-                ident_key = self.node.ident_path,
-                server_key = self.node.server_key
-                )
-            
+                })
             slave_token = ""
             if not proc.wait() and out:
                 slave_token = out.strip()
@@ -401,18 +377,11 @@ class Dependency(object):
             if slave_token != self._master_token:
                 # Get buildlog for the error message
 
-                (buildlog,err),proc = server.popen_ssh_command(
+                (buildlog, err), proc = self._popen_ssh_command(
                     "cat %(buildlog)s" % {
                         'buildlog' : os.path.join(self.home_path, 'buildlog'),
                         'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
-                    },
-                    host = self.node.hostname,
-                    port = None,
-                    user = self.node.slicename,
-                    agent = None,
-                    ident_key = self.node.ident_path,
-                    server_key = self.node.server_key
-                    )
+                    })
                 
                 proc.wait()
                 
@@ -444,16 +413,15 @@ class Dependency(object):
             sources = self.sources.split(' ')
             
             # Copy all sources
-            (out,err),proc = server.popen_scp(
-                sources,
-                "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
-                    os.path.join(self.home_path,'.'),),
-                ident_key = self.node.ident_path,
-                server_key = self.node.server_key
-                )
-        
-            if proc.wait():
-                raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,)
+            try:
+                self._popen_scp(
+                    sources,
+                    "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
+                        os.path.join(self.home_path,'.'),)
+                    )
+            except RuntimeError, e:
+                raise RuntimeError, "Failed upload source file %r: %s %s" \
+                        % (sources, e.args[0], e.args[1],)
             
         buildscript = cStringIO.StringIO()
         
@@ -485,26 +453,20 @@ class Dependency(object):
         buildscript.seek(0)
 
         return buildscript
-        
 
     def _do_install(self):
         if self.install:
             # Install application
-            (out,err),proc = server.popen_ssh_command(
-                "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/installlog >&2 && false )" % {
-                    'command' : self._replace_paths(self.install),
-                    'home' : server.shell_escape(self.home_path),
-                },
-                host = self.node.hostname,
-                port = None,
-                user = self.node.slicename,
-                agent = None,
-                ident_key = self.node.ident_path,
-                server_key = self.node.server_key
-                )
-        
-            if proc.wait():
-                raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,)
+            try:
+                self._popen_ssh_command(
+                    "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/installlog >&2 && false )" % \
+                        {
+                        'command' : self._replace_paths(self.install),
+                        'home' : server.shell_escape(self.home_path),
+                        }
+                    )
+            except RuntimeError, e:
+                raise RuntimeError, "Failed instal build sources: %s %s" % (e.args[0], e.args[1],)
 
     def set_master(self, master):
         self._master = master
@@ -520,42 +482,67 @@ class Dependency(object):
     def _do_install_keys(self):
         prk = self._master_prk
         puk = self._master_puk
+       
+        try:
+            self._popen_scp(
+                [ prk.name, puk.name ],
+                '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path )
+                )
+        except RuntimeError, e:
+            raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
+                    % (e.args[0], e.args[1],)
+
+        try:
+            self._popen_scp(
+                cStringIO.StringIO('%s,%s %s\n' % (
+                    self._master.node.hostname, socket.gethostbyname(self._master.node.hostname), 
+                    self._master.node.server_key)),
+                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
+                    os.path.join(self.home_path,"master_known_hosts") )
+                )
+        except RuntimeError, e:
+            raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
+                    % (e.args[0], e.args[1],)
         
+        # No longer need'em
+        self._master_prk = None
+        self._master_puk = None
+    
+    def cleanup(self):
+        # make sure there's no leftover build processes
+        self._do_kill_build()
+
+    @server.eintr_retry
+    def _popen_scp(self, src, dst, retry = True):
         (out,err),proc = server.popen_scp(
-            [ prk.name, puk.name ],
-            '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path ),
+            src,
+            dst, 
             port = None,
             agent = None,
             ident_key = self.node.ident_path,
             server_key = self.node.server_key
             )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,)
 
-        (out,err),proc = server.popen_scp(
-            cStringIO.StringIO('%s,%s %s\n' % (
-                self._master.node.hostname, socket.gethostbyname(self._master.node.hostname), 
-                self._master.node.server_key)),
-            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
-                os.path.join(self.home_path,"master_known_hosts") ),
+        if server.eintr_retry(proc.wait)():
+            raise RuntimeError, (out, err)
+        return (out, err), proc
+  
+
+    @server.eintr_retry
+    def _popen_ssh_command(self, command, retry = True):
+        (out,err),proc = server.popen_ssh_command(
+            command,
+            host = self.node.hostname,
             port = None,
+            user = self.node.slicename,
             agent = None,
             ident_key = self.node.ident_path,
             server_key = self.node.server_key
             )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,)
-        
-        # No longer need'em
-        self._master_prk = None
-        self._master_puk = None
-    
-    def cleanup(self):
-        # make sure there's no leftover build processes
-        self._do_kill_build()
 
+        if server.eintr_retry(proc.wait)():
+            raise RuntimeError, (out, err)
+        return (out, err), proc
 
 class Application(Dependency):
     """
@@ -611,19 +598,16 @@ class Application(Dependency):
                     command.write('export %s=%s\n' % (envkey, envval))
         command.write(self.command)
         command.seek(0)
-        
-        (out,err),proc = server.popen_scp(
-            command,
-            '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
-                os.path.join(self.home_path, "app.sh")),
-            port = None,
-            agent = None,
-            ident_key = self.node.ident_path,
-            server_key = self.node.server_key
-            )
-        
-        if proc.wait():
-            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+        try:
+            self._popen_scp(
+                command,
+                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
+                    os.path.join(self.home_path, "app.sh"))
+                )
+        except RuntimeError, e:
+            raise RuntimeError, "Failed to set up application: %s %s" \
+                    % (e.args[0], e.args[1],)
         
         # Start process in a "daemonized" way, using nohup and heavy
         # stdin/out redirection to avoid connection issues
@@ -707,7 +691,8 @@ class Application(Dependency):
                 ident_key = self.node.ident_path,
                 server_key = self.node.server_key
                 )
-    
+
+
 class NepiDependency(Dependency):
     """
     This dependency adds nepi itself to the python path,
index e534257..544c19d 100644 (file)
@@ -410,7 +410,7 @@ class Node(object):
     def is_alive(self):
         # Make sure all the paths are created where 
         # they have to be created for deployment
-        (out,err),proc = server.popen_ssh_command(
+        (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
             "echo 'ALIVE'",
             host = self.hostname,
             port = None,
index 618235b..d28ca2d 100644 (file)
@@ -96,6 +96,7 @@ def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/de
 
     return (out,err),proc
 
+@server.eintr_retry
 def remote_check_pid(pidfile,
         host = None, port = None, user = None, agent = None, 
         ident_key = None, server_key = None):
@@ -136,6 +137,7 @@ def remote_check_pid(pidfile,
             return None
 
 
+@server.eintr_retry
 def remote_status(pid, ppid, 
         host = None, port = None, user = None, agent = None, 
         ident_key = None, server_key = None):
@@ -178,6 +180,7 @@ def remote_status(pid, ppid,
     return RUNNING if status else FINISHED
     
 
+@server.eintr_retry
 def remote_kill(pid, ppid, sudo = False,
         host = None, port = None, user = None, agent = None, 
         ident_key = None, server_key = None,
index 6e518f4..243b7c2 100644 (file)
@@ -49,7 +49,7 @@ class TunProtoBase(object):
         cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
             'home' : server.shell_escape(self.home_path)
         }
-        (out,err),proc = server.popen_ssh_command(
+        (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
             cmd,
             host = local.node.hostname,
             port = None,
@@ -81,7 +81,7 @@ class TunProtoBase(object):
         dest = "%s@%s:%s" % (
             local.node.slicename, local.node.hostname, 
             os.path.join(self.home_path,'.'),)
-        (out,err),proc = server.popen_scp(
+        (out,err),proc = server.eintr_retry(server.popen_scp)(
             sources,
             dest,
             ident_key = local.node.ident_path,
@@ -245,7 +245,7 @@ class TunProtoBase(object):
             if self.status() != rspawn.RUNNING:
                 break
             
-            (out,err),proc = server.popen_ssh_command(
+            (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
                 "cd %(home)s ; grep -c Connected capture" % dict(
                     home = server.shell_escape(self.home_path)),
                 host = local.node.hostname,
@@ -271,7 +271,7 @@ class TunProtoBase(object):
             local = self.local()
             if local:
                 for spin in xrange(30):
-                    (out,err),proc = server.popen_ssh_command(
+                    (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
                         "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
                             home = server.shell_escape(self.home_path)),
                         host = local.node.hostname,
index 0491b55..d6079af 100644 (file)
@@ -49,6 +49,23 @@ def shell_escape(s):
         s = ''.join(map(escp,s))
         return "'%s'" % (s,)
 
+def eintr_retry(func):
+    import functools
+    @functools.wraps(func)
+    def rv(*p, **kw):
+        retry = kw.pop("_retry", False)
+        for i in xrange(0 if retry else 4):
+            try:
+                return func(*p, **kw)
+            except select.error, args:
+                if args[0] == errno.EINTR:
+                    continue
+                else:
+                    raise 
+        else:
+            return func(*p, **kw)
+    return rv
+
 class Server(object):
     def __init__(self, root_dir = ".", log_level = ERROR_LEVEL):
         self._root_dir = root_dir