From: Claudio-Daniel Freire Date: Fri, 20 May 2011 14:35:31 +0000 (+0200) Subject: Synchronization fixes: X-Git-Tag: nepi_v2~14 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=1d82b1edea189665d7375ed8e4183600de581c16;p=nepi.git Synchronization fixes: - tun_connect retries on binding error (happens regularly during tests and may happen in Real Life (tm) too) - re-enabled wrongfully disabled ns3-in-pl tests - added prestart global synchronization step, which solves many cross-testbed synchronization issues --- diff --git a/src/nepi/core/execute.py b/src/nepi/core/execute.py index f38993e2..1b52335f 100644 --- a/src/nepi/core/execute.py +++ b/src/nepi/core/execute.py @@ -79,6 +79,7 @@ class Factory(AttributesMap): def __init__(self, factory_id, create_function, start_function, stop_function, status_function, configure_function, preconfigure_function, + prestart_function, allow_addresses = False, has_addresses = False, allow_routes = False, has_routes = False): super(Factory, self).__init__() @@ -93,6 +94,7 @@ class Factory(AttributesMap): self._status_function = status_function self._configure_function = configure_function self._preconfigure_function = preconfigure_function + self._prestart_function = prestart_function self._connector_types = dict() self._traces = list() self._box_attributes = AttributesMap() @@ -125,6 +127,10 @@ class Factory(AttributesMap): def create_function(self): return self._create_function + @property + def prestart_function(self): + return self._prestart_function + @property def start_function(self): return self._start_function @@ -261,6 +267,10 @@ class TestbedController(object): """After do_configure elements are configured""" raise NotImplementedError + def do_prestart(self): + """Before do_start elements are prestart-configured""" + raise NotImplementedError + def do_cross_connect_init(self, cross_data): """ After do_cross_connect_init initiation of all external connections @@ -445,6 +455,10 @@ class ExperimentController(object): cross_data = self._get_cross_data(guid) testbed.do_cross_connect_compl(cross_data) + # Last chance to configure (parallel on all testbeds) + self._parallel([testbed.do_prestart + for testbed in self._testbeds.itervalues()]) + # start experiment (parallel start on all testbeds) self._parallel([testbed.start for testbed in self._testbeds.itervalues()]) diff --git a/src/nepi/core/metadata.py b/src/nepi/core/metadata.py index 784158c0..46ee1a2f 100644 --- a/src/nepi/core/metadata.py +++ b/src/nepi/core/metadata.py @@ -84,6 +84,15 @@ class VersionedMetadataInfo(object): """ return self.configure_order + @property + def prestart_order(self): + """ list of factory ids that indicates the order in which the elements + should be prestart-configured. + + Default: same as configure_order + """ + return self.configure_order + @property def start_order(self): """ list of factory ids that indicates the order in which the elements @@ -111,6 +120,13 @@ class VersionedMetadataInfo(object): (just after connections are made, just before netrefs are resolved) "configure_function": function for element configuration, + "prestart_function": function for pre-start + element configuration (just before starting applications), + useful for synchronization of background setup tasks or + lazy instantiation or configuration of attributes + that require connection/cross-connection state before + being created. + After this point, all applications should be able to run. "factory_attributes": list of references to attribute_ids, "box_attributes": list of regerences to attribute_ids, "traces": list of references to trace_id @@ -318,6 +334,10 @@ class Metadata(object): def preconfigure_order(self): return self._metadata.preconfigure_order + @property + def prestart_order(self): + return self._metadata.prestart_order + @property def start_order(self): return self._metadata.start_order @@ -383,6 +403,7 @@ class Metadata(object): status_function = info.get("status_function") configure_function = info.get("configure_function") preconfigure_function = info.get("preconfigure_function") + prestart_function = info.get("prestart_function") allow_addresses = info.get("allow_addresses", False) allow_routes = info.get("allow_routes", False) has_addresses = info.get("has_addresses", False) @@ -390,6 +411,7 @@ class Metadata(object): factory = Factory(factory_id, create_function, start_function, stop_function, status_function, configure_function, preconfigure_function, + prestart_function, allow_addresses, has_addresses, allow_routes, has_routes) diff --git a/src/nepi/core/testbed_impl.py b/src/nepi/core/testbed_impl.py index 320f7b0b..288b2346 100644 --- a/src/nepi/core/testbed_impl.py +++ b/src/nepi/core/testbed_impl.py @@ -192,23 +192,15 @@ class TestbedController(execute.TestbedController): self._status = TESTBED_STATUS_SETUP def do_create(self): - guids = dict() - # order guids (elements) according to factory_id - for guid, factory_id in self._create.iteritems(): - if not factory_id in guids: - guids[factory_id] = list() - guids[factory_id].append(guid) - # create elements following the factory_id order - for factory_id in self._metadata.create_order: - # omit the factories that have no element to create - if factory_id not in guids: - continue - factory = self._factories[factory_id] - for guid in guids[factory_id]: - factory.create_function(self, guid) - parameters = self._get_parameters(guid) - for name, value in parameters.iteritems(): - self.set(guid, name, value) + def set_params(self, guid): + parameters = self._get_parameters(guid) + for name, value in parameters.iteritems(): + self.set(guid, name, value) + + self._do_in_factory_order( + 'create_function', + self._metadata.create_order, + postaction = set_params ) self._status = TESTBED_STATUS_CREATED def _do_connect(self, init = True): @@ -241,43 +233,40 @@ class TestbedController(execute.TestbedController): self._do_connect(init = False) self._status = TESTBED_STATUS_CONNECTED - def do_preconfigure(self): - guids = dict() + def _do_in_factory_order(self, action, order, postaction = None): + guids = collections.defaultdict(list) # order guids (elements) according to factory_id for guid, factory_id in self._create.iteritems(): - if not factory_id in guids: - guids[factory_id] = list() guids[factory_id].append(guid) # configure elements following the factory_id order - for factory_id in self._metadata.preconfigure_order: + for factory_id in order: # omit the factories that have no element to create if factory_id not in guids: continue factory = self._factories[factory_id] - if not factory.preconfigure_function: + if not getattr(factory, action): continue for guid in guids[factory_id]: - factory.preconfigure_function(self, guid) + getattr(factory, action)(self, guid) + if postaction: + postaction(self, guid) + + def do_preconfigure(self): + self._do_in_factory_order( + 'preconfigure_function', + self._metadata.preconfigure_order ) def do_configure(self): - guids = dict() - # order guids (elements) according to factory_id - for guid, factory_id in self._create.iteritems(): - if not factory_id in guids: - guids[factory_id] = list() - guids[factory_id].append(guid) - # configure elements following the factory_id order - for factory_id in self._metadata.configure_order: - # omit the factories that have no element to create - if factory_id not in guids: - continue - factory = self._factories[factory_id] - if not factory.configure_function: - continue - for guid in guids[factory_id]: - factory.configure_function(self, guid) + self._do_in_factory_order( + 'configure_function', + self._metadata.configure_order ) self._status = TESTBED_STATUS_CONFIGURED + def do_prestart(self): + self._do_in_factory_order( + 'prestart_function', + self._metadata.prestart_order ) + def _do_cross_connect(self, cross_data, init = True): for guid, cross_connections in self._cross_connect.iteritems(): factory = self._get_factory(guid) @@ -409,33 +398,17 @@ class TestbedController(execute.TestbedController): return factory.box_attributes.attributes_list def start(self, time = TIME_NOW): - # Plan everything - # - group by factory_id - # - enqueue task callables - plan = collections.defaultdict(list) - - for guid, factory_id in self._create.iteritems(): - factory = self._factories[factory_id] - start_function = factory.start_function - if start_function: - plan[factory_id].append((start_function, guid)) - - # Execute plan, following the factory_id order - for factory_id in self._metadata.start_order: - if factory_id in plan: - for start_function, guid in plan[factory_id]: - start_function(self, guid) - + self._do_in_factory_order( + 'start_function', + self._metadata.start_order ) self._status = TESTBED_STATUS_STARTED #action: NotImplementedError def stop(self, time = TIME_NOW): - for guid, factory_id in self._create.iteritems(): - factory = self._factories[factory_id] - stop_function = factory.stop_function - if stop_function: - stop_function(self, guid) + self._do_in_factory_order( + 'stop_function', + reversed(self._metadata.start_order) ) self._status = TESTBED_STATUS_STOPPED def status(self, guid = None): diff --git a/src/nepi/testbeds/planetlab/metadata_v01.py b/src/nepi/testbeds/planetlab/metadata_v01.py index c2f7e412..d5c0ab9c 100644 --- a/src/nepi/testbeds/planetlab/metadata_v01.py +++ b/src/nepi/testbeds/planetlab/metadata_v01.py @@ -929,7 +929,7 @@ create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEP configure_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] -# Start node after ifaces, because the node needs the ifaces in order to set up routes +# Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes start_order = [ INTERNET, NODEIFACE, TAPIFACE, TUNIFACE, NODE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] factories_info = dict({ @@ -939,7 +939,7 @@ factories_info = dict({ "category": "topology", "create_function": create_node, "preconfigure_function": configure_node, - "start_function": configure_node_routes, + "prestart_function": configure_node_routes, "box_attributes": [ "forward_X11", "hostname", @@ -973,7 +973,7 @@ factories_info = dict({ "create_function": create_tuniface, "preconfigure_function": preconfigure_tuniface, "configure_function": postconfigure_tuniface, - "start_function": wait_tuniface, + "prestart_function": wait_tuniface, "box_attributes": [ "up", "device_name", "mtu", "snat", "pointopoint", "txqueuelen", @@ -989,7 +989,7 @@ factories_info = dict({ "create_function": create_tapiface, "preconfigure_function": preconfigure_tuniface, "configure_function": postconfigure_tuniface, - "start_function": wait_tuniface, + "prestart_function": wait_tuniface, "box_attributes": [ "up", "device_name", "mtu", "snat", "pointopoint", "txqueuelen", @@ -1136,6 +1136,10 @@ class VersionedMetadataInfo(metadata.VersionedMetadataInfo): def configure_order(self): return configure_order + @property + def prestart_order(self): + return start_order + @property def start_order(self): return start_order diff --git a/src/nepi/testbeds/planetlab/scripts/tun_connect.py b/src/nepi/testbeds/planetlab/scripts/tun_connect.py index 4950dec1..7cbe43aa 100644 --- a/src/nepi/testbeds/planetlab/scripts/tun_connect.py +++ b/src/nepi/testbeds/planetlab/scripts/tun_connect.py @@ -96,6 +96,12 @@ parser.add_option( help = "Specify a symmetric encryption key with which to protect packets across " "the tunnel. python-crypto must be installed on the system." ) +parser.add_option( + "-N", "--no-capture", dest="no_capture", + action = "store_true", + default = False, + help = "If specified, packets won't be logged to standard error " + "(default is to log them to standard error). " ) (options, remaining_args) = parser.parse_args(sys.argv[1:]) @@ -277,7 +283,10 @@ def tun_fwd(tun, remote): ether_mode = tun_name.startswith('tap'), cipher_key = options.cipher_key, udp = options.udp, - TERMINATE = TERMINATE) + TERMINATE = TERMINATE, + stderr = open("/dev/null","w") if options.no_capture + else sys.stderr + ) @@ -380,7 +389,16 @@ try: print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp) print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port) rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - rsock.bind((hostaddr,options.udp)) + for i in xrange(30): + try: + rsock.bind((hostaddr,options.udp)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "Could not bind. Retrying in a sec..." + time.sleep(1) + else: + rsock.bind((hostaddr,options.udp)) rsock.connect((remaining_args[0],options.port)) else: print >>sys.stderr, "Error: need a remote endpoint in UDP mode" @@ -404,7 +422,16 @@ try: else: print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port) lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - lsock.bind((hostaddr,options.port)) + for i in xrange(30): + try: + lsock.bind((hostaddr,options.port)) + break + except socket.error: + # wait a while, retry + print >>sys.stderr, "Could not bind. Retrying in a sec..." + time.sleep(1) + else: + lsock.bind((hostaddr,options.port)) lsock.listen(1) rsock,raddr = lsock.accept() remote = os.fdopen(rsock.fileno(), 'r+b', 0) diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index c4dcdba4..df23a7bd 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -176,6 +176,8 @@ class TunProtoBase(object): args.extend(("-P",str(local_p2p))) if local_txq: args.extend(("-Q",str(local_txq))) + if not local_cap: + args.append("-N") if extra_args: args.extend(map(str,extra_args)) if not listen and check_proto != 'fd': @@ -192,7 +194,7 @@ class TunProtoBase(object): pidfile = './pid', home = self.home_path, stdin = '/dev/null', - stdout = 'capture' if local_cap else '/dev/null', + stdout = 'capture', stderr = rspawn.STDOUT, sudo = True, @@ -219,36 +221,35 @@ class TunProtoBase(object): time.sleep(1.0) # Wait for the connection to be established - if local.capture: - for spin in xrange(30): - if self.status() != rspawn.RUNNING: - break - - (out,err),proc = server.popen_ssh_command( - "cd %(home)s ; grep -c Connected capture" % dict( - home = server.shell_escape(self.home_path)), - host = local.node.hostname, - port = None, - user = local.node.slicename, - agent = None, - ident_key = local.node.ident_path, - server_key = local.node.server_key - ) - - if proc.wait(): - break - - if out.strip() != '0': - break - - time.sleep(1.0) + for spin in xrange(30): + if self.status() != rspawn.RUNNING: + break + + (out,err),proc = server.popen_ssh_command( + "cd %(home)s ; grep -c Connected capture" % dict( + home = server.shell_escape(self.home_path)), + host = local.node.hostname, + port = None, + user = local.node.slicename, + agent = None, + ident_key = local.node.ident_path, + server_key = local.node.server_key + ) + + if proc.wait(): + break + + if out.strip() != '0': + break + + time.sleep(1.0) @property def if_name(self): if not self._if_name: # Inspect the trace to check the assigned iface local = self.local() - if local and local.capture: + if local: for spin in xrange(30): (out,err),proc = server.popen_ssh_command( "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict( diff --git a/src/nepi/util/proxy.py b/src/nepi/util/proxy.py index 90aa4857..059ccfff 100644 --- a/src/nepi/util/proxy.py +++ b/src/nepi/util/proxy.py @@ -55,6 +55,7 @@ TESTBED_ID = 35 TESTBED_VERSION = 36 EXPERIMENT_SET = 37 EXPERIMENT_GET = 38 +DO_PRESTART = 39 instruction_text = dict({ OK: "OK", @@ -582,6 +583,12 @@ class TestbedControllerServer(BaseServer): def do_preconfigure(self): self._testbed.do_preconfigure() + @Marshalling.handles(DO_PRESTART) + @Marshalling.args() + @Marshalling.retvoid + def do_prestart(self): + self._testbed.do_prestart() + @Marshalling.handles(DO_CROSS_CONNECT_INIT) @Marshalling.args( Marshalling.Decoders.pickled_data ) @Marshalling.retvoid diff --git a/src/nepi/util/tunchannel_impl.py b/src/nepi/util/tunchannel_impl.py index 0baabff0..14a34f4e 100644 --- a/src/nepi/util/tunchannel_impl.py +++ b/src/nepi/util/tunchannel_impl.py @@ -5,6 +5,7 @@ import threading import socket import select import weakref +import time from tunchannel import tun_fwd @@ -196,13 +197,29 @@ class TunChannel(object): if udp: # listen on udp port rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - rsock.bind((local_addr,local_port)) + for i in xrange(30): + try: + rsock.bind((local_addr,local_port)) + break + except socket.error: + # wait a while, retry + time.sleep(1) + else: + rsock.bind((local_addr,local_port)) rsock.connect((peer_addr,peer_port)) remote = os.fdopen(rsock.fileno(), 'r+b', 0) elif listen: # accept tcp connections lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - lsock.bind((local_addr,local_port)) + for i in xrange(30): + try: + lsock.bind((local_addr,local_port)) + break + except socket.error: + # wait a while, retry + time.sleep(1) + else: + lsock.bind((local_addr,local_port)) lsock.listen(1) rsock,raddr = lsock.accept() remote = os.fdopen(rsock.fileno(), 'r+b', 0) diff --git a/test/testbeds/netns/execute.py b/test/testbeds/netns/execute.py index d37d84f1..39fd54d6 100755 --- a/test/testbeds/netns/execute.py +++ b/test/testbeds/netns/execute.py @@ -47,6 +47,7 @@ class NetnsExecuteTestCase(unittest.TestCase): instance.do_connect_compl() instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(7) != STATUS_FINISHED: time.sleep(0.5) @@ -89,6 +90,7 @@ class NetnsExecuteTestCase(unittest.TestCase): instance.do_connect_compl() instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(6) != STATUS_FINISHED: time.sleep(0.5) @@ -150,6 +152,7 @@ class NetnsExecuteTestCase(unittest.TestCase): instance.do_connect_compl() instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(11) != STATUS_FINISHED: time.sleep(0.5) diff --git a/test/testbeds/ns3/execute.py b/test/testbeds/ns3/execute.py index be7a432f..f82ebf10 100755 --- a/test/testbeds/ns3/execute.py +++ b/test/testbeds/ns3/execute.py @@ -68,6 +68,7 @@ class Ns3ExecuteTestCase(unittest.TestCase): instance.do_connect_compl() instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(17) != STATUS_FINISHED: time.sleep(0.1) diff --git a/test/testbeds/ns3/execute2.py b/test/testbeds/ns3/execute2.py index 52ac5264..e04a4876 100755 --- a/test/testbeds/ns3/execute2.py +++ b/test/testbeds/ns3/execute2.py @@ -104,6 +104,7 @@ class Ns3ExecuteTestCase(unittest.TestCase): instance.do_connect_compl() instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(27) != STATUS_FINISHED: time.sleep(0.1) diff --git a/test/testbeds/planetlab/execute.py b/test/testbeds/planetlab/execute.py index d93068e5..920ef030 100755 --- a/test/testbeds/planetlab/execute.py +++ b/test/testbeds/planetlab/execute.py @@ -97,6 +97,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase): instance.do_configure() + instance.do_prestart() instance.start() while instance.status(7) != STATUS_FINISHED: time.sleep(0.5) @@ -134,6 +135,7 @@ class PlanetLabExecuteTestCase(unittest.TestCase): instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(5) != STATUS_FINISHED: time.sleep(0.5) @@ -191,6 +193,7 @@ FIONREAD = 0x[0-9a-fA-F]{8}.* instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(10) != STATUS_FINISHED: time.sleep(0.5) @@ -239,6 +242,7 @@ echo 'OKIDOKI' instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(6) != STATUS_FINISHED: time.sleep(0.5) @@ -287,6 +291,7 @@ echo 'OKIDOKI' instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(8) != STATUS_FINISHED: time.sleep(0.5) @@ -392,6 +397,7 @@ echo 'OKIDOKI' instance.do_configure() + instance.do_prestart() instance.start() while instance.status(9) != STATUS_FINISHED: time.sleep(0.5) @@ -450,6 +456,7 @@ echo 'OKIDOKI' instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(12) != STATUS_FINISHED: time.sleep(0.5) @@ -491,6 +498,7 @@ echo 'OKIDOKI' instance.do_preconfigure() instance.do_configure() + instance.do_prestart() instance.start() while instance.status(12) != STATUS_FINISHED: time.sleep(0.5) diff --git a/test/testbeds/planetlab/integration_ns3.py b/test/testbeds/planetlab/integration_ns3.py index 3e8a4c26..b2412e2f 100755 --- a/test/testbeds/planetlab/integration_ns3.py +++ b/test/testbeds/planetlab/integration_ns3.py @@ -134,7 +134,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'), "Test is expensive, requires NEPI_FULL_TESTS=yes") - def _test_ns3_in_pl(self): + def test_ns3_in_pl(self): ns3_testbed_id = "ns3" ns3_testbed_version = "3_9_RC3" @@ -164,7 +164,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'), "Test is expensive, requires NEPI_FULL_TESTS=yes") - def _test_ns3_in_pl_crossconnect(self): + def test_ns3_in_pl_crossconnect(self): pl, exp = self.make_experiment_desc() # Create PL node, ifaces, assign addresses @@ -229,7 +229,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase): "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'), "Test is expensive, requires NEPI_FULL_TESTS=yes") - def _test_ns3_in_pl_snat(self): + def test_ns3_in_pl_snat(self): pl, exp = self.make_experiment_desc() # Create PL node, ifaces, assign addresses