Ticket #30: Routing in PlanetLab, with a (currently broken) test
author Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 20 May 2011 12:04:18 +0000 (14:04 +0200)
committer Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 20 May 2011 12:04:18 +0000 (14:04 +0200)
TODO:
 - dynamically modify routes
 - dynamically query routes
 - fix some synchronization issues with routes+cross-connections
 - parallelize routing table setup

src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/tunproto.py
test/testbeds/planetlab/integration_ns3.py

index 1f9ad30..0567102 100644 (file)
@@ -9,6 +9,7 @@ import subprocess
 import os
 import os.path
 import random
+import ipaddr
 
 import tunproto
 
@@ -164,7 +165,39 @@ class TunIface(object):
             " up" if self.up else " down",
             " snat" if self.snat else "",
         )
+    
+    @property
+    def if_name(self):
+        """
+        Interface name as reported by the peering protocol implementation,
+        or None while no such implementation is attached.
+        """
+        if not self.peer_proto_impl:
+            return None
+        return self.peer_proto_impl.if_name
 
+    def routes_here(self, route):
+        """
+        Tell whether a route should be attached to this interface.
+        
+        route is a (destination, prefix, nexthop) tuple. The result is True
+        when the gateway or the destination lies inside this interface's
+        network (narrowed to the interface's own address on point-to-point
+        links), or when the gateway is the point-to-point peer. In every
+        other case, including an interface without address or prefix,
+        nothing is returned (None, which is falsy).
+        """
+        if not (self.address and self.netprefix):
+            return
+        
+        dest, destprefix, nexthop = route
+        
+        # Point-to-point links are matched as a single host (/32)
+        localprefix = 32 if self.pointopoint else self.netprefix
+        myNet = ipaddr.IPNetwork("%s/%d" % (self.address, localprefix))
+        gwIp = ipaddr.IPNetwork(nexthop)
+        
+        # A missing (or zero) destination prefix leaves the bare address
+        target = dest
+        if destprefix:
+            target = target + ("/%d" % destprefix)
+        tgtIp = ipaddr.IPNetwork(target)
+        
+        if gwIp in myNet or tgtIp in myNet:
+            return True
+        
+        if self.pointopoint and gwIp == ipaddr.IPNetwork(self.pointopoint):
+            return True
+    
     def add_address(self, address, netprefix, broadcast):
         if (self.address or self.netprefix or self.netmask) is not None:
             raise RuntimeError, "Cannot add more than one address to %s interfaces" % (self._KIND,)
index cbb736a..c2f7e41 100644 (file)
@@ -182,11 +182,29 @@ def create_nodeiface(testbed_instance, guid):
 def create_tuniface(testbed_instance, guid):
     parameters = testbed_instance._get_parameters(guid)
     element = testbed_instance._make_tun_iface(parameters)
+    
+    # Set custom addresses, if there are any already
+    # Setting this early helps set up P2P links
+    if guid in testbed_instance._add_address and not (element.address or element.netmask or element.netprefix):
+        addresses = testbed_instance._add_address[guid]
+        for address in addresses:
+            (address, netprefix, broadcast) = address
+            element.add_address(address, netprefix, broadcast)
+    
     testbed_instance.elements[guid] = element
 
 def create_tapiface(testbed_instance, guid):
     parameters = testbed_instance._get_parameters(guid)
     element = testbed_instance._make_tap_iface(parameters)
+    
+    # Set custom addresses, if there are any already
+    # Setting this early helps set up P2P links
+    if guid in testbed_instance._add_address and not (element.address or element.netmask or element.netprefix):
+        addresses = testbed_instance._add_address[guid]
+        for address in addresses:
+            (address, netprefix, broadcast) = address
+            element.add_address(address, netprefix, broadcast)
+    
     testbed_instance.elements[guid] = element
 
 def create_application(testbed_instance, guid):
@@ -286,7 +304,7 @@ def configure_nodeiface(testbed_instance, guid):
 def preconfigure_tuniface(testbed_instance, guid):
     element = testbed_instance._elements[guid]
     
-    # Set custom addresses if any
+    # Set custom addresses if any, and if not set already
     if guid in testbed_instance._add_address and not (element.address or element.netmask or element.netprefix):
         addresses = testbed_instance._add_address[guid]
         for address in addresses:
@@ -364,6 +382,18 @@ def configure_node(testbed_instance, guid):
     # this call only spawns the process
     node.install_dependencies()
 
+def configure_node_routes(testbed_instance, guid):
+    """
+    Hand the routes registered for a node, together with the TunIface
+    devices connected to it, over to the node's configure_routes().
+    Does nothing when the node has no routes.
+    """
+    node = testbed_instance._elements[guid]
+    routes = testbed_instance._add_route.get(guid)
+    if not routes:
+        return
+    
+    # Collect the connected elements that exist and are TunIface instances
+    devs = []
+    for dev_guid in testbed_instance.get_connected(guid, "devs", "node"):
+        dev = testbed_instance._elements.get(dev_guid)
+        if dev and isinstance(dev, testbed_instance._interfaces.TunIface):
+            devs.append(dev)
+    
+    node.configure_routes(routes, devs)
+
 def configure_application(testbed_instance, guid):
     app = testbed_instance._elements[guid]
     
@@ -899,13 +929,17 @@ create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEP
 
 configure_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 
+# Start node after ifaces, because the node needs the ifaces in order to set up routes
+start_order = [ INTERNET, NODEIFACE, TAPIFACE, TUNIFACE, NODE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+
 factories_info = dict({
     NODE: dict({
-            "allow_routes": False,
+            "allow_routes": True,
             "help": "Virtualized Node (V-Server style)",
             "category": "topology",
             "create_function": create_node,
             "preconfigure_function": configure_node,
+            "start_function": configure_node_routes,
             "box_attributes": [
                 "forward_X11",
                 "hostname",
@@ -1102,6 +1136,10 @@ class VersionedMetadataInfo(metadata.VersionedMetadataInfo):
     def configure_order(self):
         return configure_order
 
+    @property
+    def start_order(self):
+        # Expose the module-level start_order list
+        return start_order
+
     @property
     def factories_info(self):
         return factories_info
index 8e44f4c..1e52a51 100644 (file)
@@ -307,3 +307,46 @@ class Node(object):
             return False
     
 
+    def configure_routes(self, routes, devs):
+        """
+        Add the specified routes to the node's routing table
+        
+        routes: iterable of (destination, prefix, nexthop) tuples
+        devs: candidate virtual interfaces; each route is bound to the
+            first one whose routes_here(route) is true
+        
+        The resulting "add ..." rules are written, one per line, to
+        /vsys/vroute.in on the node over ssh, while /vsys/vroute.out is
+        copied to the command's stderr.
+        
+        Raises RuntimeError if a route matches no interface, or if the
+        remote command exits non-zero or writes anything to stderr.
+        """
+        rules = []
+        
+        for route in routes:
+            for dev in devs:
+                if dev.routes_here(route):
+                    # Schedule rule
+                    # (the prefix is omitted when it is missing, zero or 32)
+                    dest, prefix, nexthop = route
+                    rules.append(
+                        "add %s%s gw %s %s" % (
+                            dest,
+                            (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
+                            nexthop,
+                            dev.if_name,
+                        )
+                    )
+                    
+                    # Stop checking
+                    break
+            else:
+                raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
+                    "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
+        
+        # The command has no placeholders, so it is passed verbatim;
+        # the rules themselves travel through stdin
+        (out,err),proc = server.popen_ssh_command(
+            "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in'",
+            host = self.hostname,
+            port = None,
+            user = self.slicename,
+            agent = None,
+            ident_key = self.ident_path,
+            server_key = self.server_key,
+            stdin = '\n'.join(rules)
+            )
+        
+        if proc.wait() or err:
+            raise RuntimeError, "Could not set routes: %s%s" % (out,err)
+        
+        
+
index 9a12cec..c4dcdba 100644 (file)
@@ -31,6 +31,7 @@ class TunProtoBase(object):
         self._started = False
         self._pid = None
         self._ppid = None
+        self._if_name = None
 
     def _make_home(self):
         local = self.local()
@@ -138,7 +139,7 @@ class TunProtoBase(object):
         
         if not local_p2p and hasattr(peer, 'address'):
             local_p2p = peer.address
-        
+
         if check_proto != peer_proto:
             raise RuntimeError, "Peering protocol mismatch: %s != %s" % (check_proto, peer_proto)
         
@@ -242,6 +243,34 @@ class TunProtoBase(object):
                 
                 time.sleep(1.0)
     
+    @property
+    def if_name(self):
+        """
+        Name of the interface assigned to this tunnel, or None if unknown.
+        
+        The name is parsed from the first 'Using tun:' line of the
+        'capture' file under home_path on the local node (read over ssh,
+        up to 30 attempts) and cached in _if_name. Nothing is looked up
+        unless local.capture is set, and the lookup is abandoned as soon
+        as the remote command exits non-zero.
+        """
+        if not self._if_name:
+            # Inspect the trace to check the assigned iface
+            local = self.local()
+            if local and local.capture:
+                for spin in xrange(30):
+                    (out,err),proc = server.popen_ssh_command(
+                        "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
+                            home = server.shell_escape(self.home_path)),
+                        host = local.node.hostname,
+                        port = None,
+                        user = local.node.slicename,
+                        agent = None,
+                        ident_key = local.node.ident_path,
+                        server_key = local.node.server_key
+                        )
+                    
+                    if proc.wait():
+                        return
+                    
+                    out = out.strip()
+                    
+                    match = re.match(r"Using +tun: +([-a-zA-Z0-9]*) +.*",out)
+                    if match:
+                        self._if_name = match.group(1)
+                        # Found it: stop polling instead of repeating
+                        # the ssh round-trip for the remaining attempts
+                        break
+        return self._if_name
+    
     def async_launch(self, check_proto, listen, extra_args=[]):
         if not self._launcher:
             self._launcher = threading.Thread(
index ec7a52c..3e8a4c2 100755 (executable)
@@ -22,6 +22,8 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
     
     host1 = "nepi1.pl.sophia.inria.fr"
     host2 = "nepi2.pl.sophia.inria.fr"
+    host3 = "nepi3.pl.sophia.inria.fr"
+    host4 = "nepi5.pl.sophia.inria.fr"
 
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
@@ -56,25 +58,42 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         
         return pl_desc, exp_desc
     
-    def make_pl_tapnode(self, pl):
+    def make_pl_tapnode(self, pl, ip = "192.168.2.2", inet = None, label = "node1", hostname = None):
+        if not isinstance(ip, list):
+            ips = [ip]
+        else:
+            ips = ip
         node1 = pl.create("Node")
-        node1.set_attribute_value("hostname", self.host1)
-        node1.set_attribute_value("label", "node1")
+        node1.set_attribute_value("hostname", hostname or self.host1)
+        node1.set_attribute_value("label", label)
         node1.set_attribute_value("emulation", True) # require emulation
         iface1 = pl.create("NodeInterface")
-        iface1.set_attribute_value("label", "node1iface")
-        tap1 = pl.create("TapInterface")
-        tap1.enable_trace("packets") # for error output
-        tap1.set_attribute_value("label", "node1tap")
-        inet = pl.create("Internet")
+        iface1.set_attribute_value("label", label+"iface")
+        tap1 = []
+        tap1ip = []
+        for i,ip in enumerate(ips):
+            _tap1 = pl.create("TapInterface")
+            _tap1.enable_trace("packets") # for error output
+            _tap1.set_attribute_value("label", label+"tap"+(str(i+1) if i else ""))
+        
+            _tap1ip = _tap1.add_address()
+            _tap1ip.set_attribute_value("Address", ip)
+            _tap1ip.set_attribute_value("NetPrefix", 24)
+            _tap1ip.set_attribute_value("Broadcast", False)
+        
+            node1.connector("devs").connect(_tap1.connector("node"))
+            
+            tap1.append(_tap1)
+            tap1ip.append(_tap1ip)
+            
+        inet = inet or pl.create("Internet")
         node1.connector("devs").connect(iface1.connector("node"))
-        node1.connector("devs").connect(tap1.connector("node"))
         iface1.connector("inet").connect(inet.connector("devs"))
         
-        tap1ip = tap1.add_address()
-        tap1ip.set_attribute_value("Address", "192.168.2.2")
-        tap1ip.set_attribute_value("NetPrefix", 24)
-        tap1ip.set_attribute_value("Broadcast", False)
+        if len(tap1) == 1:
+            tap1 = tap1[0]
+        if len(tap1ip) == 1:
+            tap1ip = tap1ip[0]
         
         return node1, iface1, tap1, tap1ip, inet
     
@@ -115,7 +134,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
         "Test is expensive, requires NEPI_FULL_TESTS=yes")
-    def test_ns3_in_pl(self):
+    def _test_ns3_in_pl(self):
         ns3_testbed_id = "ns3"
         ns3_testbed_version = "3_9_RC3"
         
@@ -145,7 +164,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
         "Test is expensive, requires NEPI_FULL_TESTS=yes")
-    def test_ns3_in_pl_crossconnect(self):
+    def _test_ns3_in_pl_crossconnect(self):
         pl, exp = self.make_experiment_desc()
         
         # Create PL node, ifaces, assign addresses
@@ -210,7 +229,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
         "Test is expensive, requires NEPI_FULL_TESTS=yes")
-    def test_ns3_in_pl_snat(self):
+    def _test_ns3_in_pl_snat(self):
         pl, exp = self.make_experiment_desc()
         
         # Create PL node, ifaces, assign addresses
@@ -296,6 +315,131 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
             "Unexpected trace:\n%s\n" % (
                 tap_trace,) )
 
+    def _test_ns3_in_pl_p2p(self, proto):
+        """
+        Shared body of the P2P routing tests; proto names the tap connector
+        used to link the PlanetLab nodes.
+        
+        Builds three PL nodes chained through tap links
+        (node1.tap1 <-> node2.tap2, node2.tap3 <-> node3.tap4), adds one
+        route per node, attaches an NS3 node to node1's tap0 that pings
+        node3's tap address, and checks tap0's packet trace for echo
+        requests and replies.
+        """
+        pl, exp = self.make_experiment_desc()
+        
+        # Create PL node, ifaces, assign addresses
+        node1, iface1, (tap0,tap1), (tap0ip,tap1ip), inet = self.make_pl_tapnode(pl, 
+            label="node1", hostname = self.host1,
+            ip=["192.168.2.2","192.168.2.5"])
+        node2, iface2, (tap2,tap3), (tap2ip,tap3ip), inet = self.make_pl_tapnode(pl, inet=inet, 
+            label="node2", hostname = self.host2,
+            ip=["192.168.2.6","192.168.2.9"])
+        node3, iface3, tap4, tap4ip, inet = self.make_pl_tapnode(pl, inet=inet, 
+            label="node3", hostname = self.host3,
+            ip="192.168.2.10")
+        
+        # Add NS3 support in node1
+        ns3_desc = self.make_ns_in_pl(pl, exp, node1, iface1, "tb-ns3-4-%s" % (proto,))
+        
+        # Configure P2P links
+        tap0.set_attribute_value("pointopoint", "192.168.2.1") # cross-p2p is not automatic
+        tap1.connector(proto).connect(tap2.connector(proto))
+        tap3.connector(proto).connect(tap4.connector(proto))
+        
+        # Configure routes
+        r = node1.add_route()
+        r.set_attribute_value("Destination", "192.168.2.8")
+        r.set_attribute_value("NetPrefix", 29)
+        r.set_attribute_value("NextHop", "192.168.2.6")
+
+        r = node2.add_route()
+        r.set_attribute_value("Destination", "192.168.2.0")
+        r.set_attribute_value("NetPrefix", 29)
+        r.set_attribute_value("NextHop", "192.168.2.5")
+
+        r = node3.add_route()
+        r.set_attribute_value("Destination", "192.168.2.0")
+        r.set_attribute_value("NetPrefix", 29)
+        r.set_attribute_value("NextHop", "192.168.2.9")
+        
+        # Create NS3 node that is responsive to pings, connected
+        # to node1 through the Tap interface
+        ns1 = ns3_desc.create("ns3::Node")
+        ipv41 = ns3_desc.create("ns3::Ipv4L3Protocol")
+        arp1  = ns3_desc.create("ns3::ArpL3Protocol")
+        icmp1 = ns3_desc.create("ns3::Icmpv4L4Protocol")
+        ns1.connector("protos").connect(ipv41.connector("node"))
+        ns1.connector("protos").connect(arp1.connector("node"))
+        ns1.connector("protos").connect(icmp1.connector("node"))
+        ns1if = ns3_desc.create("ns3::FileDescriptorNetDevice")
+        ns1if.enable_trace("FileDescriptorPcapTrace")
+        ns1if.set_attribute_value("label", "ns1if")
+        ns1.connector("devs").connect(ns1if.connector("node"))
+        tap0.connector("fd->").connect(ns1if.connector("->fd"))
+        ip1 = ns1if.add_address()
+        ip1.set_attribute_value("Address", "192.168.2.1")
+        ip1.set_attribute_value("NetPrefix", 30)
+        ip1.set_attribute_value("Broadcast", False)
+        
+        # Add default route in the NS3 node, through node1's tap0 address
+        r1 = ns1.add_route()
+        r1.set_attribute_value("Destination", "0.0.0.0")
+        r1.set_attribute_value("NetPrefix", 0)
+        r1.set_attribute_value("NextHop", "192.168.2.2")
+
+        # Create NS3 ping application, pinging the address of node3's tap
+        ping = ns3_desc.create("ns3::V4Ping")
+        ping.set_attribute_value("Remote", "{#[node3tap].addr[0].[Address]#}")
+        ping.set_attribute_value("StartTime", "0s")
+        ping.set_attribute_value("StopTime", "10s")
+        ping.connector("node").connect(ns1.connector("apps"))
+
+        xml = exp.to_xml()
+
+        try:
+            controller = ExperimentController(xml, self.root_dir)
+            controller.start()
+
+            while not controller.is_finished(ping.guid):
+                time.sleep(0.5)
+
+            # Gather every tap's trace for the failure message;
+            # only tap0's trace is actually matched below
+            tap_trace = []
+            for i,tap in enumerate([ tap0, tap1, tap2, tap3, tap4 ]):
+                tap_trace.append("\nTrace for tap%d:\n" % i)
+                tap_trace.append(controller.trace(pl.guid, tap.guid, "packets"))
+            tap_trace = "".join(tap_trace)
+            tap0_trace = controller.trace(pl.guid, tap0.guid, "packets")
+
+        finally:
+            controller.stop()
+            controller.shutdown()
+        
+        # asserts at the end, to make sure there's proper cleanup
+        # Count, for sequence numbers 0..9, the echo requests seen and
+        # the requests that were later followed by a matching reply
+        sent = 0
+        replied = 0
+        for seq in xrange(10):
+            re_send = r""".*
+[0-9.:]* IP 192.168.2.1 > (\d*\.){3}\d*: ICMP echo request, id 0, seq %(seq)d, length \d*
+.*""" % dict(seq=seq)
+
+            re_reply = r""".*
+[0-9.:]* IP 192.168.2.1 > (\d*\.){3}\d*: ICMP echo request, id 0, seq %(seq)d, length \d*.*
+[0-9.:]* IP (\d*\.){3}\d* > 192.168.2.1: ICMP echo reply, id 0, seq %(seq)d, length \d*
+.*""" % dict(seq=seq)
+
+            sent += bool(re.match(re_send, tap0_trace, re.MULTILINE|re.DOTALL))
+            replied += bool(re.match(re_reply, tap0_trace, re.MULTILINE|re.DOTALL))
+
+        # Pass when more than 5 requests were seen and the replies
+        # reach sent/2
+        self.assertTrue(replied >= sent/2 and sent > 5,
+            "Unexpected trace:\n%s\n" % (
+                tap_trace,) )
+
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, 
+        "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+        "Test is expensive, requires NEPI_FULL_TESTS=yes")
+    def test_ns3_in_pl_p2p_udp(self):
+        # Run the shared P2P routing scenario using the "udp" tap connector
+        self._test_ns3_in_pl_p2p("udp")
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, 
+        "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+        "Test is expensive, requires NEPI_FULL_TESTS=yes")
+    def test_ns3_in_pl_p2p_tcp(self):
+        # Run the shared P2P routing scenario using the "tcp" tap connector
+        self._test_ns3_in_pl_p2p("tcp")
+
 if __name__ == '__main__':
     unittest.main()