NS3-in-PL fixes all over the place.
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 6 May 2011 16:46:41 +0000 (18:46 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 6 May 2011 16:46:41 +0000 (18:46 +0200)
Cross-connection fixes all over the place too.
Still couldn't get cross connections working, but that's specific to FileDescriptorNetDevice

src/nepi/core/execute.py
src/nepi/testbeds/ns3/metadata_v3_9_RC3.py
src/nepi/testbeds/planetlab/application.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/node.py
test/testbeds/planetlab/integration_ns3.py

index 980053c..b6094c6 100644 (file)
@@ -12,8 +12,9 @@ import threading
 import ConfigParser
 import os
 import collections
+import functools
 
-ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
+ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
 ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
 COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
 
@@ -336,11 +337,24 @@ class ExperimentController(object):
 
     @staticmethod
     def _parallel(callables):
-        threads = [ threading.Thread(target=callable) for callable in callables ]
+        excs = []
+        def wrap(callable):
+            @functools.wraps(callable)
+            def wrapped(*p, **kw):
+                try:
+                    callable(*p, **kw)
+                except Exception,e:
+                    import traceback
+                    traceback.print_exc(file=sys.stderr)
+                    excs.append(e)
+            return wrapped
+        threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
         for thread in threads:
             thread.start()
         for thread in threads:
             thread.join()
+        for exc in excs:
+            raise exc
 
     def start(self):
         parser = XmlExperimentParser()
@@ -396,6 +410,8 @@ class ExperimentController(object):
         # final netref step, fail if anything's left unresolved
         self.do_netrefs(data, fail_if_undefined=True)
         
+        self._program_testbed_cross_connections(data)
+        
         # perform do_configure in parallel for al testbeds
         # (it's internal configuration for each)
         self._parallel([testbed.do_configure
@@ -530,13 +546,13 @@ class ExperimentController(object):
                 ref_guid = int(label[5:])
                 if ref_guid:
                     expr = match.group("expr")
-                    component = match.group("component")[1:] # skip the dot
+                    component = (match.group("component") or "")[1:] # skip the dot
                     attribute = match.group("attribute")
                     
                     # split compound components into component kind and index
                     # eg: 'addr[0]' -> ('addr', '0')
                     component, component_index = self._netref_component_split(component)
-                    
+
                     # find object and resolve expression
                     for ref_testbed in self._testbeds.itervalues():
                         if component not in self._NETREF_COMPONENT_GETTERS:
@@ -675,40 +691,56 @@ class ExperimentController(object):
     def _program_testbed_controllers(self, element_guids, data):
         for guid in element_guids:
             (testbed_guid, factory_id) = data.get_box_data(guid)
-            testbed = self._testbeds[testbed_guid]
-            testbed.defer_create(guid, factory_id)
-            for (name, value) in data.get_attribute_data(guid):
-                testbed.defer_create_set(guid, name, value)
+            testbed = self._testbeds.get(testbed_guid)
+            if testbed:
+                testbed.defer_create(guid, factory_id)
+                for (name, value) in data.get_attribute_data(guid):
+                    testbed.defer_create_set(guid, name, value)
 
         for guid in element_guids: 
             (testbed_guid, factory_id) = data.get_box_data(guid)
-            testbed = self._testbeds[testbed_guid]
-            for (connector_type_name, cross_guid, cross_connector_type_name) \
-                    in data.get_connection_data(guid):
-                (testbed_guid, factory_id) = data.get_box_data(guid)
-                (cross_testbed_guid, cross_factory_id) = data.get_box_data(
-                        cross_guid)
-                if testbed_guid == cross_testbed_guid:
-                    testbed.defer_connect(guid, connector_type_name, 
-                            cross_guid, cross_connector_type_name)
-                else: 
-                    cross_testbed = self._testbeds[cross_testbed_guid]
-                    cross_testbed_id = cross_testbed.testbed_id
-                    testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
-                            cross_testbed_guid, cross_testbed_id, cross_factory_id, 
-                            cross_connector_type_name)
-                    # save cross data for later
-                    self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
+            testbed = self._testbeds.get(testbed_guid)
+            if testbed:
+                for (connector_type_name, cross_guid, cross_connector_type_name) \
+                        in data.get_connection_data(guid):
+                    (testbed_guid, factory_id) = data.get_box_data(guid)
+                    (cross_testbed_guid, cross_factory_id) = data.get_box_data(
                             cross_guid)
-            for trace_id in data.get_trace_data(guid):
-                testbed.defer_add_trace(guid, trace_id)
-            for (autoconf, address, netprefix, broadcast) in \
-                    data.get_address_data(guid):
-                if address != None:
-                    testbed.defer_add_address(guid, address, netprefix, 
-                            broadcast)
-            for (destination, netprefix, nexthop) in data.get_route_data(guid):
-                testbed.defer_add_route(guid, destination, netprefix, nexthop)
+                    if testbed_guid == cross_testbed_guid:
+                        testbed.defer_connect(guid, connector_type_name, 
+                                cross_guid, cross_connector_type_name)
+                for trace_id in data.get_trace_data(guid):
+                    testbed.defer_add_trace(guid, trace_id)
+                for (autoconf, address, netprefix, broadcast) in \
+                        data.get_address_data(guid):
+                    if address != None:
+                        testbed.defer_add_address(guid, address, netprefix, 
+                                broadcast)
+                for (destination, netprefix, nexthop) in data.get_route_data(guid):
+                    testbed.defer_add_route(guid, destination, netprefix, nexthop)
+    
+    def _program_testbed_cross_connections(self, data):
+        data_guids = data.guids
+
+        for guid in data_guids: 
+            if not data.is_testbed_data(guid):
+                (testbed_guid, factory_id) = data.get_box_data(guid)
+                testbed = self._testbeds.get(testbed_guid)
+                if testbed:
+                    for (connector_type_name, cross_guid, cross_connector_type_name) \
+                            in data.get_connection_data(guid):
+                        (testbed_guid, factory_id) = data.get_box_data(guid)
+                        (cross_testbed_guid, cross_factory_id) = data.get_box_data(
+                                cross_guid)
+                        if testbed_guid != cross_testbed_guid:
+                            cross_testbed = self._testbeds[cross_testbed_guid]
+                            cross_testbed_id = cross_testbed.testbed_id
+                            testbed.defer_cross_connect(guid, connector_type_name, cross_guid, 
+                                    cross_testbed_guid, cross_testbed_id, cross_factory_id, 
+                                    cross_connector_type_name)
+                            # save cross data for later
+                            self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
+                                    cross_guid)
                 
     def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
         if testbed_guid not in self._cross_data:
index 03bc41b..472b6a3 100644 (file)
@@ -5,6 +5,7 @@ from constants import TESTBED_ID
 from nepi.core import metadata
 from nepi.core.attributes import Attribute
 from nepi.util import validation
+import os.path
 
 ### Connection functions ####
 
@@ -98,10 +99,11 @@ def connect_fd(testbed_instance, fdnd_guid, cross_data):
     testbed_instance.set(fdnd_guid, "LinuxSocketAddress", address)
     
     # Set tun standard contract attributes
-    testbed_instance.set(fdnd_guid, "tun_addr", address)
+    testbed_instance.set(fdnd_guid, "tun_addr", 
+        os.path.join( testbed_instance.root_directory, address ) )
     testbed_instance.set(fdnd_guid, "tun_proto", "fd")
     testbed_instance.set(fdnd_guid, "tun_port", 0)
-    testbed_instance.set(fdnd_guid, "tun_key", "\xff"*32) # unimportant, fds aren't encrypted
+    testbed_instance.set(fdnd_guid, "tun_key", "\xfa"*32) # unimportant, fds aren't encrypted
 
 ### Connector information ###
 
index d22e3a3..db623e7 100644 (file)
@@ -529,7 +529,7 @@ class NS3Dependency(Dependency):
                      "mv -f ${BUILD}/target/lib/python*/site-packages/pybindgen ${BUILD}/target/. && "
                      "rm -rf ${BUILD}/target/lib && "
                      "cd ../ns3-src && "
-                     "./waf configure --prefix=${BUILD}/target -d release --disable-examples && "
+                     "./waf configure --prefix=${BUILD}/target -d release --disable-examples --high-precision-as-double && "
                      "./waf &&"
                      "./waf install && "
                      "./waf clean"
index 30895b8..47e9585 100644 (file)
@@ -34,15 +34,6 @@ class NodeIface(object):
 
         # These get initialized when the iface is connected to the internet
         self.has_internet = False
-        
-        # Generate an initial random cryptographic key to use for tunnelling
-        # Upon connection, both endpoints will agree on a common one based on
-        # this one.
-        self.tun_key = ( ''.join(map(chr, [ 
-                    r.getrandbits(8) 
-                    for i in xrange(32) 
-                    for r in (random.SystemRandom(),) ])
-                ).encode("base64").strip() )        
 
     def __str__(self):
         return "%s<ip:%s/%s up mac:%s>" % (
@@ -132,7 +123,6 @@ class TunIface(object):
         # They're part of the TUN standard attribute set
         self.tun_port = None
         self.tun_addr = None
-        self.tun_key = None
         
         # These get initialized when the iface is connected to its peer
         self.peer_iface = None
@@ -141,6 +131,17 @@ class TunIface(object):
 
         # same as peer proto, but for execute-time standard attribute lookups
         self.tun_proto = None 
+        
+        
+        # Generate an initial random cryptographic key to use for tunnelling
+        # Upon connection, both endpoints will agree on a common one based on
+        # this one.
+        self.tun_key = ( ''.join(map(chr, [ 
+                    r.getrandbits(8) 
+                    for i in xrange(32) 
+                    for r in (random.SystemRandom(),) ])
+                ).encode("base64").strip() )        
+        
 
     def __str__(self):
         return "%s<ip:%s/%s %s%s>" % (
@@ -177,7 +178,7 @@ class TunIface(object):
     def prepare(self, home_path, listening):
         if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
             # Ad-hoc peer_iface
-            self.peer_iface = CrossIface(
+            self.peer_iface = _CrossIface(
                 self.peer_proto,
                 self.peer_addr,
                 self.peer_port)
index 62e39fa..e1ffe08 100644 (file)
@@ -287,7 +287,7 @@ def preconfigure_tuniface(testbed_instance, guid):
     element = testbed_instance._elements[guid]
     
     # Set custom addresses if any
-    if guid in testbed_instance._add_address:
+    if guid in testbed_instance._add_address and not (element.address or element.netmask or element.netprefix):
         addresses = testbed_instance._add_address[guid]
         for address in addresses:
             (address, netprefix, broadcast) = address
@@ -790,13 +790,6 @@ attributes = dict({
                 "flags": Attribute.DesignOnly,
                 "validation_function": validation.is_string
             }),
-    ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP: dict({
-                "name": ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
-                "help": "Commands to set up the environment needed to run NEPI testbeds",
-                "type": Attribute.STRING,
-                "flags": Attribute.Invisible | Attribute.ReadOnly,
-                "validation_function": validation.is_string
-            }),
     
     "netpipe_mode": dict({      
                 "name": "mode",
@@ -973,7 +966,7 @@ factories_info = dict({
             "help": "Requirement for package or application to be installed on some node",
             "category": "applications",
             "create_function": create_dependency,
-            "configure_function": configure_dependency,
+            "preconfigure_function": configure_dependency,
             "box_attributes": ["depends", "build-depends", "build", "install",
                                "sources" ],
             "connector_types": ["node"],
@@ -983,7 +976,7 @@ factories_info = dict({
             "help": "Requirement for NEPI inside NEPI - required to run testbed instances inside a node",
             "category": "applications",
             "create_function": create_nepi_dependency,
-            "configure_function": configure_dependency,
+            "preconfigure_function": configure_dependency,
             "box_attributes": [ ],
             "connector_types": ["node"],
             "traces": ["buildlog"]
@@ -992,7 +985,7 @@ factories_info = dict({
             "help": "Requirement for NS3 inside NEPI - required to run NS3 testbed instances inside a node. It also needs NepiDependency.",
             "category": "applications",
             "create_function": create_ns3_dependency,
-            "configure_function": configure_dependency,
+            "preconfigure_function": configure_dependency,
             "box_attributes": [ ],
             "connector_types": ["node"],
             "traces": ["buildlog"]
index 73593af..8e44f4c 100644 (file)
@@ -72,17 +72,16 @@ class Node(object):
     @property
     def _nepi_testbed_environment_setup(self):
         command = cStringIO.StringIO()
-        command.write('PYTHONPATH=$PYTHONPATH:%s' % (
+        command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
         ))
-        command.write(' PATH=$PATH:%s' % (
+        command.write(' ; export PATH=$PATH:%s' % (
             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
         ))
-        if self.node.env:
-            for envkey, envvals in self.node.env.iteritems():
+        if self.env:
+            for envkey, envvals in self.env.iteritems():
                 for envval in envvals:
-                    command.write(' %s=%s' % (envkey, envval))
-        command.write(self.command)
+                    command.write(' ; export %s=%s' % (envkey, envval))
         return command.getvalue()
     
     def build_filters(self, target_filters, filter_map):
index c086f07..d40de81 100755 (executable)
@@ -72,7 +72,7 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
 
         ns3_provider = FactoriesProvider(ns3_testbed_id, ns3_testbed_version)
         ns3_desc = exp.add_testbed_description(ns3_provider)
-        ns3_desc.set_attribute_value("homeDirectory", "tb-ns3")
+        ns3_desc.set_attribute_value("rootDirectory", "tb-ns3")
         ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[node1iface].addr[0].[Address]#}")
         ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER, 
             pl.get_attribute_value("slice"))
@@ -91,6 +91,108 @@ class PlanetLabCrossIntegrationTestCase(unittest.TestCase):
         controller.stop()
         controller.shutdown()
 
+    @test_util.skipUnless(test_util.pl_auth() is not None, 
+        "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+        "Test is expensive, requires NEPI_FULL_TESTS=yes")
+    def test_ns3_in_pl_crossconnect(self):
+        ns3_testbed_id = "ns3"
+        ns3_testbed_version = "3_9_RC3"
+        
+        pl, exp = self.make_experiment_desc()
+        
+        # Create PL node, ifaces, assign addresses
+        node1 = pl.create("Node")
+        node1.set_attribute_value("hostname", "onelab11.pl.sophia.inria.fr")
+        node1.set_attribute_value("label", "node1")
+        node1.set_attribute_value("emulation", True) # require emulation
+        iface1 = pl.create("NodeInterface")
+        iface1.set_attribute_value("label", "node1iface")
+        tap1 = pl.create("TapInterface")
+        tap1.enable_trace("packets") # for error output
+        tap1.set_attribute_value("label", "node1tap")
+        inet = pl.create("Internet")
+        node1.connector("devs").connect(iface1.connector("node"))
+        node1.connector("devs").connect(tap1.connector("node"))
+        iface1.connector("inet").connect(inet.connector("devs"))
+        
+        tap1ip = tap1.add_address()
+        tap1ip.set_attribute_value("Address", "192.168.2.2")
+        tap1ip.set_attribute_value("NetPrefix", 24)
+        tap1ip.set_attribute_value("Broadcast", False)
+        
+        # Add NS3 support in node1
+        plnepi = pl.create("NepiDependency")
+        plns3 = pl.create("NS3Dependency")
+        plnepi.connector("node").connect(node1.connector("deps"))
+        plns3.connector("node").connect(node1.connector("deps"))
+
+        # Create NS3 testbed running in node1
+        ns3_provider = FactoriesProvider(ns3_testbed_id, ns3_testbed_version)
+        ns3_desc = exp.add_testbed_description(ns3_provider)
+        ns3_desc.set_attribute_value("rootDirectory", "tb-ns3")
+        ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[node1iface].addr[0].[Address]#}")
+        ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER, 
+            pl.get_attribute_value("slice"))
+        ns3_desc.set_attribute_value(DC.DEPLOYMENT_KEY, 
+            pl.get_attribute_value("sliceSSHKey"))
+        ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+        ns3_desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+        ns3_desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, 
+            "{#[node1].[%s]#}" % (ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,))
+
+        
+        # Create NS3 node that is responsive to pings, connected
+        # to node1 through the Tap interface
+        ns1 = ns3_desc.create("ns3::Node")
+        ipv41 = ns3_desc.create("ns3::Ipv4L3Protocol")
+        arp1  = ns3_desc.create("ns3::ArpL3Protocol")
+        icmp1 = ns3_desc.create("ns3::Icmpv4L4Protocol")
+        ns1.connector("protos").connect(ipv41.connector("node"))
+        ns1.connector("protos").connect(arp1.connector("node"))
+        ns1.connector("protos").connect(icmp1.connector("node"))
+        ns1if = ns3_desc.create("ns3::FileDescriptorNetDevice")
+        ns1if.set_attribute_value("label", "ns1if")
+        ns1.connector("devs").connect(ns1if.connector("node"))
+        tap1.connector("fd->").connect(ns1if.connector("->fd"))
+        ip1 = ns1if.add_address()
+        ip1.set_attribute_value("Address", "192.168.2.3")
+        ip1.set_attribute_value("NetPrefix", 24)
+        ip1.set_attribute_value("Broadcast", False)
+
+        # Create PlanetLab ping application, pinging the NS3 node
+        ping = pl.create("Application")
+        ping.set_attribute_value("command", "ping -qc1 {#[GUID-8].addr[0].[Address]#}")
+        ping.enable_trace("stdout")
+        ping.enable_trace("stderr")
+        ping.connector("node").connect(node1.connector("apps"))
+
+        comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+
+--- .* ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
+"""
+
+        xml = exp.to_xml()
+
+        controller = ExperimentController(xml, self.root_dir)
+        controller.start()
+
+        while not controller.is_finished(ping.guid):
+            time.sleep(0.5)
+          
+        ping_result = controller.trace(pl.guid, ping.guid, "stdout")
+        tap_trace = controller.trace(pl.guid, tap1.guid, "packets")
+
+        controller.stop()
+        controller.shutdown()
+
+        # asserts at the end, to make sure there's proper cleanup
+        self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+            "Unexpected trace:\n%s\nTap trace:\n%s\n" % (
+                ping_result,
+                tap_trace) )
+
 if __name__ == '__main__':
     unittest.main()