From: Claudio-Daniel Freire Date: Wed, 20 Jul 2011 12:33:38 +0000 (+0200) Subject: Made automatic proxy reconnection more robust X-Git-Tag: nepi-3.0.0~358 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=0732c02603149fa475acaeaa6e4075c465126c77;p=nepi.git Made automatic proxy reconnection more robust --- diff --git a/src/nepi/util/server.py b/src/nepi/util/server.py index dbd0b563..92212f74 100644 --- a/src/nepi/util/server.py +++ b/src/nepi/util/server.py @@ -60,7 +60,7 @@ def eintr_retry(func): for i in xrange(0 if retry else 4): try: return func(*p, **kw) - except select.error, args: + except (select.error, socket.error), args: if args[0] == errno.EINTR: continue else: @@ -210,10 +210,8 @@ class Server(object): while '\n' not in chunk: try: chunk = conn.recv(1024) - except socket.timeout: - continue - except OSError, e: - if e.errno != errno.EINTR: + except (OSError, socket.error), e: + if e[0] != errno.EINTR: raise else: continue @@ -289,8 +287,8 @@ class Forwarder(object): def send_to_server(self, data): try: self._ctrl_sock.send(data) - except IOError, e: - if e.errno == errno.EPIPE: + except (IOError, socket.error), e: + if e[0] == errno.EPIPE: self.connect() self._ctrl_sock.send(data) else: @@ -306,11 +304,10 @@ class Forwarder(object): while '\n' not in chunk: try: chunk = self._ctrl_sock.recv(1024) - except OSError, e: - if e.errno != errno.EINTR: + except (OSError, socket.error), e: + if e[0] != errno.EINTR: raise - if chunk == '': - continue + continue if chunk: data.append(chunk) else: diff --git a/test/testbeds/netns/integration.py b/test/testbeds/netns/integration.py index 01282afb..4bc85dae 100755 --- a/test/testbeds/netns/integration.py +++ b/test/testbeds/netns/integration.py @@ -17,8 +17,7 @@ class NetnsIntegrationTestCase(unittest.TestCase): def setUp(self): self.root_dir = tempfile.mkdtemp() - @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") - def test_local_if(self): + def _test_if(self, daemonize_testbed, controller_access_configuration): exp_desc = ExperimentDescription() testbed_id = "netns" user = getpass.getuser() @@ -47,120 +46,58 @@ class NetnsIntegrationTestCase(unittest.TestCase): app.set_attribute_value("user", user) app.connector("node").connect(node1.connector("apps")) app.enable_trace("stdout") + + if daemonize_testbed: + netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) + netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + xml = exp_desc.to_xml() - controller = ExperimentController(xml, self.root_dir) - controller.start() - while not controller.is_finished(app.guid): - time.sleep(0.5) - ping_result = controller.trace(app.guid, "stdout") - comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. + if controller_access_configuration: + controller = proxy.create_experiment_controller(xml, + controller_access_configuration) + else: + controller = ExperimentController(xml, self.root_dir) + + try: + controller.start() + while not controller.is_finished(app.guid): + time.sleep(0.5) + ping_result = controller.trace(app.guid, "stdout") + comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. --- 10.0.0.2 ping statistics --- 1 packets transmitted, 1 received, 0% packet loss, time 0ms """ - self.assertTrue(ping_result.startswith(comp_result)) - controller.stop() - controller.shutdown() + self.assertTrue(ping_result.startswith(comp_result)) + finally: + controller.stop() + controller.shutdown() @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") - def test_all_daemonized_if(self): - exp_desc = ExperimentDescription() - testbed_id = "netns" - user = getpass.getuser() - netns_provider = FactoriesProvider(testbed_id) - netns_desc = exp_desc.add_testbed_description(netns_provider) - netns_desc.set_attribute_value("homeDirectory", self.root_dir) - #netns_desc.set_attribute_value("enableDebug", True) - node1 = netns_desc.create("Node") - node2 = netns_desc.create("Node") - iface1 = netns_desc.create("NodeInterface") - iface1.set_attribute_value("up", True) - node1.connector("devs").connect(iface1.connector("node")) - ip1 = iface1.add_address() - ip1.set_attribute_value("Address", "10.0.0.1") - iface2 = netns_desc.create("NodeInterface") - iface2.set_attribute_value("up", True) - node2.connector("devs").connect(iface2.connector("node")) - ip2 = iface2.add_address() - ip2.set_attribute_value("Address", "10.0.0.2") - switch = netns_desc.create("Switch") - switch.set_attribute_value("up", True) - iface1.connector("switch").connect(switch.connector("devs")) - iface2.connector("switch").connect(switch.connector("devs")) - app = netns_desc.create("Application") - app.set_attribute_value("command", "ping -qc1 10.0.0.2") - app.set_attribute_value("user", user) - app.connector("node").connect(node1.connector("apps")) - app.enable_trace("stdout") - - netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) - inst_root_dir = os.path.join(self.root_dir, "instance") - os.mkdir(inst_root_dir) - netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) - - xml = exp_desc.to_xml() + def test_local_if(self): + self._test_if( + daemonize_testbed = False, + controller_access_configuration = None) + @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") + def test_all_daemonized_if(self): access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) - controller = proxy.create_experiment_controller(xml, access_config) - - controller.start() - while not controller.is_finished(app.guid): - time.sleep(0.5) - ping_result = controller.trace(app.guid, "stdout") - comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. - ---- 10.0.0.2 ping statistics --- -1 packets transmitted, 1 received, 0% packet loss, time 0ms -""" - self.assertTrue(ping_result.startswith(comp_result)) - controller.stop() - controller.shutdown() + + self._test_if( + daemonize_testbed = True, + controller_access_configuration = access_config) @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges") def test_all_ssh_daemonized_if(self): - exp_desc = ExperimentDescription() - testbed_id = "netns" env = test_util.test_environment() - user = getpass.getuser() - netns_provider = FactoriesProvider(testbed_id) - netns_desc = exp_desc.add_testbed_description(netns_provider) - netns_desc.set_attribute_value("homeDirectory", self.root_dir) - #netns_desc.set_attribute_value("enableDebug", True) - node1 = netns_desc.create("Node") - node2 = netns_desc.create("Node") - iface1 = netns_desc.create("NodeInterface") - iface1.set_attribute_value("up", True) - node1.connector("devs").connect(iface1.connector("node")) - ip1 = iface1.add_address() - ip1.set_attribute_value("Address", "10.0.0.1") - iface2 = netns_desc.create("NodeInterface") - iface2.set_attribute_value("up", True) - node2.connector("devs").connect(iface2.connector("node")) - ip2 = iface2.add_address() - ip2.set_attribute_value("Address", "10.0.0.2") - switch = netns_desc.create("Switch") - switch.set_attribute_value("up", True) - iface1.connector("switch").connect(switch.connector("devs")) - iface2.connector("switch").connect(switch.connector("devs")) - app = netns_desc.create("Application") - app.set_attribute_value("command", "ping -qc1 10.0.0.2") - app.set_attribute_value("user", user) - app.connector("node").connect(node1.connector("apps")) - app.enable_trace("stdout") - - netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) - inst_root_dir = os.path.join(self.root_dir, "instance") - os.mkdir(inst_root_dir) - netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) - - xml = exp_desc.to_xml() - + access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) @@ -168,20 +105,10 @@ class NetnsIntegrationTestCase(unittest.TestCase): access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) access_config.set_attribute_value(DC.USE_AGENT, True) - controller = proxy.create_experiment_controller(xml, access_config) - - controller.start() - while not controller.is_finished(app.guid): - time.sleep(0.5) - ping_result = controller.trace(app.guid, "stdout") - comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data. - ---- 10.0.0.2 ping statistics --- -1 packets transmitted, 1 received, 0% packet loss, time 0ms -""" - self.assertTrue(ping_result.startswith(comp_result)) - controller.stop() - controller.shutdown() + + self._test_if( + daemonize_testbed = True, + controller_access_configuration = access_config) def tearDown(self): try: diff --git a/test/testbeds/ns3/integration.py b/test/testbeds/ns3/integration.py index 0ff968cf..9b615f7e 100755 --- a/test/testbeds/ns3/integration.py +++ b/test/testbeds/ns3/integration.py @@ -17,9 +17,7 @@ class Ns3IntegrationTestCase(unittest.TestCase): def setUp(self): self.root_dir = tempfile.mkdtemp() - @test_util.skipUnless(test_util.ns3_usable(), - "Test requires working ns-3 bindings") - def test_local_if(self): + def _test_if(self, daemonize_testbed, controller_access_configuration): exp_desc = ExperimentDescription() testbed_id = "ns3" ns3_provider = FactoriesProvider(testbed_id) @@ -66,90 +64,68 @@ class Ns3IntegrationTestCase(unittest.TestCase): app.set_attribute_value("StopTime", "20s") app.connector("node").connect(node1.connector("apps")) + if daemonize_testbed: + ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + ns3_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) + ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + xml = exp_desc.to_xml() - controller = ExperimentController(xml, self.root_dir) - controller.start() - while not controller.is_finished(app.guid): - time.sleep(0.5) - ping_result = controller.trace(iface2.guid, "P2PAsciiTrace") - comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)" - if ping_result.find(comp_result) == -1: - self.fail("Unexpected trace: %s" % (ping_result,)) - controller.stop() - controller.shutdown() + + if controller_access_configuration: + controller = ExperimentController(xml, self.root_dir) + else: + controller = proxy.create_experiment_controller(xml, + controller_access_configuration) + + try: + controller.start() + while not controller.is_finished(app.guid): + time.sleep(0.5) + ping_result = controller.trace(iface2.guid, "P2PAsciiTrace") + comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)" + if ping_result.find(comp_result) == -1: + self.fail("Unexpected trace: %s" % (ping_result,)) + finally: + controller.stop() + controller.shutdown() @test_util.skipUnless(test_util.ns3_usable(), "Test requires working ns-3 bindings") - def test_all_daemonized_if(self): - exp_desc = ExperimentDescription() - testbed_id = "ns3" - ns3_provider = FactoriesProvider(testbed_id) - ns3_desc = exp_desc.add_testbed_description(ns3_provider) - ns3_desc.set_attribute_value("homeDirectory", self.root_dir) - - node1 = ns3_desc.create("ns3::Node") - ipv41 = ns3_desc.create("ns3::Ipv4L3Protocol") - arp1 = ns3_desc.create("ns3::ArpL3Protocol") - icmp1 = ns3_desc.create("ns3::Icmpv4L4Protocol") - node1.connector("protos").connect(ipv41.connector("node")) - node1.connector("protos").connect(arp1.connector("node")) - node1.connector("protos").connect(icmp1.connector("node")) - iface1 = ns3_desc.create("ns3::PointToPointNetDevice") - queue1 = ns3_desc.create("ns3::DropTailQueue") - node1.connector("devs").connect(iface1.connector("node")) - iface1.connector("queue").connect(queue1.connector("dev")) - trace1 = iface1.enable_trace("P2PAsciiTrace") - ip1 = iface1.add_address() - ip1.set_attribute_value("Address", "10.0.0.1") - - node2 = ns3_desc.create("ns3::Node") - ipv42 = ns3_desc.create("ns3::Ipv4L3Protocol") - arp2 = ns3_desc.create("ns3::ArpL3Protocol") - icmp2 = ns3_desc.create("ns3::Icmpv4L4Protocol") - node2.connector("protos").connect(ipv42.connector("node")) - node2.connector("protos").connect(arp2.connector("node")) - node2.connector("protos").connect(icmp2.connector("node")) - iface2 = ns3_desc.create("ns3::PointToPointNetDevice") - queue2 = ns3_desc.create("ns3::DropTailQueue") - node2.connector("devs").connect(iface2.connector("node")) - iface2.connector("queue").connect(queue2.connector("dev")) - trace2 = iface2.enable_trace("P2PAsciiTrace") - ip2 = iface2.add_address() - ip2.set_attribute_value("Address", "10.0.0.2") + def test_local_if(self): + self._test_if( + daemonize_testbed = False, + controller_access_configuration = None) - chan = ns3_desc.create("ns3::PointToPointChannel") - iface1.connector("chan").connect(chan.connector("dev2")) - iface2.connector("chan").connect(chan.connector("dev2")) + @test_util.skipUnless(test_util.ns3_usable(), + "Test requires working ns-3 bindings") + def test_all_daemonized_if(self): + access_config = proxy.AccessConfiguration() + access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) + access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) - app = ns3_desc.create("ns3::V4Ping") - app.set_attribute_value("Remote", "10.0.0.2") - app.set_attribute_value("StartTime", "0s") - app.set_attribute_value("StopTime", "20s") - app.connector("node").connect(node1.connector("apps")) + self._test_if( + daemonize_testbed = True, + controller_access_configuration = access_config) - ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) - inst_root_dir = os.path.join(self.root_dir, "instance") - os.mkdir(inst_root_dir) - ns3_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) - ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + @test_util.skipUnless(test_util.ns3_usable(), + "Test requires working ns-3 bindings") + def test_all_ssh_daemonized_if(self): + env = test_util.test_environment() - xml = exp_desc.to_xml() access_config = proxy.AccessConfiguration() access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir) access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) - controller = proxy.create_experiment_controller(xml, access_config) - - controller.start() - while not controller.is_finished(app.guid): - time.sleep(0.5) - - ping_result = controller.trace(iface2.guid, "P2PAsciiTrace") - comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)" - if ping_result.find(comp_result) == -1: - self.fail("Unexpected trace: %s" % (ping_result,)) - controller.stop() - controller.shutdown() + access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) + access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port) + access_config.set_attribute_value(DC.USE_AGENT, True) + + self._test_if( + daemonize_testbed = True, + controller_access_configuration = access_config) def tearDown(self): try: diff --git a/test/testbeds/planetlab/integration.py b/test/testbeds/planetlab/integration.py index 2bccd692..d8fee003 100755 --- a/test/testbeds/planetlab/integration.py +++ b/test/testbeds/planetlab/integration.py @@ -5,6 +5,7 @@ import getpass from nepi.core.design import ExperimentDescription, FactoriesProvider from nepi.core.execute import ExperimentController from nepi.util import proxy +from nepi.util.constants import DeploymentConfiguration as DC import os import shutil import tempfile @@ -56,9 +57,8 @@ class PlanetLabIntegrationTestCase(unittest.TestCase): pl_desc.set_attribute_value("plcHost", plchost) return pl_desc, exp_desc - - @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") - def test_simple(self): + + def _test_simple(self, daemonize_testbed, controller_access_configuration): pl, exp = self.make_experiment_desc() node1 = pl.create("Node") @@ -78,9 +78,19 @@ class PlanetLabIntegrationTestCase(unittest.TestCase): app.enable_trace("stdout") app.connector("node").connect(node1.connector("apps")) - xml = exp.to_xml() - - controller = ExperimentController(xml, self.root_dir) + if daemonize_testbed: + pl.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + inst_root_dir = os.path.join(self.root_dir, "instance") + os.mkdir(inst_root_dir) + pl.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir) + pl.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + + if controller_access_configuration: + controller = proxy.create_experiment_controller(xml, + controller_access_configuration) + else: + controller = ExperimentController(xml, self.root_dir) + try: controller.start() while not controller.is_finished(app.guid): @@ -98,7 +108,6 @@ class PlanetLabIntegrationTestCase(unittest.TestCase): controller.stop() controller.shutdown() - @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") def test_spanning_deployment(self): pl, exp = self.make_experiment_desc() @@ -165,6 +174,40 @@ FIONREAD = 0x[0-9a-fA-F]{8}.* finally: controller.stop() controller.shutdown() + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_simple(self): + self._test_simple( + daemonize_testbed = False, + controller_access_configuration = None) + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_simple_daemonized(self): + access_config = proxy.AccessConfiguration({ + DC.DEPLOYMENT_MODE : DC.MODE_DAEMON, + DC.ROOT_DIRECTORY : self.root_dir, + }) + + self._test_simple( + daemonize_testbed = False, + controller_access_configuration = access_config) + + @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") + def test_simple_ssh(self): + env = test_util.test_environment() + + access_config = proxy.AccessConfiguration({ + DC.DEPLOYMENT_MODE : DC.MODE_DAEMON, + DC.ROOT_DIRECTORY : self.root_dir, + DC.LOG_LEVEL : DC.DEBUG_LEVEL, + DC.DEPLOYMENT_COMMUNICATION : DC.ACCESS_SSH, + DC.DEPLOYMENT_PORT : env.port, + DC.USE_AGENT : True, + }) + + self._test_simple( + daemonize_testbed = False, + controller_access_configuration = access_config) if __name__ == '__main__':