A series of synchronization fixes:
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 31 May 2011 15:53:51 +0000 (17:53 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Tue, 31 May 2011 15:53:51 +0000 (17:53 +0200)
 - remote_kill can be told not to wait, for increase parallelism if you'll be waiting later with remote_status
 - somehow TunProto.checkpid gets called before the PID is ready (maybe before the tun_connect script is spawned).
   In those cases, old pids get fetched, instead of simply retrying later.
   To solve this, TunProto._make_home removes old pidfiles
 - tests that didn't perform shutdown on exception conditions now do

src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/rspawn.py
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/tunchannel.py
test/testbeds/planetlab/integration_ns3.py

index 4d4644d..f6693bf 100644 (file)
@@ -191,12 +191,18 @@ class TestbedController(testbed_impl.TestbedController):
         raise NotImplementedError
 
     def shutdown(self):
-        for trace in self._traces.values():
+        for trace in self._traces.itervalues():
             trace.close()
-        for element in self._elements.values():
+        for element in self._elements.itervalues():
             # invoke cleanup hooks
             if hasattr(element, 'cleanup'):
                 element.cleanup()
+        for element in self._elements.itervalues():
+            # invoke destroy hooks
+            if hasattr(element, 'destroy'):
+                element.destroy()
+        self._elements.clear()
+        self._traces.clear()
 
     def trace(self, guid, trace_id, attribute='value'):
         app = self._elements[guid]
index d266864..632e022 100644 (file)
@@ -246,6 +246,10 @@ class TunIface(object):
     def cleanup(self):
         if self.peer_proto_impl:
             self.peer_proto_impl.shutdown()
+
+    def destroy(self):
+        if self.peer_proto_impl:
+            self.peer_proto_impl.destroy()
             self.peer_proto_impl = None
 
     def async_launch_wait(self):
index 7cd8910..9bc4c42 100644 (file)
@@ -180,7 +180,8 @@ def remote_status(pid, ppid,
 
 def remote_kill(pid, ppid, sudo = False,
         host = None, port = None, user = None, agent = None, 
-        ident_key = None, server_key = None):
+        ident_key = None, server_key = None,
+        nowait = False):
     """
     Kill a process spawned with remote_spawn.
     
@@ -198,21 +199,25 @@ def remote_kill(pid, ppid, sudo = False,
         
         Nothing, should have killed the process
     """
-
-    (out,err),proc = server.popen_ssh_command(
-        """
+    
+    cmd = """
 %(sudo)s kill %(pid)d
-for x in 1 2 3 4 5 6 7 8 9 0 ; do 
-    sleep 0.1 
+for x in 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 ; do 
+    sleep 0.2 
     if [ `ps --ppid %(ppid)d -o pid | grep -c %(pid)d` == '0' ]; then
         break
     fi
-    sleep 0.9
+    sleep 1.8
 done
 if [ `ps --ppid %(ppid)d -o pid | grep -c %(pid)d` != '0' ]; then
     %(sudo)s kill -9 %(pid)d
 fi
-""" % {
+"""
+    if nowait:
+        cmd = "{ %s } >/dev/null 2>/dev/null </dev/null &" % (cmd,)
+
+    (out,err),proc = server.popen_ssh_command(
+        cmd % {
             'ppid' : ppid,
             'pid' : pid,
             'sudo' : 'sudo -S' if sudo else ''
index 7cbe43a..3b8b1ed 100644 (file)
@@ -360,14 +360,16 @@ try:
         import passfd
         
         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        retrydelay = 1.0
         for i in xrange(30):
             try:
                 sock.connect(options.pass_fd)
                 break
             except socket.error:
                 # wait a while, retry
-                print >>sys.stderr, "Could not connect. Retrying in a sec..."
-                time.sleep(1)
+                print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+                time.sleep(min(30.0,retrydelay))
+                retrydelay *= 1.1
         else:
             sock.connect(options.pass_fd)
         passfd.sendfd(sock, tun.fileno(), '0')
@@ -389,14 +391,16 @@ try:
             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
             rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+            retrydelay = 1.0
             for i in xrange(30):
                 try:
                     rsock.bind((hostaddr,options.udp))
                     break
                 except socket.error:
                     # wait a while, retry
-                    print >>sys.stderr, "Could not bind. Retrying in a sec..."
-                    time.sleep(1)
+                    print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+                    time.sleep(min(30.0,retrydelay))
+                    retrydelay *= 1.1
             else:
                 rsock.bind((hostaddr,options.udp))
             rsock.connect((remaining_args[0],options.port))
@@ -409,27 +413,31 @@ try:
         if remaining_args and not remaining_args[0].startswith('-'):
             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
             rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+            retrydelay = 1.0
             for i in xrange(30):
                 try:
                     rsock.connect((remaining_args[0],options.port))
                     break
                 except socket.error:
                     # wait a while, retry
-                    print >>sys.stderr, "Could not connect. Retrying in a sec..."
-                    time.sleep(1)
+                    print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+                    time.sleep(min(30.0,retrydelay))
+                    retrydelay *= 1.1
             else:
                 rsock.connect((remaining_args[0],options.port))
         else:
             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
             lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+            retrydelay = 1.0
             for i in xrange(30):
                 try:
                     lsock.bind((hostaddr,options.port))
                     break
                 except socket.error:
                     # wait a while, retry
-                    print >>sys.stderr, "Could not bind. Retrying in a sec..."
-                    time.sleep(1)
+                    print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+                    time.sleep(min(30.0,retrydelay))
+                    retrydelay *= 1.1
             else:
                 lsock.bind((hostaddr,options.port))
             lsock.listen(1)
index 0ea63b9..a836d44 100644 (file)
@@ -29,6 +29,7 @@ class TunProtoBase(object):
         
         self._launcher = None
         self._started = False
+        self._starting = False
         self._pid = None
         self._ppid = None
         self._if_name = None
@@ -43,7 +44,11 @@ class TunProtoBase(object):
         
         # Make sure all the paths are created where 
         # they have to be created for deployment
-        cmd = "mkdir -p %s" % (server.shell_escape(self.home_path),)
+        # Also remove pidfile, if there is one.
+        # Old pidfiles from previous runs can be troublesome.
+        cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
+            'home' : server.shell_escape(self.home_path)
+        }
         (out,err),proc = server.popen_ssh_command(
             cmd,
             host = local.node.hostname,
@@ -119,6 +124,11 @@ class TunProtoBase(object):
             raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
         
     def launch(self, check_proto, listen, extra_args=[]):
+        if self._starting:
+            raise AssertionError, "Double start"
+        
+        self._starting = True
+        
         peer = self.peer()
         local = self.local()
         
@@ -348,8 +358,18 @@ class TunProtoBase(object):
                 agent = None,
                 ident_key = local.node.ident_path,
                 server_key = local.node.server_key,
-                sudo = True
+                sudo = True,
+                nowait = True
                 )
+    
+    def waitkill(self):
+        interval = 1.0
+        for i in xrange(30):
+            status = self.status()
+            if status != rspawn.RUNNING:
+                break
+            time.sleep(interval)
+            interval = min(30.0, interval * 1.1)
         
     def sync_trace(self, local_dir, whichtrace):
         if whichtrace != 'packets':
@@ -409,6 +429,12 @@ class TunProtoBase(object):
         Cleanup
         """
         raise NotImplementedError
+    
+    def destroy(self):
+        """
+        Second-phase cleanup
+        """
+        pass
         
 
 class TunProtoUDP(TunProtoBase):
@@ -425,6 +451,9 @@ class TunProtoUDP(TunProtoBase):
     def shutdown(self):
         self.kill()
 
+    def destroy(self):
+        self.waitkill()
+
     def launch(self, check_proto='udp', listen=False, extra_args=None):
         if extra_args is None:
             extra_args = ("-u",str(self.port))
@@ -444,6 +473,9 @@ class TunProtoFD(TunProtoBase):
     def shutdown(self):
         self.kill()
 
+    def destroy(self):
+        self.waitkill()
+
     def launch(self, check_proto='fd', listen=False, extra_args=[]):
         super(TunProtoFD, self).launch(check_proto, listen, extra_args)
 
@@ -464,16 +496,16 @@ class TunProtoTCP(TunProtoBase):
                 peer.peer_proto_impl.async_launch_wait()
             
             if not self._started:
-                self.launch('tcp', False)
-        else:
-            # make sure WE are ready
-            self.async_launch_wait()
+                self.async_launch('tcp', False)
         
         self.checkpid()
     
     def shutdown(self):
         self.kill()
 
+    def destroy(self):
+        self.waitkill()
+
     def launch(self, check_proto='tcp', listen=None, extra_args=[]):
         if listen is None:
             listen = self.listening
index c38a3c3..7967dc6 100644 (file)
@@ -4,6 +4,7 @@ import os
 import struct
 import socket
 import threading
+import errno
 
 def ipfmt(ip):
     ipbytes = map(ord,ip.decode("hex"))
@@ -187,7 +188,13 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
             wset.append(tun)
         if packetReady(fwbuf, ether_mode):
             wset.append(remote)
-        rdrdy, wrdy, errs = select.select((tun,remote),wset,(tun,remote),1)
+        
+        try:
+            rdrdy, wrdy, errs = select.select((tun,remote),wset,(tun,remote),1)
+        except select.error, e:
+            if e.args[0] == errno.EINTR:
+                # just retry
+                continue
         
         # check for errors
         if errs:
index afc4a5f..2ae7c83 100755 (executable)
@@ -154,11 +154,13 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
 
         xml = exp.to_xml()
 
-        controller = ExperimentController(xml, self.root_dir)
-        controller.start()
-        # just test that it starts...
-        controller.stop()
-        controller.shutdown()
+        try:
+            controller = ExperimentController(xml, self.root_dir)
+            controller.start()
+            # just test that it starts...
+        finally:
+            controller.stop()
+            controller.shutdown()
 
     @test_util.skipUnless(test_util.pl_auth() is not None, 
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
@@ -207,17 +209,19 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
 
         xml = exp.to_xml()
 
-        controller = ExperimentController(xml, self.root_dir)
-        controller.start()
+        try:
+            controller = ExperimentController(xml, self.root_dir)
+            controller.start()
 
-        while not controller.is_finished(ping.guid):
-            time.sleep(0.5)
-          
-        ping_result = controller.trace(ping.guid, "stdout")
-        tap_trace = controller.trace(tap1.guid, "packets")
+            while not controller.is_finished(ping.guid):
+                time.sleep(0.5)
+              
+            ping_result = controller.trace(ping.guid, "stdout")
+            tap_trace = controller.trace(tap1.guid, "packets")
 
-        controller.stop()
-        controller.shutdown()
+        finally:
+            controller.stop()
+            controller.shutdown()
 
         # asserts at the end, to make sure there's proper cleanup
         self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
@@ -284,16 +288,18 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
 
         xml = exp.to_xml()
 
-        controller = ExperimentController(xml, self.root_dir)
-        controller.start()
+        try:
+            controller = ExperimentController(xml, self.root_dir)
+            controller.start()
 
-        while not controller.is_finished(ping.guid):
-            time.sleep(0.5)
-          
-        tap_trace = controller.trace(tap1.guid, "packets")
+            while not controller.is_finished(ping.guid):
+                time.sleep(0.5)
+              
+            tap_trace = controller.trace(tap1.guid, "packets")
 
-        controller.stop()
-        controller.shutdown()
+        finally:
+            controller.stop()
+            controller.shutdown()
         
         # asserts at the end, to make sure there's proper cleanup
         sent = 0