From: Claudio-Daniel Freire Date: Thu, 23 Jun 2011 15:09:34 +0000 (+0200) Subject: Retry retriable operations when we get an EINTR X-Git-Tag: nepi_v2_1~26 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=4f2254e89af188cc0064d03831e701495aceb97b;p=nepi.git Retry retriable operations when we get an EINTR --- diff --git a/src/nepi/testbeds/planetlab/application.py b/src/nepi/testbeds/planetlab/application.py index 50167aca..227cadb6 100644 --- a/src/nepi/testbeds/planetlab/application.py +++ b/src/nepi/testbeds/planetlab/application.py @@ -125,21 +125,17 @@ class Dependency(object): raise RuntimeError, "Failed to synchronize trace" # sync files - (out,err),proc = server.popen_scp( - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - tracefile), - local_path, - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to synchronize trace: %s %s" % (out,err,) + try: + self._popen_scp( + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + tracefile), + local_path + ) + except RuntimeError, e: + raise RuntimeError, "Failed to synchronize trace: %s %s" \ + % (e.args[0], e.args[1],) return local_path - def setup(self): self._make_home() @@ -175,33 +171,26 @@ class Dependency(object): def _make_home(self): # Make sure all the paths are created where # they have to be created for deployment - (out,err),proc = server.popen_ssh_command( - "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" % { 'home' : server.shell_escape(self.home_path) }, - host = self.node.hostname, - port = None, - user = self.node.slicename, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,) + # sync files + try: + self._popen_ssh_command( + "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \ + % { 'home' : server.shell_escape(self.home_path) } + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],) if self.stdin: # Write program input - (out,err),proc = server.popen_scp( - cStringIO.StringIO(self.stdin), - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path, 'stdin') ), - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,) + try: + self._popen_scp( + cStringIO.StringIO(self.stdin), + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path, 'stdin') ), + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application %s: %s %s" \ + % (self.home_path, e.args[0], e.args[1],) def _replace_paths(self, command): """ @@ -222,18 +211,15 @@ class Dependency(object): if buildscript is not None: # upload build script - (out,err),proc = server.popen_scp( - buildscript, - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path, 'nepi-build.sh') ), - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, out,err,) + try: + self._popen_scp( + buildscript, + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path, 'nepi-build.sh') ) + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application %s: %s %s" \ + % (self.home_path, e.args[0], e.args[1],) # launch build self._do_launch_build() @@ -314,7 +300,6 @@ class Dependency(object): ) (out,err),proc = rspawn.remote_spawn( script, - pidfile = 'build-pid', home = self.home_path, stdin = '/dev/null', @@ -381,19 +366,10 @@ class Dependency(object): delay = min(30,delay*1.2) # check build token - - (out,err),proc = server.popen_ssh_command( + (out, err), proc = self._popen_ssh_command( "cat %(token_path)s" % { 'token_path' : os.path.join(self.home_path, 'build.token'), - }, - host = self.node.hostname, - port = None, - user = self.node.slicename, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - + }) slave_token = "" if not proc.wait() and out: slave_token = out.strip() @@ -401,18 +377,11 @@ class Dependency(object): if slave_token != self._master_token: # Get buildlog for the error message - (buildlog,err),proc = server.popen_ssh_command( + (buildlog, err), proc = self._popen_ssh_command( "cat %(buildlog)s" % { 'buildlog' : os.path.join(self.home_path, 'buildlog'), 'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'), - }, - host = self.node.hostname, - port = None, - user = self.node.slicename, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) + }) proc.wait() @@ -444,16 +413,15 @@ class Dependency(object): sources = self.sources.split(' ') # Copy all sources - (out,err),proc = server.popen_scp( - sources, - "%s@%s:%s" % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path,'.'),), - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed upload source file %r: %s %s" % (source, out,err,) + try: + self._popen_scp( + sources, + "%s@%s:%s" % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path,'.'),) + ) + except RuntimeError, e: + raise RuntimeError, "Failed upload source file %r: %s %s" \ + % (sources, e.args[0], e.args[1],) buildscript = cStringIO.StringIO() @@ -485,26 +453,20 @@ class Dependency(object): buildscript.seek(0) return buildscript - def _do_install(self): if self.install: # Install application - (out,err),proc = server.popen_ssh_command( - "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/installlog >&2 && false )" % { - 'command' : self._replace_paths(self.install), - 'home' : server.shell_escape(self.home_path), - }, - host = self.node.hostname, - port = None, - user = self.node.slicename, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed instal build sources: %s %s" % (out,err,) + try: + self._popen_ssh_command( + "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/installlog >&2 && false )" % \ + { + 'command' : self._replace_paths(self.install), + 'home' : server.shell_escape(self.home_path), + } + ) + except RuntimeError, e: + raise RuntimeError, "Failed instal build sources: %s %s" % (e.args[0], e.args[1],) def set_master(self, master): self._master = master @@ -520,42 +482,67 @@ class Dependency(object): def _do_install_keys(self): prk = self._master_prk puk = self._master_puk + + try: + self._popen_scp( + [ prk.name, puk.name ], + '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path ) + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application deployment keys: %s %s" \ + % (e.args[0], e.args[1],) + + try: + self._popen_scp( + cStringIO.StringIO('%s,%s %s\n' % ( + self._master.node.hostname, socket.gethostbyname(self._master.node.hostname), + self._master.node.server_key)), + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path,"master_known_hosts") ) + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application deployment keys: %s %s" \ + % (e.args[0], e.args[1],) + # No longer need'em + self._master_prk = None + self._master_puk = None + + def cleanup(self): + # make sure there's no leftover build processes + self._do_kill_build() + + @server.eintr_retry + def _popen_scp(self, src, dst, retry = True): (out,err),proc = server.popen_scp( - [ prk.name, puk.name ], - '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path ), + src, + dst, port = None, agent = None, ident_key = self.node.ident_path, server_key = self.node.server_key ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,) - (out,err),proc = server.popen_scp( - cStringIO.StringIO('%s,%s %s\n' % ( - self._master.node.hostname, socket.gethostbyname(self._master.node.hostname), - self._master.node.server_key)), - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path,"master_known_hosts") ), + if server.eintr_retry(proc.wait)(): + raise RuntimeError, (out, err) + return (out, err), proc + + + @server.eintr_retry + def _popen_ssh_command(self, command, retry = True): + (out,err),proc = server.popen_ssh_command( + command, + host = self.node.hostname, port = None, + user = self.node.slicename, agent = None, ident_key = self.node.ident_path, server_key = self.node.server_key ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application deployment keys: %s %s" % (out,err,) - - # No longer need'em - self._master_prk = None - self._master_puk = None - - def cleanup(self): - # make sure there's no leftover build processes - self._do_kill_build() + if server.eintr_retry(proc.wait)(): + raise RuntimeError, (out, err) + return (out, err), proc class Application(Dependency): """ @@ -611,19 +598,16 @@ class Application(Dependency): command.write('export %s=%s\n' % (envkey, envval)) command.write(self.command) command.seek(0) - - (out,err),proc = server.popen_scp( - command, - '%s@%s:%s' % (self.node.slicename, self.node.hostname, - os.path.join(self.home_path, "app.sh")), - port = None, - agent = None, - ident_key = self.node.ident_path, - server_key = self.node.server_key - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + + try: + self._popen_scp( + command, + '%s@%s:%s' % (self.node.slicename, self.node.hostname, + os.path.join(self.home_path, "app.sh")) + ) + except RuntimeError, e: + raise RuntimeError, "Failed to set up application: %s %s" \ + % (e.args[0], e.args[1],) # Start process in a "daemonized" way, using nohup and heavy # stdin/out redirection to avoid connection issues @@ -707,7 +691,8 @@ class Application(Dependency): ident_key = self.node.ident_path, server_key = self.node.server_key ) - + + class NepiDependency(Dependency): """ This dependency adds nepi itself to the python path, diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index e534257f..544c19d7 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -410,7 +410,7 @@ class Node(object): def is_alive(self): # Make sure all the paths are created where # they have to be created for deployment - (out,err),proc = server.popen_ssh_command( + (out,err),proc = server.eintr_retry(server.popen_ssh_command)( "echo 'ALIVE'", host = self.hostname, port = None, diff --git a/src/nepi/testbeds/planetlab/rspawn.py b/src/nepi/testbeds/planetlab/rspawn.py index 618235b9..d28ca2da 100644 --- a/src/nepi/testbeds/planetlab/rspawn.py +++ b/src/nepi/testbeds/planetlab/rspawn.py @@ -96,6 +96,7 @@ def remote_spawn(command, pidfile, stdout='/dev/null', stderr=STDOUT, stdin='/de return (out,err),proc +@server.eintr_retry def remote_check_pid(pidfile, host = None, port = None, user = None, agent = None, ident_key = None, server_key = None): @@ -136,6 +137,7 @@ def remote_check_pid(pidfile, return None +@server.eintr_retry def remote_status(pid, ppid, host = None, port = None, user = None, agent = None, ident_key = None, server_key = None): @@ -178,6 +180,7 @@ def remote_status(pid, ppid, return RUNNING if status else FINISHED +@server.eintr_retry def remote_kill(pid, ppid, sudo = False, host = None, port = None, user = None, agent = None, ident_key = None, server_key = None, diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 6e518f4a..243b7c24 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -49,7 +49,7 @@ class TunProtoBase(object): cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % { 'home' : server.shell_escape(self.home_path) } - (out,err),proc = server.popen_ssh_command( + (out,err),proc = server.eintr_retry(server.popen_ssh_command)( cmd, host = local.node.hostname, port = None, @@ -81,7 +81,7 @@ class TunProtoBase(object): dest = "%s@%s:%s" % ( local.node.slicename, local.node.hostname, os.path.join(self.home_path,'.'),) - (out,err),proc = server.popen_scp( + (out,err),proc = server.eintr_retry(server.popen_scp)( sources, dest, ident_key = local.node.ident_path, @@ -245,7 +245,7 @@ class TunProtoBase(object): if self.status() != rspawn.RUNNING: break - (out,err),proc = server.popen_ssh_command( + (out,err),proc = server.eintr_retry(server.popen_ssh_command)( "cd %(home)s ; grep -c Connected capture" % dict( home = server.shell_escape(self.home_path)), host = local.node.hostname, @@ -271,7 +271,7 @@ class TunProtoBase(object): local = self.local() if local: for spin in xrange(30): - (out,err),proc = server.popen_ssh_command( + (out,err),proc = server.eintr_retry(server.popen_ssh_command)( "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict( home = server.shell_escape(self.home_path)), host = local.node.hostname, diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index 0491b550..d6079af5 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -49,6 +49,23 @@ def shell_escape(s): s = ''.join(map(escp,s)) return "'%s'" % (s,) +def eintr_retry(func): + import functools + @functools.wraps(func) + def rv(*p, **kw): + retry = kw.pop("_retry", False) + for i in xrange(0 if retry else 4): + try: + return func(*p, **kw) + except select.error, args: + if args[0] == errno.EINTR: + continue + else: + raise + else: + return func(*p, **kw) + return rv + class Server(object): def __init__(self, root_dir = ".", log_level = ERROR_LEVEL): self._root_dir = root_dir