Made automatic proxy reconnection more robust
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 20 Jul 2011 12:33:38 +0000 (14:33 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 20 Jul 2011 12:33:38 +0000 (14:33 +0200)
src/nepi/util/server.py
test/testbeds/netns/integration.py
test/testbeds/ns3/integration.py
test/testbeds/planetlab/integration.py

index dbd0b56..92212f7 100644 (file)
@@ -60,7 +60,7 @@ def eintr_retry(func):
         for i in xrange(0 if retry else 4):
             try:
                 return func(*p, **kw)
-            except select.error, args:
+            except (select.error, socket.error), args:
                 if args[0] == errno.EINTR:
                     continue
                 else:
@@ -210,10 +210,8 @@ class Server(object):
         while '\n' not in chunk:
             try:
                 chunk = conn.recv(1024)
-            except socket.timeout:
-                continue
-            except OSError, e:
-                if e.errno != errno.EINTR:
+            except (OSError, socket.error), e:
+                if e[0] != errno.EINTR:
                     raise
                 else:
                     continue
@@ -289,8 +287,8 @@ class Forwarder(object):
     def send_to_server(self, data):
         try:
             self._ctrl_sock.send(data)
-        except IOError, e:
-            if e.errno == errno.EPIPE:
+        except (IOError, socket.error), e:
+            if e[0] == errno.EPIPE:
                 self.connect()
                 self._ctrl_sock.send(data)
             else:
@@ -306,11 +304,10 @@ class Forwarder(object):
         while '\n' not in chunk:
             try:
                 chunk = self._ctrl_sock.recv(1024)
-            except OSError, e:
-                if e.errno != errno.EINTR:
+            except (OSError, socket.error), e:
+                if e[0] != errno.EINTR:
                     raise
-                if chunk == '':
-                    continue
+                continue
             if chunk:
                 data.append(chunk)
             else:
index 01282af..4bc85da 100755 (executable)
@@ -17,8 +17,7 @@ class NetnsIntegrationTestCase(unittest.TestCase):
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
 
-    @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
-    def test_local_if(self):
+    def _test_if(self, daemonize_testbed, controller_access_configuration):
         exp_desc = ExperimentDescription()
         testbed_id = "netns"
         user = getpass.getuser()
@@ -47,120 +46,58 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         app.set_attribute_value("user", user)
         app.connector("node").connect(node1.connector("apps"))
         app.enable_trace("stdout")
+
+        if daemonize_testbed:
+            netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+            inst_root_dir = os.path.join(self.root_dir, "instance")
+            os.mkdir(inst_root_dir)
+            netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+            netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
         xml = exp_desc.to_xml()
 
-        controller = ExperimentController(xml, self.root_dir)
-        controller.start()
-        while not controller.is_finished(app.guid):
-            time.sleep(0.5)
-        ping_result = controller.trace(app.guid, "stdout")
-        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+        if controller_access_configuration:
+            controller = proxy.create_experiment_controller(xml, 
+                controller_access_configuration)
+        else:
+            controller = ExperimentController(xml, self.root_dir)
+        
+        try:
+            controller.start()
+            while not controller.is_finished(app.guid):
+                time.sleep(0.5)
+            ping_result = controller.trace(app.guid, "stdout")
+            comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
 
 --- 10.0.0.2 ping statistics ---
 1 packets transmitted, 1 received, 0% packet loss, time 0ms
 """
-        self.assertTrue(ping_result.startswith(comp_result))
-        controller.stop()
-        controller.shutdown()
+            self.assertTrue(ping_result.startswith(comp_result))
+        finally:
+            controller.stop()
+            controller.shutdown()
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
-    def test_all_daemonized_if(self):
-        exp_desc = ExperimentDescription()
-        testbed_id = "netns"
-        user = getpass.getuser()
-        netns_provider = FactoriesProvider(testbed_id)
-        netns_desc = exp_desc.add_testbed_description(netns_provider)
-        netns_desc.set_attribute_value("homeDirectory", self.root_dir)
-        #netns_desc.set_attribute_value("enableDebug", True)
-        node1 = netns_desc.create("Node")
-        node2 = netns_desc.create("Node")
-        iface1 = netns_desc.create("NodeInterface")
-        iface1.set_attribute_value("up", True)
-        node1.connector("devs").connect(iface1.connector("node"))
-        ip1 = iface1.add_address()
-        ip1.set_attribute_value("Address", "10.0.0.1")
-        iface2 = netns_desc.create("NodeInterface")
-        iface2.set_attribute_value("up", True)
-        node2.connector("devs").connect(iface2.connector("node"))
-        ip2 = iface2.add_address()
-        ip2.set_attribute_value("Address", "10.0.0.2")
-        switch = netns_desc.create("Switch")
-        switch.set_attribute_value("up", True)
-        iface1.connector("switch").connect(switch.connector("devs"))
-        iface2.connector("switch").connect(switch.connector("devs"))
-        app = netns_desc.create("Application")
-        app.set_attribute_value("command", "ping -qc1 10.0.0.2")
-        app.set_attribute_value("user", user)
-        app.connector("node").connect(node1.connector("apps"))
-        app.enable_trace("stdout")
-
-        netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
-        inst_root_dir = os.path.join(self.root_dir, "instance")
-        os.mkdir(inst_root_dir)
-        netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
-        netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
-
-        xml = exp_desc.to_xml()
+    def test_local_if(self):
+        self._test_if(
+            daemonize_testbed = False,
+            controller_access_configuration = None)
 
+    @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+    def test_all_daemonized_if(self):
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
         access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
-        controller = proxy.create_experiment_controller(xml, access_config)
-
-        controller.start()
-        while not controller.is_finished(app.guid):
-            time.sleep(0.5)
-        ping_result = controller.trace(app.guid, "stdout")
-        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
-
---- 10.0.0.2 ping statistics ---
-1 packets transmitted, 1 received, 0% packet loss, time 0ms
-"""
-        self.assertTrue(ping_result.startswith(comp_result))
-        controller.stop()
-        controller.shutdown()
+        
+        self._test_if(
+            daemonize_testbed = True,
+            controller_access_configuration = access_config)
 
     @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
     def test_all_ssh_daemonized_if(self):
-        exp_desc = ExperimentDescription()
-        testbed_id = "netns"
         env = test_util.test_environment()
-        user = getpass.getuser()
-        netns_provider = FactoriesProvider(testbed_id)
-        netns_desc = exp_desc.add_testbed_description(netns_provider)
-        netns_desc.set_attribute_value("homeDirectory", self.root_dir)
-        #netns_desc.set_attribute_value("enableDebug", True)
-        node1 = netns_desc.create("Node")
-        node2 = netns_desc.create("Node")
-        iface1 = netns_desc.create("NodeInterface")
-        iface1.set_attribute_value("up", True)
-        node1.connector("devs").connect(iface1.connector("node"))
-        ip1 = iface1.add_address()
-        ip1.set_attribute_value("Address", "10.0.0.1")
-        iface2 = netns_desc.create("NodeInterface")
-        iface2.set_attribute_value("up", True)
-        node2.connector("devs").connect(iface2.connector("node"))
-        ip2 = iface2.add_address()
-        ip2.set_attribute_value("Address", "10.0.0.2")
-        switch = netns_desc.create("Switch")
-        switch.set_attribute_value("up", True)
-        iface1.connector("switch").connect(switch.connector("devs"))
-        iface2.connector("switch").connect(switch.connector("devs"))
-        app = netns_desc.create("Application")
-        app.set_attribute_value("command", "ping -qc1 10.0.0.2")
-        app.set_attribute_value("user", user)
-        app.connector("node").connect(node1.connector("apps"))
-        app.enable_trace("stdout")
-
-        netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
-        inst_root_dir = os.path.join(self.root_dir, "instance")
-        os.mkdir(inst_root_dir)
-        netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
-        netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
-
-        xml = exp_desc.to_xml()
-
+        
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
@@ -168,20 +105,10 @@ class NetnsIntegrationTestCase(unittest.TestCase):
         access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
         access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
         access_config.set_attribute_value(DC.USE_AGENT, True)
-        controller = proxy.create_experiment_controller(xml, access_config)
-
-        controller.start()
-        while not controller.is_finished(app.guid):
-            time.sleep(0.5)
-        ping_result = controller.trace(app.guid, "stdout")
-        comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
-
---- 10.0.0.2 ping statistics ---
-1 packets transmitted, 1 received, 0% packet loss, time 0ms
-"""
-        self.assertTrue(ping_result.startswith(comp_result))
-        controller.stop()
-        controller.shutdown()
+        
+        self._test_if(
+            daemonize_testbed = True,
+            controller_access_configuration = access_config)
 
     def tearDown(self):
         try:
index 0ff968c..9b615f7 100755 (executable)
@@ -17,9 +17,7 @@ class Ns3IntegrationTestCase(unittest.TestCase):
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
 
-    @test_util.skipUnless(test_util.ns3_usable(), 
-           "Test requires working ns-3 bindings")
-    def test_local_if(self):
+    def _test_if(self, daemonize_testbed, controller_access_configuration):
         exp_desc = ExperimentDescription()
         testbed_id = "ns3"
         ns3_provider = FactoriesProvider(testbed_id)
@@ -66,90 +64,68 @@ class Ns3IntegrationTestCase(unittest.TestCase):
         app.set_attribute_value("StopTime", "20s")
         app.connector("node").connect(node1.connector("apps"))
 
+        if daemonize_testbed:
+            ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+            inst_root_dir = os.path.join(self.root_dir, "instance")
+            os.mkdir(inst_root_dir)
+            ns3_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+            ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
         xml = exp_desc.to_xml()
-        controller = ExperimentController(xml, self.root_dir)
-        controller.start()
-        while not controller.is_finished(app.guid):
-            time.sleep(0.5)
-        ping_result = controller.trace(iface2.guid, "P2PAsciiTrace")
-        comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)"
-        if ping_result.find(comp_result) == -1:
-            self.fail("Unexpected trace: %s" % (ping_result,))
-        controller.stop()
-        controller.shutdown()
+        
+        if controller_access_configuration:
+            controller = ExperimentController(xml, self.root_dir)
+        else:
+            controller = proxy.create_experiment_controller(xml, 
+                controller_access_configuration)
+        
+        try:
+            controller.start()
+            while not controller.is_finished(app.guid):
+                time.sleep(0.5)
+            ping_result = controller.trace(iface2.guid, "P2PAsciiTrace")
+            comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)"
+            if ping_result.find(comp_result) == -1:
+                self.fail("Unexpected trace: %s" % (ping_result,))
+        finally:
+            controller.stop()
+            controller.shutdown()
 
     @test_util.skipUnless(test_util.ns3_usable(), 
            "Test requires working ns-3 bindings")
-    def test_all_daemonized_if(self):
-        exp_desc = ExperimentDescription()
-        testbed_id = "ns3"
-        ns3_provider = FactoriesProvider(testbed_id)
-        ns3_desc = exp_desc.add_testbed_description(ns3_provider)
-        ns3_desc.set_attribute_value("homeDirectory", self.root_dir)
-
-        node1 = ns3_desc.create("ns3::Node")
-        ipv41 = ns3_desc.create("ns3::Ipv4L3Protocol")
-        arp1  = ns3_desc.create("ns3::ArpL3Protocol")
-        icmp1 = ns3_desc.create("ns3::Icmpv4L4Protocol")
-        node1.connector("protos").connect(ipv41.connector("node"))
-        node1.connector("protos").connect(arp1.connector("node"))
-        node1.connector("protos").connect(icmp1.connector("node"))
-        iface1 = ns3_desc.create("ns3::PointToPointNetDevice")
-        queue1 = ns3_desc.create("ns3::DropTailQueue")
-        node1.connector("devs").connect(iface1.connector("node"))
-        iface1.connector("queue").connect(queue1.connector("dev"))
-        trace1 = iface1.enable_trace("P2PAsciiTrace")
-        ip1 = iface1.add_address()
-        ip1.set_attribute_value("Address", "10.0.0.1")
-
-        node2 = ns3_desc.create("ns3::Node")
-        ipv42 = ns3_desc.create("ns3::Ipv4L3Protocol")
-        arp2  = ns3_desc.create("ns3::ArpL3Protocol")
-        icmp2 = ns3_desc.create("ns3::Icmpv4L4Protocol")
-        node2.connector("protos").connect(ipv42.connector("node"))
-        node2.connector("protos").connect(arp2.connector("node"))
-        node2.connector("protos").connect(icmp2.connector("node"))
-        iface2 = ns3_desc.create("ns3::PointToPointNetDevice")
-        queue2 = ns3_desc.create("ns3::DropTailQueue")
-        node2.connector("devs").connect(iface2.connector("node"))
-        iface2.connector("queue").connect(queue2.connector("dev"))
-        trace2 = iface2.enable_trace("P2PAsciiTrace")
-        ip2 = iface2.add_address()
-        ip2.set_attribute_value("Address", "10.0.0.2")
+    def test_local_if(self):
+        self._test_if(
+            daemonize_testbed = False,
+            controller_access_configuration = None)
 
-        chan = ns3_desc.create("ns3::PointToPointChannel")
-        iface1.connector("chan").connect(chan.connector("dev2"))
-        iface2.connector("chan").connect(chan.connector("dev2"))
+    @test_util.skipUnless(test_util.ns3_usable(), 
+           "Test requires working ns-3 bindings")
+    def test_all_daemonized_if(self):
+        access_config = proxy.AccessConfiguration()
+        access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+        access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
 
-        app = ns3_desc.create("ns3::V4Ping")
-        app.set_attribute_value("Remote", "10.0.0.2")
-        app.set_attribute_value("StartTime", "0s")
-        app.set_attribute_value("StopTime", "20s")
-        app.connector("node").connect(node1.connector("apps"))
+        self._test_if(
+            daemonize_testbed = True,
+            controller_access_configuration = access_config)
 
-        ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
-        inst_root_dir = os.path.join(self.root_dir, "instance")
-        os.mkdir(inst_root_dir)
-        ns3_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
-        ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+    @test_util.skipUnless(test_util.ns3_usable(), 
+           "Test requires working ns-3 bindings")
+    def test_all_ssh_daemonized_if(self):
+        env = test_util.test_environment()
 
-        xml = exp_desc.to_xml()
         access_config = proxy.AccessConfiguration()
         access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
         access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
         access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
-        controller = proxy.create_experiment_controller(xml, access_config)
-
-        controller.start()
-        while not controller.is_finished(app.guid):
-            time.sleep(0.5)
-          
-        ping_result = controller.trace(iface2.guid, "P2PAsciiTrace")
-        comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)"
-        if ping_result.find(comp_result) == -1:
-            self.fail("Unexpected trace: %s" % (ping_result,))
-        controller.stop()
-        controller.shutdown()
+        access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+        access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+        access_config.set_attribute_value(DC.USE_AGENT, True)
+        
+        self._test_if(
+            daemonize_testbed = True,
+            controller_access_configuration = access_config)
 
     def tearDown(self):
         try:
index 2bccd69..d8fee00 100755 (executable)
@@ -5,6 +5,7 @@ import getpass
 from nepi.core.design import ExperimentDescription, FactoriesProvider
 from nepi.core.execute import ExperimentController
 from nepi.util import proxy
+from nepi.util.constants import DeploymentConfiguration as DC
 import os
 import shutil
 import tempfile
@@ -56,9 +57,8 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("plcHost", plchost)
         
         return pl_desc, exp_desc
-
-    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
-    def test_simple(self):
+    
+    def _test_simple(self, daemonize_testbed, controller_access_configuration):
         pl, exp = self.make_experiment_desc()
         
         node1 = pl.create("Node")
@@ -78,9 +78,19 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
         app.enable_trace("stdout")
         app.connector("node").connect(node1.connector("apps"))
 
-        xml = exp.to_xml()
-
-        controller = ExperimentController(xml, self.root_dir)
+        if daemonize_testbed:
+            pl.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+            inst_root_dir = os.path.join(self.root_dir, "instance")
+            os.mkdir(inst_root_dir)
+            pl.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+            pl.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
+        if controller_access_configuration:
+            controller = proxy.create_experiment_controller(xml, 
+                controller_access_configuration)
+        else:
+            controller = ExperimentController(xml, self.root_dir)
+        
         try:
             controller.start()
             while not controller.is_finished(app.guid):
@@ -98,7 +108,6 @@ class PlanetLabIntegrationTestCase(unittest.TestCase):
             controller.stop()
             controller.shutdown()
 
-
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_spanning_deployment(self):
         pl, exp = self.make_experiment_desc()
@@ -165,6 +174,40 @@ FIONREAD = 0x[0-9a-fA-F]{8}.*
         finally:
             controller.stop()
             controller.shutdown()
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_simple(self):
+        self._test_simple(
+            daemonize_testbed = False,
+            controller_access_configuration = None)
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_simple_daemonized(self):
+        access_config = proxy.AccessConfiguration({
+            DC.DEPLOYMENT_MODE : DC.MODE_DAEMON,
+            DC.ROOT_DIRECTORY : self.root_dir,
+        })
+
+        self._test_simple(
+            daemonize_testbed = False,
+            controller_access_configuration = access_config)
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_simple_ssh(self):
+        env = test_util.test_environment()
+
+        access_config = proxy.AccessConfiguration({
+            DC.DEPLOYMENT_MODE : DC.MODE_DAEMON,
+            DC.ROOT_DIRECTORY : self.root_dir,
+            DC.LOG_LEVEL : DC.DEBUG_LEVEL,
+            DC.DEPLOYMENT_COMMUNICATION : DC.ACCESS_SSH,
+            DC.DEPLOYMENT_PORT : env.port,
+            DC.USE_AGENT : True,
+        })
+
+        self._test_simple(
+            daemonize_testbed = False,
+            controller_access_configuration = access_config)
         
 
 if __name__ == '__main__':