Initial untested implementation of ns3 tun-compliant connections (Tunchannel)
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 16 May 2011 13:34:49 +0000 (15:34 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 16 May 2011 13:34:49 +0000 (15:34 +0200)
src/nepi/testbeds/ns3/execute.py
src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py
src/nepi/testbeds/ns3/metadata_v3_9_RC3.py

index 685a47a..1dadd6a 100644 (file)
@@ -8,8 +8,176 @@ from nepi.util.constants import TIME_NOW
 import os
 import sys
 import threading
+import random
+import socket
+import weakref
+
+class TunChannel(object):
+    def __init__(self):
+        # These get initialized when the channel is configured
+        self.external_addr = None
+        
+        # These get initialized when the channel is configured
+        # They're part of the TUN standard attribute set
+        self.tun_port = None
+        self.tun_addr = None
+        
+        # These get initialized when the channel is connected to its peer
+        self.peer_proto = None
+        self.peer_addr = None
+        self.peer_port = None
+        
+        # These get initialized when the channel is connected to its iface
+        self.tun_socket = None
+
+        # same as peer proto, but for execute-time standard attribute lookups
+        self.tun_proto = None 
+        
+        # some state
+        self.prepared = False
+        self.listen = False
+        self._terminate = [] # terminate signaller
+        self._connected = threading.Event()
+        self._forwarder_thread = None
+        
+        # Generate an initial random cryptographic key to use for tunnelling
+        # Upon connection, both endpoints will agree on a common one based on
+        # this one.
+        self.tun_key = ( ''.join(map(chr, [ 
+                    r.getrandbits(8) 
+                    for i in xrange(32) 
+                    for r in (random.SystemRandom(),) ])
+                ).encode("base64").strip() )        
+        
+
+    def __str__(self):
+        return "%s<ip:%s/%s %s%s>" % (
+            self.__class__.__name__,
+            self.address, self.netprefix,
+            " up" if self.up else " down",
+            " snat" if self.snat else "",
+        )
+
+    def Prepare(self):
+        if not self.udp and self.listen and not self._forwarder_thread:
+            if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
+                self._launch()
+    
+    def Setup(self):
+        if not self._forwarder_thread:
+            self._launch()
+    
+    def Cleanup(self):
+        if self._forwarder_thread:
+            self.Kill()
+
+    def Wait(self):
+        if self._forwarder_thread:
+            self._connected.wait()
+
+    def Kill(self):    
+        if self._forwarder_thread:
+            if not self._terminate:
+                self._terminate.append(None)
+            self._forwarder_thread.join()
+
+    def _launch(self):
+        # Launch forwarder thread with a weak reference
+        # to self, so that we don't create any strong cycles
+        # and automatic refcounting works as expected
+        self._forwarder_thread = threading.Thread(
+            self._forwarder,
+            args = (weakref.ref(self),) )
+        self._forwarder_thread.start()
+    
+    @staticmethod
+    def _forwarder(weak_self):
+        import tunchannel
+        
+        # grab strong reference
+        self = weak_self()
+        if not self:
+            return
+        
+        peer_port = self.peer_port
+        peer_addr = self.peer_addr
+        peer_proto= self.peer_proto
+
+        local_port = self.tun_port
+        local_addr = self.tun_addr
+        local_proto = self.tun_proto
+        
+        if local_proto != peer_proto:
+            raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
+        
+        udp = local_proto == 'udp'
+        listen = self.listen
+
+        if (udp or not listen) and (not peer_port or not peer_addr):
+            raise RuntimeError, "Misconfigured peer for: %s" % (self,)
+
+        if (udp or listen) and (not local_port or not local_addr):
+            raise RuntimeError, "Misconfigured TUN: %s" % (self,)
+        
+        TERMINATE = self._terminate
+        cipher_key = self.tun_key
+        tun = self.tun_socket
+        
+        if not tun:
+            raise RuntimeError, "Unconnected TUN channel %s" % (self,)
+        
+        if udp:
+            # listen on udp port
+            if remaining_args and not remaining_args[0].startswith('-'):
+                rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+                rsock.bind((local_addr,local_port))
+                rsock.connect((peer_addr,peer_port))
+            remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+        elif listen:
+            # accept tcp connections
+            lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+            lsock.bind((local_addr,local_port))
+            lsock.listen(1)
+            rsock,raddr = lsock.accept()
+            remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+        else:
+            # connect to tcp server
+            rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+            for i in xrange(30):
+                try:
+                    rsock.connect((peer_addr,peer_port))
+                    break
+                except socket.error:
+                    # wait a while, retry
+                    time.sleep(1)
+            else:
+                rsock.connect((peer_addr,peer_port))
+            remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+        
+        # notify that we're ready
+        self._connected.set()
+        
+        # drop strong reference
+        del self
+        
+        tunchannel.tun_fwd(tun, remote,
+            with_pi = False, 
+            ether_mode = True, 
+            cipher_key = cipher_key, 
+            udp = udp, 
+            TERMINATE = TERMINATE,
+            stderr = open("/dev/null","w") # silence logging
+        )
+        
+        tun.close()
+        remote.close()
+
 
 class TestbedController(testbed_impl.TestbedController):
+    LOCAL_FACTORIES = {
+        'ns3::Nepi::TunChannel' : TunChannel,
+    }
+
     def __init__(self, testbed_version):
         super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
         self._ns3 = None
@@ -17,6 +185,9 @@ class TestbedController(testbed_impl.TestbedController):
         self._traces = dict()
         self._simulator_thread = None
         self._condition = None
+        
+        # local factories
+        self.TunChannel = TunChannel
 
     @property
     def home_directory(self):
@@ -54,8 +225,11 @@ class TestbedController(testbed_impl.TestbedController):
                 factory.box_attributes.is_attribute_invisible(name):
             return
         element = self._elements[guid]
-        ns3_value = self._to_ns3_value(guid, name, value) 
-        element.SetAttribute(name, ns3_value)
+        if factory_id in self.LOCAL_FACTORIES:
+            setattr(element, name, value)
+        else:
+            ns3_value = self._to_ns3_value(guid, name, value) 
+            element.SetAttribute(name, ns3_value)
 
     def get(self, guid, name, time = TIME_NOW):
         value = super(TestbedController, self).get(guid, name, time)
@@ -65,6 +239,8 @@ class TestbedController(testbed_impl.TestbedController):
         if factory.box_attributes.is_attribute_design_only(name) or \
                 factory.box_attributes.is_attribute_invisible(name):
             return value
+        if factory_id in self.LOCAL_FACTORIES:
+            return getattr(element, name)
         TypeId = self.ns3.TypeId()
         typeid = TypeId.LookupByName(factory_id)
         info = TypeId.AttributeInfo()
@@ -214,3 +390,5 @@ class TestbedController(testbed_impl.TestbedController):
                 construct_params[name] = value
         return construct_params
 
+
+
index 8fd79b1..c232886 100644 (file)
@@ -159,6 +159,11 @@ def create_ipv4protocol(testbed_instance, guid):
     static_routing = testbed_instance.ns3.Ipv4StaticRouting()
     list_routing.AddRoutingProtocol(static_routing, 1)
 
+def create_tunchannel(testbed_instance, guid):
+    element = testbed_instance.TunChannel()
+    testbed_instance._elements[guid] = element
+
+
 ### Start/Stop functions ###
 
 def start_application(testbed_instance, guid):
@@ -172,6 +177,11 @@ def stop_application(testbed_instance, guid):
     now = testbed_instance.ns3.Simulator.Now()
     element.SetStopTime(now)
 
+def wait_tunchannel(testbed_instance, guid):
+    element = testbed_instance.elements[guid]
+    element.Wait()
+
+
 ### Status functions ###
 
 def status_application(testbed_instance, guid):
@@ -241,6 +251,44 @@ def configure_device(testbed_instance, guid):
         ipv4.SetMetric(ifindex, 1)
         ipv4.SetUp(ifindex)
 
+def preconfigure_tunchannel(testbed_instance, guid):
+    element = testbed_instance._elements[guid]
+    
+    # Find external interface, if any
+    import os
+    public_addr = os.popen(
+        "/sbin/ifconfig "
+        "| grep $(ip route | grep default | awk '{print $3}' "
+                "| awk -F. '{print $1\"[.]\"$2}') "
+        "| head -1 | awk '{print $2}' "
+        "| awk -F : '{print $2}'").read().rstrip()
+    element.external_addr = public_addr
+
+    # Set standard TUN attributes
+    if (not element.tun_addr or not element.tun_port) and element.external_addr:
+        element.tun_addr = element.external_addr
+        element.tun_port = 15000 + int(guid)
+
+    # First-phase setup
+    if element.peer_proto:
+        # cross tun
+        if not element.tun_addr or not element.tun_port:
+            listening = True
+        elif not element.peer_addr or not element.peer_port:
+            listening = True
+        else:
+            # both have addresses...
+            # ...the one with the lesser address listens
+            listening = element.tun_addr < element.peer_addr
+        element.listen = listening
+        element.Prepare()
+
+def postconfigure_tunchannel(testbed_instance, guid):
+    element = testbed_instance._elements[guid]
+    
+    # Second-phase setup
+    element.Setup()
+    
 def _add_static_route(ns3, static_routing, 
         address, netprefix, nexthop_address, ifindex):
     if netprefix == 0:
@@ -658,6 +706,18 @@ factories_info = dict({
             "LinuxSocketAddress",
             "tun_proto", "tun_addr", "tun_port", "tun_key"],
         "traces": ["fdpcap"]
+    }),
+     "ns3::Nepi::TunChannel": dict({
+        "category": "Channel",
+        "create_function": create_tunchannel,
+        "preconfigure_function": preconfigure_tunchannel,
+        "configure_function": postconfigure_tunchannel,
+        "start_function": wait_tunchannel,
+        "help": "Channel to forward FileDescriptorNetDevice data to "
+                "other TAP interfaces supporting the NEPI tunneling protocol.",
+        "connector_types": ["fd->", "udp", "tcp"],
+        "allow_addresses": False,
+        "box_attributes": ["tun_proto", "tun_addr", "tun_port", "tun_key"]
     }),
      "ns3::CsmaNetDevice": dict({
         "category": "Device",
index 70c0732..0effdb0 100644 (file)
@@ -6,6 +6,7 @@ from nepi.core import metadata
 from nepi.core.attributes import Attribute
 from nepi.util import validation
 import os.path
+import functools
 
 ### Connection functions ####
 
@@ -108,6 +109,57 @@ def connect_fd(testbed_instance, fdnd_guid, cross_data):
     testbed_instance.set(fdnd_guid, "tun_port", 0)
     testbed_instance.set(fdnd_guid, "tun_key", "\xfa"*32) # unimportant, fds aren't encrypted
 
+def connect_tunchannel_fd(testbed_instance, tun_guid, fdnd_guid):
+    fdnd = testbed_instance._elements[fdnd_guid]
+    tun = testbed_instance._elements[tun_guid]
+
+    # XXX: check the method StringToBuffer of ns3::FileDescriptorNetDevice
+    # to see how the address should be decoded
+    endpoint = fdnd.GetEndpoint()
+    address = endpoint.replace(":", "").decode('hex')[2:]
+    testbed_instance.set(fdnd_guid, "LinuxSocketAddress", address)
+    
+    # Create socket pair to connect the FDND and the TunChannel with it
+    import socket
+    sock1, sock2 = socket.socketpair(
+        socket.AF_UNIX, socket.SOCK_SEQPACKET)
+
+    # Send one endpoint to the FDND
+    import passfd
+    import socket
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    sock.connect(address)
+    passfd.sendfd(sock, sock1.fileno(), '0')
+    
+    # Store a reference to the endpoint to keep the socket alive
+    fdnd._endpoint_socket = sock1
+    
+    # Send the other endpoint to the TUN channel
+    tun.tun_socket = sock2
+
+def connect_tunchannel_peer_init(testbed_instance, tun_guid, cross_data):
+    tun = testbed_instance._elements[tun_guid]
+
+def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data):
+    tun = testbed_instance._elements[tun_guid]
+    tun.peer_addr = peer_data.get("tun_addr")
+    tun.peer_proto = peer_data.get("tun_proto") or proto
+    tun.peer_port = peer_data.get("tun_port")
+    tun.tun_key = min(tun.tun_key, peer_data.get("tun_key"))
+    tun.tun_proto = proto
+    
+    preconfigure_tunchannel(testbed_instance, tun_guid)
+
+def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data):
+    # refresh (refreshable) attributes for second-phase
+    tun = testbed_instance._elements[tun_guid]
+    tun.peer_addr = peer_data.get("tun_addr")
+    tun.peer_proto = peer_data.get("tun_proto") or proto
+    tun.peer_port = peer_data.get("tun_port")
+    
+    postconfigure_tunchannel(testbed_instance, tun_guid)
+    
+
 ### Connector information ###
 
 connector_types = dict({
@@ -231,6 +283,18 @@ connector_types = dict({
                 "max": 1,
                 "min": 0
             }),
+    "tcp": dict({
+                "help": "ip-ip tunneling over TCP link", 
+                "name": "tcp",
+                "max": 1, 
+                "min": 0
+            }),
+    "udp": dict({
+                "help": "ip-ip tunneling over UDP datagrams", 
+                "name": "udp",
+                "max": 1, 
+                "min": 0
+            }),
     })
 
 connections = [
@@ -516,6 +580,26 @@ connections = [
         "init_code": connect_fd,
         "can_cross": True
     }),
+    dict({
+        "from": ( "ns3", "ns3::Nepi::TunChannel", "fd->" ),
+        "to":   ( "ns3", "ns3::FileDescriptorNetDevice", "->fd" ),
+        "init_code": connect_tunchannel_fd,
+        "can_cross": False
+    }),
+    dict({
+        "from": ( "ns3", "ns3::Nepi::TunChannel", "tcp"),
+        "to":   (None, None, "tcp"),
+        "init_code": functools.partial(crossconnect_tunchannel_peer_init,"tcp"),
+        "compl_code": functools.partial(crossconnect_tunchannel_peer_compl,"tcp"),
+        "can_cross": True
+    }),
+    dict({
+        "from": ( "ns3", "ns3::Nepi::TunChannel", "udp"),
+        "to":   (None, None, "udp"),
+        "init_code": functools.partial(crossconnect_tunchannel_peer_init,"udp"),
+        "compl_code": functools.partial(crossconnect_tunchannel_peer_compl,"udp"),
+        "can_cross": True
+    }),
 ]
 
 traces = dict({
@@ -640,6 +724,7 @@ factories_order = ["ns3::BasicEnergySource",
     "ns3::OnOffApplication",
     "ns3::VirtualNetDevice",
     "ns3::FileDescriptorNetDevice",
+    "ns3::Nepi::TunChannel",
     "ns3::TapBridge",
     "ns3::BridgeChannel",
     "ns3::BridgeNetDevice",