Synchronization fixes:
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 20 May 2011 14:35:31 +0000 (16:35 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 20 May 2011 14:35:31 +0000 (16:35 +0200)
 - tun_connect retries on binding error (happens regularly during tests and may happen in Real Life (tm) too)
 - re-enabled wrongfully disabled ns3-in-pl tests
 - added prestart global synchronization step, which solves many cross-testbed synchronization issues

13 files changed:
src/nepi/core/execute.py
src/nepi/core/metadata.py
src/nepi/core/testbed_impl.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/proxy.py
src/nepi/util/tunchannel_impl.py
test/testbeds/netns/execute.py
test/testbeds/ns3/execute.py
test/testbeds/ns3/execute2.py
test/testbeds/planetlab/execute.py
test/testbeds/planetlab/integration_ns3.py

index f38993e..1b52335 100644 (file)
@@ -79,6 +79,7 @@ class Factory(AttributesMap):
     def __init__(self, factory_id, create_function, start_function, 
             stop_function, status_function, 
             configure_function, preconfigure_function,
+            prestart_function,
             allow_addresses = False, has_addresses = False,
             allow_routes = False, has_routes = False):
         super(Factory, self).__init__()
@@ -93,6 +94,7 @@ class Factory(AttributesMap):
         self._status_function = status_function
         self._configure_function = configure_function
         self._preconfigure_function = preconfigure_function
+        self._prestart_function = prestart_function
         self._connector_types = dict()
         self._traces = list()
         self._box_attributes = AttributesMap()
@@ -125,6 +127,10 @@ class Factory(AttributesMap):
     def create_function(self):
         return self._create_function
 
+    @property
+    def prestart_function(self):
+        return self._prestart_function
+
     @property
     def start_function(self):
         return self._start_function
@@ -261,6 +267,10 @@ class TestbedController(object):
         """After do_configure elements are configured"""
         raise NotImplementedError
 
+    def do_prestart(self):
+        """Before do_start elements are prestart-configured"""
+        raise NotImplementedError
+
     def do_cross_connect_init(self, cross_data):
         """
         After do_cross_connect_init initiation of all external connections 
@@ -445,6 +455,10 @@ class ExperimentController(object):
             cross_data = self._get_cross_data(guid)
             testbed.do_cross_connect_compl(cross_data)
        
+        # Last chance to configure (parallel on all testbeds)
+        self._parallel([testbed.do_prestart
+                        for testbed in self._testbeds.itervalues()])
+                        
         # start experiment (parallel start on all testbeds)
         self._parallel([testbed.start
                         for testbed in self._testbeds.itervalues()])
index 784158c..46ee1a2 100644 (file)
@@ -84,6 +84,15 @@ class VersionedMetadataInfo(object):
         """
         return self.configure_order
 
+    @property
+    def prestart_order(self):
+        """ list of factory ids that indicates the order in which the elements
+        should be prestart-configured.
+        
+        Default: same as configure_order
+        """
+        return self.configure_order
+
     @property
     def start_order(self):
         """ list of factory ids that indicates the order in which the elements
@@ -111,6 +120,13 @@ class VersionedMetadataInfo(object):
                     (just after connections are made, 
                     just before netrefs are resolved)
                 "configure_function": function for element configuration,
+                "prestart_function": function for pre-start
+                    element configuration (just before starting applications),
+                    useful for synchronization of background setup tasks or
+                    lazy instantiation or configuration of attributes
+                    that require connection/cross-connection state before
+                    being created.
+                    After this point, all applications should be able to run.
                 "factory_attributes": list of references to attribute_ids,
                 "box_attributes": list of regerences to attribute_ids,
                 "traces": list of references to trace_id
@@ -318,6 +334,10 @@ class Metadata(object):
     def preconfigure_order(self):
         return self._metadata.preconfigure_order
 
+    @property
+    def prestart_order(self):
+        return self._metadata.prestart_order
+
     @property
     def start_order(self):
         return self._metadata.start_order
@@ -383,6 +403,7 @@ class Metadata(object):
             status_function = info.get("status_function")
             configure_function = info.get("configure_function")
             preconfigure_function = info.get("preconfigure_function")
+            prestart_function = info.get("prestart_function")
             allow_addresses = info.get("allow_addresses", False)
             allow_routes = info.get("allow_routes", False)
             has_addresses = info.get("has_addresses", False)
@@ -390,6 +411,7 @@ class Metadata(object):
             factory = Factory(factory_id, create_function, start_function,
                     stop_function, status_function, 
                     configure_function, preconfigure_function,
+                    prestart_function,
                     allow_addresses, has_addresses,
                     allow_routes, has_routes)
                     
index 320f7b0..288b234 100644 (file)
@@ -192,23 +192,15 @@ class TestbedController(execute.TestbedController):
         self._status = TESTBED_STATUS_SETUP
 
     def do_create(self):
-        guids = dict()
-        # order guids (elements) according to factory_id
-        for guid, factory_id in self._create.iteritems():
-            if not factory_id in guids:
-               guids[factory_id] = list()
-            guids[factory_id].append(guid)
-        # create elements following the factory_id order
-        for factory_id in self._metadata.create_order:
-            # omit the factories that have no element to create
-            if factory_id not in guids:
-                continue
-            factory = self._factories[factory_id]
-            for guid in guids[factory_id]:
-                factory.create_function(self, guid)
-                parameters = self._get_parameters(guid)
-                for name, value in parameters.iteritems():
-                    self.set(guid, name, value)
+        def set_params(self, guid):
+            parameters = self._get_parameters(guid)
+            for name, value in parameters.iteritems():
+                self.set(guid, name, value)
+            
+        self._do_in_factory_order(
+            'create_function',
+            self._metadata.create_order,
+            postaction = set_params )
         self._status = TESTBED_STATUS_CREATED
 
     def _do_connect(self, init = True):
@@ -241,43 +233,40 @@ class TestbedController(execute.TestbedController):
         self._do_connect(init = False)
         self._status = TESTBED_STATUS_CONNECTED
 
-    def do_preconfigure(self):
-        guids = dict()
+    def _do_in_factory_order(self, action, order, postaction = None):
+        guids = collections.defaultdict(list)
         # order guids (elements) according to factory_id
         for guid, factory_id in self._create.iteritems():
-            if not factory_id in guids:
-               guids[factory_id] = list()
             guids[factory_id].append(guid)
         # configure elements following the factory_id order
-        for factory_id in self._metadata.preconfigure_order:
+        for factory_id in order:
             # omit the factories that have no element to create
             if factory_id not in guids:
                 continue
             factory = self._factories[factory_id]
-            if not factory.preconfigure_function:
+            if not getattr(factory, action):
                 continue
             for guid in guids[factory_id]:
-                factory.preconfigure_function(self, guid)
+                getattr(factory, action)(self, guid)
+                if postaction:
+                    postaction(self, guid)
+
+    def do_preconfigure(self):
+        self._do_in_factory_order(
+            'preconfigure_function',
+            self._metadata.preconfigure_order )
 
     def do_configure(self):
-        guids = dict()
-        # order guids (elements) according to factory_id
-        for guid, factory_id in self._create.iteritems():
-            if not factory_id in guids:
-               guids[factory_id] = list()
-            guids[factory_id].append(guid)
-        # configure elements following the factory_id order
-        for factory_id in self._metadata.configure_order:
-            # omit the factories that have no element to create
-            if factory_id not in guids:
-                continue
-            factory = self._factories[factory_id]
-            if not factory.configure_function:
-                continue
-            for guid in guids[factory_id]:
-                factory.configure_function(self, guid)
+        self._do_in_factory_order(
+            'configure_function',
+            self._metadata.configure_order )
         self._status = TESTBED_STATUS_CONFIGURED
 
+    def do_prestart(self):
+        self._do_in_factory_order(
+            'prestart_function',
+            self._metadata.prestart_order )
+
     def _do_cross_connect(self, cross_data, init = True):
         for guid, cross_connections in self._cross_connect.iteritems():
             factory = self._get_factory(guid)
@@ -409,33 +398,17 @@ class TestbedController(execute.TestbedController):
         return factory.box_attributes.attributes_list
 
     def start(self, time = TIME_NOW):
-        # Plan everything
-        #  - group by factory_id
-        #  - enqueue task callables
-        plan = collections.defaultdict(list)
-        
-        for guid, factory_id in self._create.iteritems():
-            factory = self._factories[factory_id]
-            start_function = factory.start_function
-            if start_function:
-                plan[factory_id].append((start_function, guid))
-
-        # Execute plan, following the factory_id order
-        for factory_id in self._metadata.start_order:
-            if factory_id in plan:
-                for start_function, guid in plan[factory_id]:
-                    start_function(self, guid)
-        
+        self._do_in_factory_order(
+            'start_function',
+            self._metadata.start_order )
         self._status = TESTBED_STATUS_STARTED
 
     #action: NotImplementedError
 
     def stop(self, time = TIME_NOW):
-        for guid, factory_id in self._create.iteritems():
-            factory = self._factories[factory_id]
-            stop_function = factory.stop_function
-            if stop_function:
-                stop_function(self, guid)
+        self._do_in_factory_order(
+            'stop_function',
+            reversed(self._metadata.start_order) )
         self._status = TESTBED_STATUS_STOPPED
 
     def status(self, guid = None):
index c2f7e41..d5c0ab9 100644 (file)
@@ -929,7 +929,7 @@ create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEP
 
 configure_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 
-# Start node after ifaces, because the node needs the ifaces in order to set up routes
+# Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
 start_order = [ INTERNET, NODEIFACE, TAPIFACE, TUNIFACE, NODE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 
 factories_info = dict({
@@ -939,7 +939,7 @@ factories_info = dict({
             "category": "topology",
             "create_function": create_node,
             "preconfigure_function": configure_node,
-            "start_function": configure_node_routes,
+            "prestart_function": configure_node_routes,
             "box_attributes": [
                 "forward_X11",
                 "hostname",
@@ -973,7 +973,7 @@ factories_info = dict({
             "create_function": create_tuniface,
             "preconfigure_function": preconfigure_tuniface,
             "configure_function": postconfigure_tuniface,
-            "start_function": wait_tuniface,
+            "prestart_function": wait_tuniface,
             "box_attributes": [
                 "up", "device_name", "mtu", "snat", "pointopoint",
                 "txqueuelen",
@@ -989,7 +989,7 @@ factories_info = dict({
             "create_function": create_tapiface,
             "preconfigure_function": preconfigure_tuniface,
             "configure_function": postconfigure_tuniface,
-            "start_function": wait_tuniface,
+            "prestart_function": wait_tuniface,
             "box_attributes": [
                 "up", "device_name", "mtu", "snat", "pointopoint",
                 "txqueuelen",
@@ -1136,6 +1136,10 @@ class VersionedMetadataInfo(metadata.VersionedMetadataInfo):
     def configure_order(self):
         return configure_order
 
+    @property
+    def prestart_order(self):
+        return start_order
+
     @property
     def start_order(self):
         return start_order
index 4950dec..7cbe43a 100644 (file)
@@ -96,6 +96,12 @@ parser.add_option(
     help = 
         "Specify a symmetric encryption key with which to protect packets across "
         "the tunnel. python-crypto must be installed on the system." )
+parser.add_option(
+    "-N", "--no-capture", dest="no_capture", 
+    action = "store_true",
+    default = False,
+    help = "If specified, packets won't be logged to standard error "
+           "(default is to log them to standard error). " )
 
 (options, remaining_args) = parser.parse_args(sys.argv[1:])
 
@@ -277,7 +283,10 @@ def tun_fwd(tun, remote):
         ether_mode = tun_name.startswith('tap'),
         cipher_key = options.cipher_key,
         udp = options.udp,
-        TERMINATE = TERMINATE)
+        TERMINATE = TERMINATE,
+        stderr = open("/dev/null","w") if options.no_capture 
+                 else sys.stderr 
+    )
 
 
 
@@ -380,7 +389,16 @@ try:
             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
             rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
-            rsock.bind((hostaddr,options.udp))
+            for i in xrange(30):
+                try:
+                    rsock.bind((hostaddr,options.udp))
+                    break
+                except socket.error:
+                    # wait a while, retry
+                    print >>sys.stderr, "Could not bind. Retrying in a sec..."
+                    time.sleep(1)
+            else:
+                rsock.bind((hostaddr,options.udp))
             rsock.connect((remaining_args[0],options.port))
         else:
             print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
@@ -404,7 +422,16 @@ try:
         else:
             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
             lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-            lsock.bind((hostaddr,options.port))
+            for i in xrange(30):
+                try:
+                    lsock.bind((hostaddr,options.port))
+                    break
+                except socket.error:
+                    # wait a while, retry
+                    print >>sys.stderr, "Could not bind. Retrying in a sec..."
+                    time.sleep(1)
+            else:
+                lsock.bind((hostaddr,options.port))
             lsock.listen(1)
             rsock,raddr = lsock.accept()
         remote = os.fdopen(rsock.fileno(), 'r+b', 0)
index c4dcdba..df23a7b 100644 (file)
@@ -176,6 +176,8 @@ class TunProtoBase(object):
             args.extend(("-P",str(local_p2p)))
         if local_txq:
             args.extend(("-Q",str(local_txq)))
+        if not local_cap:
+            args.append("-N")
         if extra_args:
             args.extend(map(str,extra_args))
         if not listen and check_proto != 'fd':
@@ -192,7 +194,7 @@ class TunProtoBase(object):
             pidfile = './pid',
             home = self.home_path,
             stdin = '/dev/null',
-            stdout = 'capture' if local_cap else '/dev/null',
+            stdout = 'capture',
             stderr = rspawn.STDOUT,
             sudo = True,
             
@@ -219,36 +221,35 @@ class TunProtoBase(object):
             time.sleep(1.0)
         
         # Wait for the connection to be established
-        if local.capture:
-            for spin in xrange(30):
-                if self.status() != rspawn.RUNNING:
-                    break
-                
-                (out,err),proc = server.popen_ssh_command(
-                    "cd %(home)s ; grep -c Connected capture" % dict(
-                        home = server.shell_escape(self.home_path)),
-                    host = local.node.hostname,
-                    port = None,
-                    user = local.node.slicename,
-                    agent = None,
-                    ident_key = local.node.ident_path,
-                    server_key = local.node.server_key
-                    )
-                
-                if proc.wait():
-                    break
-                
-                if out.strip() != '0':
-                    break
-                
-                time.sleep(1.0)
+        for spin in xrange(30):
+            if self.status() != rspawn.RUNNING:
+                break
+            
+            (out,err),proc = server.popen_ssh_command(
+                "cd %(home)s ; grep -c Connected capture" % dict(
+                    home = server.shell_escape(self.home_path)),
+                host = local.node.hostname,
+                port = None,
+                user = local.node.slicename,
+                agent = None,
+                ident_key = local.node.ident_path,
+                server_key = local.node.server_key
+                )
+            
+            if proc.wait():
+                break
+            
+            if out.strip() != '0':
+                break
+            
+            time.sleep(1.0)
     
     @property
     def if_name(self):
         if not self._if_name:
             # Inspect the trace to check the assigned iface
             local = self.local()
-            if local and local.capture:
+            if local:
                 for spin in xrange(30):
                     (out,err),proc = server.popen_ssh_command(
                         "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
index 90aa485..059ccff 100644 (file)
@@ -55,6 +55,7 @@ TESTBED_ID  = 35
 TESTBED_VERSION  = 36
 EXPERIMENT_SET = 37
 EXPERIMENT_GET = 38
+DO_PRESTART = 39
 
 instruction_text = dict({
     OK:     "OK",
@@ -582,6 +583,12 @@ class TestbedControllerServer(BaseServer):
     def do_preconfigure(self):
         self._testbed.do_preconfigure()
 
+    @Marshalling.handles(DO_PRESTART)
+    @Marshalling.args()
+    @Marshalling.retvoid
+    def do_prestart(self):
+        self._testbed.do_prestart()
+
     @Marshalling.handles(DO_CROSS_CONNECT_INIT)
     @Marshalling.args( Marshalling.Decoders.pickled_data )
     @Marshalling.retvoid
index 0baabff..14a34f4 100644 (file)
@@ -5,6 +5,7 @@ import threading
 import socket
 import select
 import weakref
+import time
 
 from tunchannel import tun_fwd
 
@@ -196,13 +197,29 @@ class TunChannel(object):
         if udp:
             # listen on udp port
             rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
-            rsock.bind((local_addr,local_port))
+            for i in xrange(30):
+                try:
+                    rsock.bind((local_addr,local_port))
+                    break
+                except socket.error:
+                    # wait a while, retry
+                    time.sleep(1)
+            else:
+                rsock.bind((local_addr,local_port))
             rsock.connect((peer_addr,peer_port))
             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
         elif listen:
             # accept tcp connections
             lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-            lsock.bind((local_addr,local_port))
+            for i in xrange(30):
+                try:
+                    lsock.bind((local_addr,local_port))
+                    break
+                except socket.error:
+                    # wait a while, retry
+                    time.sleep(1)
+            else:
+                lsock.bind((local_addr,local_port))
             lsock.listen(1)
             rsock,raddr = lsock.accept()
             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
index d37d84f..39fd54d 100755 (executable)
@@ -47,6 +47,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.do_connect_compl()
         instance.do_preconfigure()
         instance.do_configure()
+        instance.do_prestart()
         instance.start()
         while instance.status(7) != STATUS_FINISHED:
             time.sleep(0.5)
@@ -89,6 +90,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.do_connect_compl()
         instance.do_preconfigure()
         instance.do_configure()
+        instance.do_prestart()
         instance.start()
         while instance.status(6) != STATUS_FINISHED:
             time.sleep(0.5)
@@ -150,6 +152,7 @@ class NetnsExecuteTestCase(unittest.TestCase):
         instance.do_connect_compl()
         instance.do_preconfigure()
         instance.do_configure()
+        instance.do_prestart()
         instance.start()
         while instance.status(11) != STATUS_FINISHED:
             time.sleep(0.5)
index be7a432..f82ebf1 100755 (executable)
@@ -68,6 +68,7 @@ class Ns3ExecuteTestCase(unittest.TestCase):
         instance.do_connect_compl()
         instance.do_preconfigure()
         instance.do_configure()
+        instance.do_prestart()
         instance.start()
         while instance.status(17) != STATUS_FINISHED:
             time.sleep(0.1)
index 52ac526..e04a487 100755 (executable)
@@ -104,6 +104,7 @@ class Ns3ExecuteTestCase(unittest.TestCase):
         instance.do_connect_compl()
         instance.do_preconfigure()
         instance.do_configure()
+        instance.do_prestart()
         instance.start()
         while instance.status(27) != STATUS_FINISHED:
             time.sleep(0.1)
index d93068e..920ef03 100755 (executable)
@@ -97,6 +97,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
 
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(7) != STATUS_FINISHED:
                 time.sleep(0.5)
@@ -134,6 +135,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
             instance.do_preconfigure()
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(5) != STATUS_FINISHED:
                 time.sleep(0.5)
@@ -191,6 +193,7 @@ FIONREAD = 0x[0-9a-fA-F]{8}.*
             instance.do_preconfigure()
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(10) != STATUS_FINISHED:
                 time.sleep(0.5)
@@ -239,6 +242,7 @@ echo 'OKIDOKI'
             instance.do_preconfigure()
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(6) != STATUS_FINISHED:
                 time.sleep(0.5)
@@ -287,6 +291,7 @@ echo 'OKIDOKI'
             instance.do_preconfigure()
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(8) != STATUS_FINISHED:
                 time.sleep(0.5)
@@ -392,6 +397,7 @@ echo 'OKIDOKI'
             
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(9) != STATUS_FINISHED:
                 time.sleep(0.5)
@@ -450,6 +456,7 @@ echo 'OKIDOKI'
             instance.do_preconfigure()
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(12) != STATUS_FINISHED:
                 time.sleep(0.5)
@@ -491,6 +498,7 @@ echo 'OKIDOKI'
             instance.do_preconfigure()
             instance.do_configure()
             
+            instance.do_prestart()
             instance.start()
             while instance.status(12) != STATUS_FINISHED:
                 time.sleep(0.5)
index 3e8a4c2..b2412e2 100755 (executable)
@@ -134,7 +134,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
         "Test is expensive, requires NEPI_FULL_TESTS=yes")
-    def _test_ns3_in_pl(self):
+    def test_ns3_in_pl(self):
         ns3_testbed_id = "ns3"
         ns3_testbed_version = "3_9_RC3"
         
@@ -164,7 +164,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
         "Test is expensive, requires NEPI_FULL_TESTS=yes")
-    def _test_ns3_in_pl_crossconnect(self):
+    def test_ns3_in_pl_crossconnect(self):
         pl, exp = self.make_experiment_desc()
         
         # Create PL node, ifaces, assign addresses
@@ -229,7 +229,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
         "Test is expensive, requires NEPI_FULL_TESTS=yes")
-    def _test_ns3_in_pl_snat(self):
+    def test_ns3_in_pl_snat(self):
         pl, exp = self.make_experiment_desc()
         
         # Create PL node, ifaces, assign addresses