From: Claudio-Daniel Freire Date: Tue, 31 May 2011 15:53:51 +0000 (+0200) Subject: A series of synchronization fixes: X-Git-Tag: nepi_v2_1~49 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=24fbf4289442f1add6cf74a741d2e26deb7e39c8;p=nepi.git A series of synchronization fixes: - remote_kill can be told not to wait, to increase parallelism if you'll be waiting later with remote_status - somehow TunProto.checkpid gets called before the PID is ready (maybe before the tun_connect script is spawned). In those cases, old pids get fetched, instead of simply retrying later. To solve this, TunProto._make_home removes old pidfiles - tests that didn't perform shutdown on exception conditions now do --- diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 4d4644de..f6693bf9 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -191,12 +191,18 @@ class TestbedController(testbed_impl.TestbedController): raise NotImplementedError def shutdown(self): - for trace in self._traces.values(): + for trace in self._traces.itervalues(): trace.close() - for element in self._elements.values(): + for element in self._elements.itervalues(): # invoke cleanup hooks if hasattr(element, 'cleanup'): element.cleanup() + for element in self._elements.itervalues(): + # invoke destroy hooks + if hasattr(element, 'destroy'): + element.destroy() + self._elements.clear() + self._traces.clear() def trace(self, guid, trace_id, attribute='value'): app = self._elements[guid] diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index d2668645..632e0227 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -246,6 +246,10 @@ class TunIface(object): def cleanup(self): if self.peer_proto_impl: self.peer_proto_impl.shutdown() + + def destroy(self): + if self.peer_proto_impl: + self.peer_proto_impl.destroy() 
self.peer_proto_impl = None def async_launch_wait(self): diff --git a/src/nepi/testbeds/planetlab/rspawn.py b/src/nepi/testbeds/planetlab/rspawn.py index 7cd89108..9bc4c42f 100644 --- a/src/nepi/testbeds/planetlab/rspawn.py +++ b/src/nepi/testbeds/planetlab/rspawn.py @@ -180,7 +180,8 @@ def remote_status(pid, ppid, def remote_kill(pid, ppid, sudo = False, host = None, port = None, user = None, agent = None, - ident_key = None, server_key = None): + ident_key = None, server_key = None, + nowait = False): """ Kill a process spawned with remote_spawn. @@ -198,21 +199,25 @@ def remote_kill(pid, ppid, sudo = False, Nothing, should have killed the process """ - - (out,err),proc = server.popen_ssh_command( - """ + + cmd = """ %(sudo)s kill %(pid)d -for x in 1 2 3 4 5 6 7 8 9 0 ; do - sleep 0.1 +for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do + sleep 0.2 if [ `ps --ppid %(ppid)d -o pid | grep -c %(pid)d` == '0' ]; then break fi - sleep 0.9 + sleep 1.8 done if [ `ps --ppid %(ppid)d -o pid | grep -c %(pid)d` != '0' ]; then %(sudo)s kill -9 %(pid)d fi -""" % { +""" + if nowait: + cmd = "{ %s } >/dev/null 2>/dev/null >sys.stderr, "Could not connect. Retrying in a sec..." - time.sleep(1) + print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 else: sock.connect(options.pass_fd) passfd.sendfd(sock, tun.fileno(), '0') @@ -389,14 +391,16 @@ try: print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp) print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port) rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + retrydelay = 1.0 for i in xrange(30): try: rsock.bind((hostaddr,options.udp)) break except socket.error: # wait a while, retry - print >>sys.stderr, "Could not bind. Retrying in a sec..." - time.sleep(1) + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." 
% (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 else: rsock.bind((hostaddr,options.udp)) rsock.connect((remaining_args[0],options.port)) @@ -409,27 +413,31 @@ try: if remaining_args and not remaining_args[0].startswith('-'): print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port) rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + retrydelay = 1.0 for i in xrange(30): try: rsock.connect((remaining_args[0],options.port)) break except socket.error: # wait a while, retry - print >>sys.stderr, "Could not connect. Retrying in a sec..." - time.sleep(1) + print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 else: rsock.connect((remaining_args[0],options.port)) else: print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port) lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + retrydelay = 1.0 for i in xrange(30): try: lsock.bind((hostaddr,options.port)) break except socket.error: # wait a while, retry - print >>sys.stderr, "Could not bind. Retrying in a sec..." - time.sleep(1) + print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),) + time.sleep(min(30.0,retrydelay)) + retrydelay *= 1.1 else: lsock.bind((hostaddr,options.port)) lsock.listen(1) diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 0ea63b92..a836d44d 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -29,6 +29,7 @@ class TunProtoBase(object): self._launcher = None self._started = False + self._starting = False self._pid = None self._ppid = None self._if_name = None @@ -43,7 +44,11 @@ class TunProtoBase(object): # Make sure all the paths are created where # they have to be created for deployment - cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),) + # Also remove pidfile, if there is one. 
+ # Old pidfiles from previous runs can be troublesome. + cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % { + 'home' : server.shell_escape(self.home_path) + } (out,err),proc = server.popen_ssh_command( cmd, host = local.node.hostname, @@ -119,6 +124,11 @@ class TunProtoBase(object): raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,) def launch(self, check_proto, listen, extra_args=[]): + if self._starting: + raise AssertionError, "Double start" + + self._starting = True + peer = self.peer() local = self.local() @@ -348,8 +358,18 @@ class TunProtoBase(object): agent = None, ident_key = local.node.ident_path, server_key = local.node.server_key, - sudo = True + sudo = True, + nowait = True ) + + def waitkill(self): + interval = 1.0 + for i in xrange(30): + status = self.status() + if status != rspawn.RUNNING: + break + time.sleep(interval) + interval = min(30.0, interval * 1.1) def sync_trace(self, local_dir, whichtrace): if whichtrace != 'packets': @@ -409,6 +429,12 @@ class TunProtoBase(object): Cleanup """ raise NotImplementedError + + def destroy(self): + """ + Second-phase cleanup + """ + pass class TunProtoUDP(TunProtoBase): @@ -425,6 +451,9 @@ class TunProtoUDP(TunProtoBase): def shutdown(self): self.kill() + def destroy(self): + self.waitkill() + def launch(self, check_proto='udp', listen=False, extra_args=None): if extra_args is None: extra_args = ("-u",str(self.port)) @@ -444,6 +473,9 @@ class TunProtoFD(TunProtoBase): def shutdown(self): self.kill() + def destroy(self): + self.waitkill() + def launch(self, check_proto='fd', listen=False, extra_args=[]): super(TunProtoFD, self).launch(check_proto, listen, extra_args) @@ -464,16 +496,16 @@ class TunProtoTCP(TunProtoBase): peer.peer_proto_impl.async_launch_wait() if not self._started: - self.launch('tcp', False) - else: - # make sure WE are ready - self.async_launch_wait() + self.async_launch('tcp', False) self.checkpid() def shutdown(self): self.kill() + def destroy(self): + 
self.waitkill() + def launch(self, check_proto='tcp', listen=None, extra_args=[]): if listen is None: listen = self.listening diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index c38a3c39..7967dc60 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -4,6 +4,7 @@ import os import struct import socket import threading +import errno def ipfmt(ip): ipbytes = map(ord,ip.decode("hex")) @@ -187,7 +188,13 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr wset.append(tun) if packetReady(fwbuf, ether_mode): wset.append(remote) - rdrdy, wrdy, errs = select.select((tun,remote),wset,(tun,remote),1) + + try: + rdrdy, wrdy, errs = select.select((tun,remote),wset,(tun,remote),1) + except select.error, e: + if e.args[0] == errno.EINTR: + # just retry + continue # check for errors if errs: diff --git a/test/testbeds/planetlab/integration_ns3.py b/test/testbeds/planetlab/integration_ns3.py index afc4a5f5..2ae7c832 100755 --- a/test/testbeds/planetlab/integration_ns3.py +++ b/test/testbeds/planetlab/integration_ns3.py @@ -154,11 +154,13 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): xml = exp.to_xml() - controller = ExperimentController(xml, self.root_dir) - controller.start() - # just test that it starts... - controller.stop() - controller.shutdown() + try: + controller = ExperimentController(xml, self.root_dir) + controller.start() + # just test that it starts... 
+ finally: + controller.stop() + controller.shutdown() @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") @@ -207,17 +209,19 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): xml = exp.to_xml() - controller = ExperimentController(xml, self.root_dir) - controller.start() + try: + controller = ExperimentController(xml, self.root_dir) + controller.start() - while not controller.is_finished(ping.guid): - time.sleep(0.5) - - ping_result = controller.trace(ping.guid, "stdout") - tap_trace = controller.trace(tap1.guid, "packets") + while not controller.is_finished(ping.guid): + time.sleep(0.5) + + ping_result = controller.trace(ping.guid, "stdout") + tap_trace = controller.trace(tap1.guid, "packets") - controller.stop() - controller.shutdown() + finally: + controller.stop() + controller.shutdown() # asserts at the end, to make sure there's proper cleanup self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE), @@ -284,16 +288,18 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): xml = exp.to_xml() - controller = ExperimentController(xml, self.root_dir) - controller.start() + try: + controller = ExperimentController(xml, self.root_dir) + controller.start() - while not controller.is_finished(ping.guid): - time.sleep(0.5) - - tap_trace = controller.trace(tap1.guid, "packets") + while not controller.is_finished(ping.guid): + time.sleep(0.5) + + tap_trace = controller.trace(tap1.guid, "packets") - controller.stop() - controller.shutdown() + finally: + controller.stop() + controller.shutdown() # asserts at the end, to make sure there's proper cleanup sent = 0