From 1ac0ed7e2649d3c590651d319a5907cab9fd7de4 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Mon, 16 May 2011 15:34:49 +0200 Subject: [PATCH] Initial untested implementation of ns3 tun-compliant connections (Tunchannel) --- src/nepi/testbeds/ns3/execute.py | 182 +++++++++++++++++- .../ns3/factories_metadata_v3_9_RC3.py | 60 ++++++ src/nepi/testbeds/ns3/metadata_v3_9_RC3.py | 85 ++++++++ 3 files changed, 325 insertions(+), 2 deletions(-) diff --git a/src/nepi/testbeds/ns3/execute.py b/src/nepi/testbeds/ns3/execute.py index 685a47af..1dadd6a2 100644 --- a/src/nepi/testbeds/ns3/execute.py +++ b/src/nepi/testbeds/ns3/execute.py @@ -8,8 +8,176 @@ from nepi.util.constants import TIME_NOW import os import sys import threading +import random +import socket +import weakref + +class TunChannel(object): + def __init__(self): + # These get initialized when the channel is configured + self.external_addr = None + + # These get initialized when the channel is configured + # They're part of the TUN standard attribute set + self.tun_port = None + self.tun_addr = None + + # These get initialized when the channel is connected to its peer + self.peer_proto = None + self.peer_addr = None + self.peer_port = None + + # These get initialized when the channel is connected to its iface + self.tun_socket = None + + # same as peer proto, but for execute-time standard attribute lookups + self.tun_proto = None + + # some state + self.prepared = False + self.listen = False + self._terminate = [] # terminate signaller + self._connected = threading.Event() + self._forwarder_thread = None + + # Generate an initial random cryptographic key to use for tunnelling + # Upon connection, both endpoints will agree on a common one based on + # this one. + self.tun_key = ( ''.join(map(chr, [ + r.getrandbits(8) + for i in xrange(32) + for r in (random.SystemRandom(),) ]) + ).encode("base64").strip() ) + + + def __str__(self): + return "%s" % ( + self.__class__.__name__, + self.address, self.netprefix, + " up" if self.up else " down", + " snat" if self.snat else "", + ) + + def Prepare(self): + if not self.udp and self.listen and not self._forwarder_thread: + if self.listen or (self.peer_addr and self.peer_port and self.peer_proto): + self._launch() + + def Setup(self): + if not self._forwarder_thread: + self._launch() + + def Cleanup(self): + if self._forwarder_thread: + self.Kill() + + def Wait(self): + if self._forwarder_thread: + self._connected.wait() + + def Kill(self): + if self._forwarder_thread: + if not self._terminate: + self._terminate.append(None) + self._forwarder_thread.join() + + def _launch(self): + # Launch forwarder thread with a weak reference + # to self, so that we don't create any strong cycles + # and automatic refcounting works as expected + self._forwarder_thread = threading.Thread( + self._forwarder, + args = (weakref.ref(self),) ) + self._forwarder_thread.start() + + @staticmethod + def _forwarder(weak_self): + import tunchannel + + # grab strong reference + self = weak_self() + if not self: + return + + peer_port = self.peer_port + peer_addr = self.peer_addr + peer_proto= self.peer_proto + + local_port = self.tun_port + local_addr = self.tun_addr + local_proto = self.tun_proto + + if local_proto != peer_proto: + raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto) + + udp = local_proto == 'udp' + listen = self.listen + + if (udp or not listen) and (not peer_port or not peer_addr): + raise RuntimeError, "Misconfigured peer for: %s" % (self,) + + if (udp or listen) and (not local_port or not local_addr): + raise RuntimeError, "Misconfigured TUN: %s" % (self,) + + TERMINATE = self._terminate + cipher_key = self.tun_key + tun = self.tun_socket + + if not tun: + raise RuntimeError, "Unconnected TUN channel %s" % (self,) + + if udp: + # listen on udp port + if remaining_args and not remaining_args[0].startswith('-'): + rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + rsock.bind((local_addr,local_port)) + rsock.connect((peer_addr,peer_port)) + remote = os.fdopen(rsock.fileno(), 'r+b', 0) + elif listen: + # accept tcp connections + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + lsock.bind((local_addr,local_port)) + lsock.listen(1) + rsock,raddr = lsock.accept() + remote = os.fdopen(rsock.fileno(), 'r+b', 0) + else: + # connect to tcp server + rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + for i in xrange(30): + try: + rsock.connect((peer_addr,peer_port)) + break + except socket.error: + # wait a while, retry + time.sleep(1) + else: + rsock.connect((peer_addr,peer_port)) + remote = os.fdopen(rsock.fileno(), 'r+b', 0) + + # notify that we're ready + self._connected.set() + + # drop strong reference + del self + + tunchannel.tun_fwd(tun, remote, + with_pi = False, + ether_mode = True, + cipher_key = cipher_key, + udp = udp, + TERMINATE = TERMINATE, + stderr = open("/dev/null","w") # silence logging + ) + + tun.close() + remote.close() + class TestbedController(testbed_impl.TestbedController): + LOCAL_FACTORIES = { + 'ns3::Nepi::TunChannel' : TunChannel, + } + def __init__(self, testbed_version): super(TestbedController, self).__init__(TESTBED_ID, testbed_version) self._ns3 = None @@ -17,6 +185,9 @@ class TestbedController(testbed_impl.TestbedController): self._traces = dict() self._simulator_thread = None self._condition = None + + # local factories + self.TunChannel = TunChannel @property def home_directory(self): @@ -54,8 +225,11 @@ class TestbedController(testbed_impl.TestbedController): factory.box_attributes.is_attribute_invisible(name): return element = self._elements[guid] - ns3_value = self._to_ns3_value(guid, name, value) - element.SetAttribute(name, ns3_value) + if factory_id in self.LOCAL_FACTORIES: + setattr(element, name, value) + else: + ns3_value = self._to_ns3_value(guid, name, value) + element.SetAttribute(name, ns3_value) def get(self, guid, name, time = TIME_NOW): value = super(TestbedController, self).get(guid, name, time) @@ -65,6 +239,8 @@ class TestbedController(testbed_impl.TestbedController): if factory.box_attributes.is_attribute_design_only(name) or \ factory.box_attributes.is_attribute_invisible(name): return value + if factory_id in self.LOCAL_FACTORIES: + return getattr(element, name) TypeId = self.ns3.TypeId() typeid = TypeId.LookupByName(factory_id) info = TypeId.AttributeInfo() @@ -214,3 +390,5 @@ class TestbedController(testbed_impl.TestbedController): construct_params[name] = value return construct_params + + diff --git a/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py b/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py index 8fd79b1e..c232886f 100644 --- a/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py +++ b/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py @@ -159,6 +159,11 @@ def create_ipv4protocol(testbed_instance, guid): static_routing = testbed_instance.ns3.Ipv4StaticRouting() list_routing.AddRoutingProtocol(static_routing, 1) +def create_tunchannel(testbed_instance, guid): + element = testbed_instance.TunChannel() + testbed_instance._elements[guid] = element + + ### Start/Stop functions ### def start_application(testbed_instance, guid): @@ -172,6 +177,11 @@ def stop_application(testbed_instance, guid): now = testbed_instance.ns3.Simulator.Now() element.SetStopTime(now) +def wait_tunchannel(testbed_instance, guid): + element = testbed_instance.elements[guid] + element.Wait() + + ### Status functions ### def status_application(testbed_instance, guid): @@ -241,6 +251,44 @@ def configure_device(testbed_instance, guid): ipv4.SetMetric(ifindex, 1) ipv4.SetUp(ifindex) +def preconfigure_tunchannel(testbed_instance, guid): + element = testbed_instance._elements[guid] + + # Find external interface, if any + import os + public_addr = os.popen( + "/sbin/ifconfig " + "| grep $(ip route | grep default | awk '{print $3}' " + "| awk -F. '{print $1\"[.]\"$2}') " + "| head -1 | awk '{print $2}' " + "| awk -F : '{print $2}'").read().rstrip() + element.external_addr = public_addr + + # Set standard TUN attributes + if (not element.tun_addr or not element.tun_port) and element.external_addr: + element.tun_addr = element.external_addr + element.tun_port = 15000 + int(guid) + + # First-phase setup + if element.peer_proto: + # cross tun + if not element.tun_addr or not element.tun_port: + listening = True + elif not element.peer_addr or not element.peer_port: + listening = True + else: + # both have addresses... + # ...the one with the lesser address listens + listening = element.tun_addr < element.peer_addr + element.listen = listening + element.Prepare() + +def postconfigure_tunchannel(testbed_instance, guid): + element = testbed_instance._elements[guid] + + # Second-phase setup + element.Setup() + def _add_static_route(ns3, static_routing, address, netprefix, nexthop_address, ifindex): if netprefix == 0: @@ -658,6 +706,18 @@ factories_info = dict({ "LinuxSocketAddress", "tun_proto", "tun_addr", "tun_port", "tun_key"], "traces": ["fdpcap"] + }), + "ns3::Nepi::TunChannel": dict({ + "category": "Channel", + "create_function": create_tunchannel, + "preconfigure_function": preconfigure_tunchannel, + "configure_function": postconfigure_tunchannel, + "start_function": wait_tunchannel, + "help": "Channel to forward FileDescriptorNetDevice data to " + "other TAP interfaces supporting the NEPI tunneling protocol.", + "connector_types": ["fd->", "udp", "tcp"], + "allow_addresses": False, + "box_attributes": ["tun_proto", "tun_addr", "tun_port", "tun_key"] }), "ns3::CsmaNetDevice": dict({ "category": "Device", diff --git a/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py b/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py index 70c0732f..0effdb09 100644 --- a/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py +++ b/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py @@ -6,6 +6,7 @@ from nepi.core import metadata from nepi.core.attributes import Attribute from nepi.util import validation import os.path +import functools ### Connection functions #### @@ -108,6 +109,57 @@ def connect_fd(testbed_instance, fdnd_guid, cross_data): testbed_instance.set(fdnd_guid, "tun_port", 0) testbed_instance.set(fdnd_guid, "tun_key", "\xfa"*32) # unimportant, fds aren't encrypted +def connect_tunchannel_fd(testbed_instance, tun_guid, fdnd_guid): + fdnd = testbed_instance._elements[fdnd_guid] + tun = testbed_instance._elements[tun_guid] + + # XXX: check the method StringToBuffer of ns3::FileDescriptorNetDevice + # to see how the address should be decoded + endpoint = fdnd.GetEndpoint() + address = endpoint.replace(":", "").decode('hex')[2:] + testbed_instance.set(fdnd_guid, "LinuxSocketAddress", address) + + # Create socket pair to connect the FDND and the TunChannel with it + import socket + sock1, sock2 = socket.socketpair( + socket.AF_UNIX, socket.SOCK_SEQPACKET) + + # Send one endpoint to the FDND + import passfd + import socket + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.connect(address) + passfd.sendfd(sock, sock1.fileno(), '0') + + # Store a reference to the endpoint to keep the socket alive + fdnd._endpoint_socket = sock1 + + # Send the other endpoint to the TUN channel + tun.tun_socket = sock2 + +def connect_tunchannel_peer_init(testbed_instance, tun_guid, cross_data): + tun = testbed_instance._elements[tun_guid] + +def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data): + tun = testbed_instance._elements[tun_guid] + tun.peer_addr = peer_data.get("tun_addr") + tun.peer_proto = peer_data.get("tun_proto") or proto + tun.peer_port = peer_data.get("tun_port") + tun.tun_key = min(tun.tun_key, peer_data.get("tun_key")) + tun.tun_proto = proto + + preconfigure_tunchannel(testbed_instance, tun_guid) + +def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data): + # refresh (refreshable) attributes for second-phase + tun = testbed_instance._elements[tun_guid] + tun.peer_addr = peer_data.get("tun_addr") + tun.peer_proto = peer_data.get("tun_proto") or proto + tun.peer_port = peer_data.get("tun_port") + + postconfigure_tunchannel(testbed_instance, tun_guid) + + ### Connector information ### connector_types = dict({ @@ -231,6 +283,18 @@ connector_types = dict({ "max": 1, "min": 0 }), + "tcp": dict({ + "help": "ip-ip tunneling over TCP link", + "name": "tcp", + "max": 1, + "min": 0 + }), + "udp": dict({ + "help": "ip-ip tunneling over UDP datagrams", + "name": "udp", + "max": 1, + "min": 0 + }), }) connections = [ @@ -516,6 +580,26 @@ connections = [ "init_code": connect_fd, "can_cross": True }), + dict({ + "from": ( "ns3", "ns3::Nepi::TunChannel", "fd->" ), + "to": ( "ns3", "ns3::FileDescriptorNetDevice", "->fd" ), + "init_code": connect_tunchannel_fd, + "can_cross": False + }), + dict({ + "from": ( "ns3", "ns3::Nepi::TunChannel", "tcp"), + "to": (None, None, "tcp"), + "init_code": functools.partial(crossconnect_tunchannel_peer_init,"tcp"), + "compl_code": functools.partial(crossconnect_tunchannel_peer_compl,"tcp"), + "can_cross": True + }), + dict({ + "from": ( "ns3", "ns3::Nepi::TunChannel", "udp"), + "to": (None, None, "udp"), + "init_code": functools.partial(crossconnect_tunchannel_peer_init,"udp"), + "compl_code": functools.partial(crossconnect_tunchannel_peer_compl,"udp"), + "can_cross": True + }), ] traces = dict({ @@ -640,6 +724,7 @@ factories_order = ["ns3::BasicEnergySource", "ns3::OnOffApplication", "ns3::VirtualNetDevice", "ns3::FileDescriptorNetDevice", + "ns3::Nepi::TunChannel", "ns3::TapBridge", "ns3::BridgeChannel", "ns3::BridgeNetDevice", -- 2.47.0