From: Claudio-Daniel Freire Date: Mon, 16 May 2011 14:05:59 +0000 (+0200) Subject: Refactor TunChannel implementation in ns3 to make it common to all testbeds: X-Git-Tag: nepi_v2~36 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=60d57b9f6f35ed8eb68f276035fef00b5ad1d6f6;p=nepi.git Refactor TunChannel implementation in ns3 to make it common to all testbeds: it's quite generic and reusable. --- diff --git a/src/nepi/testbeds/ns3/execute.py b/src/nepi/testbeds/ns3/execute.py index 1dadd6a2..2b10a740 100644 --- a/src/nepi/testbeds/ns3/execute.py +++ b/src/nepi/testbeds/ns3/execute.py @@ -12,166 +12,7 @@ import random import socket import weakref -class TunChannel(object): - def __init__(self): - # These get initialized when the channel is configured - self.external_addr = None - - # These get initialized when the channel is configured - # They're part of the TUN standard attribute set - self.tun_port = None - self.tun_addr = None - - # These get initialized when the channel is connected to its peer - self.peer_proto = None - self.peer_addr = None - self.peer_port = None - - # These get initialized when the channel is connected to its iface - self.tun_socket = None - - # same as peer proto, but for execute-time standard attribute lookups - self.tun_proto = None - - # some state - self.prepared = False - self.listen = False - self._terminate = [] # terminate signaller - self._connected = threading.Event() - self._forwarder_thread = None - - # Generate an initial random cryptographic key to use for tunnelling - # Upon connection, both endpoints will agree on a common one based on - # this one. - self.tun_key = ( ''.join(map(chr, [ - r.getrandbits(8) - for i in xrange(32) - for r in (random.SystemRandom(),) ]) - ).encode("base64").strip() ) - - - def __str__(self): - return "%s" % ( - self.__class__.__name__, - self.address, self.netprefix, - " up" if self.up else " down", - " snat" if self.snat else "", - ) - - def Prepare(self): - if not self.udp and self.listen and not self._forwarder_thread: - if self.listen or (self.peer_addr and self.peer_port and self.peer_proto): - self._launch() - - def Setup(self): - if not self._forwarder_thread: - self._launch() - - def Cleanup(self): - if self._forwarder_thread: - self.Kill() - - def Wait(self): - if self._forwarder_thread: - self._connected.wait() - - def Kill(self): - if self._forwarder_thread: - if not self._terminate: - self._terminate.append(None) - self._forwarder_thread.join() - - def _launch(self): - # Launch forwarder thread with a weak reference - # to self, so that we don't create any strong cycles - # and automatic refcounting works as expected - self._forwarder_thread = threading.Thread( - self._forwarder, - args = (weakref.ref(self),) ) - self._forwarder_thread.start() - - @staticmethod - def _forwarder(weak_self): - import tunchannel - - # grab strong reference - self = weak_self() - if not self: - return - - peer_port = self.peer_port - peer_addr = self.peer_addr - peer_proto= self.peer_proto - - local_port = self.tun_port - local_addr = self.tun_addr - local_proto = self.tun_proto - - if local_proto != peer_proto: - raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto) - - udp = local_proto == 'udp' - listen = self.listen - - if (udp or not listen) and (not peer_port or not peer_addr): - raise RuntimeError, "Misconfigured peer for: %s" % (self,) - - if (udp or listen) and (not local_port or not local_addr): - raise RuntimeError, "Misconfigured TUN: %s" % (self,) - - TERMINATE = self._terminate - cipher_key = self.tun_key - tun = self.tun_socket - - if not tun: - raise RuntimeError, "Unconnected TUN channel %s" % (self,) - - if udp: - # listen on udp port - if remaining_args and not remaining_args[0].startswith('-'): - rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - rsock.bind((local_addr,local_port)) - rsock.connect((peer_addr,peer_port)) - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - elif listen: - # accept tcp connections - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - lsock.bind((local_addr,local_port)) - lsock.listen(1) - rsock,raddr = lsock.accept() - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - else: - # connect to tcp server - rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - for i in xrange(30): - try: - rsock.connect((peer_addr,peer_port)) - break - except socket.error: - # wait a while, retry - time.sleep(1) - else: - rsock.connect((peer_addr,peer_port)) - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - - # notify that we're ready - self._connected.set() - - # drop strong reference - del self - - tunchannel.tun_fwd(tun, remote, - with_pi = False, - ether_mode = True, - cipher_key = cipher_key, - udp = udp, - TERMINATE = TERMINATE, - stderr = open("/dev/null","w") # silence logging - ) - - tun.close() - remote.close() - +from nepi.util.tunchannel import TunChannel class TestbedController(testbed_impl.TestbedController): LOCAL_FACTORIES = { diff --git a/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py b/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py index c232886f..b8dc66bf 100644 --- a/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py +++ b/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py @@ -159,8 +159,12 @@ def create_ipv4protocol(testbed_instance, guid): static_routing = testbed_instance.ns3.Ipv4StaticRouting() list_routing.AddRoutingProtocol(static_routing, 1) -def create_tunchannel(testbed_instance, guid): +def create_tunchannel(testbed_instance, guid, devnull = []): + if not devnull: + # just so it's not open if not needed + devnull.append(open("/dev/null","w")) element = testbed_instance.TunChannel() + element.stderr = devnull[0] # silence tracing testbed_instance._elements[guid] = element @@ -262,11 +266,10 @@ def preconfigure_tunchannel(testbed_instance, guid): "| awk -F. '{print $1\"[.]\"$2}') " "| head -1 | awk '{print $2}' " "| awk -F : '{print $2}'").read().rstrip() - element.external_addr = public_addr + element.tun_addr = public_addr # Set standard TUN attributes - if (not element.tun_addr or not element.tun_port) and element.external_addr: - element.tun_addr = element.external_addr + if not element.tun_port and element.tun_addr: element.tun_port = 15000 + int(guid) # First-phase setup diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index 1f03d4b4..9c0be650 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -3,6 +3,7 @@ import sys import os import struct import socket +import threading def ipfmt(ip): ipbytes = map(ord,ip.decode("hex")) @@ -231,3 +232,208 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr raise bkbuf += packet + + +class TunChannel(object): + """ + Helper box class that implements most of the required boilerplate + for tunnelling cross connections. + + The class implements a threaded forwarder that runs in the + testbed controller process. It takes several parameters that + can be given by directly setting attributes: + + tun_port/addr/proto: information about the local endpoint. + The addresses here should be externally-reachable, + since when listening or when using the UDP protocol, + connections to this address/port will be attempted + by remote endpoitns. + + peer_port/addr/proto: information about the remote endpoint. + Usually, you set these when the cross connection + initializer/completion functions are invoked (both). + + tun_key: the agreed upon encryption key. + + listen: if set to True (and in TCP mode), it marks a + listening endpoint. Be certain that any TCP connection + is made between a listening and a non-listening + endpoint, or it won't work. + + with_pi: set if the incoming packet stream (see tun_socket) + contains PI headers - if so, they will be stripped. + + ethernet_mode: set if the incoming packet stream is + composed of ethernet frames (as opposed of IP packets). + + tun_socket: a socket or file object that can be read + from and written to. Packets will be read when available, + remote packets will be forwarded as writes. + A socket should be of type SOCK_SEQPACKET (or SOCK_DGRAM + if not possible), a file object should preserve packet + boundaries (ie, a pipe or TUN/TAP device file descriptor). + + trace_target: a file object where trace output will be sent. + It cannot be changed after launch. + By default, it's sys.stderr + """ + + def __init__(self): + # These get initialized when the channel is configured + # They're part of the TUN standard attribute set + self.tun_port = None + self.tun_addr = None + + # These get initialized when the channel is connected to its peer + self.peer_proto = None + self.peer_addr = None + self.peer_port = None + + # These get initialized when the channel is connected to its iface + self.tun_socket = None + + # same as peer proto, but for execute-time standard attribute lookups + self.tun_proto = None + + # some state + self.prepared = False + self.listen = False + self._terminate = [] # terminate signaller + self._connected = threading.Event() + self._forwarder_thread = None + + # trace to stderr + self.stderr = sys.stderr + + # Generate an initial random cryptographic key to use for tunnelling + # Upon connection, both endpoints will agree on a common one based on + # this one. + self.tun_key = ( ''.join(map(chr, [ + r.getrandbits(8) + for i in xrange(32) + for r in (random.SystemRandom(),) ]) + ).encode("base64").strip() ) + + + def __str__(self): + return "%s" % ( + self.__class__.__name__, + self.address, self.netprefix, + " up" if self.up else " down", + " snat" if self.snat else "", + ) + + def Prepare(self): + if not self.udp and self.listen and not self._forwarder_thread: + if self.listen or (self.peer_addr and self.peer_port and self.peer_proto): + self._launch() + + def Setup(self): + if not self._forwarder_thread: + self._launch() + + def Cleanup(self): + if self._forwarder_thread: + self.Kill() + + def Wait(self): + if self._forwarder_thread: + self._connected.wait() + + def Kill(self): + if self._forwarder_thread: + if not self._terminate: + self._terminate.append(None) + self._forwarder_thread.join() + + def _launch(self): + # Launch forwarder thread with a weak reference + # to self, so that we don't create any strong cycles + # and automatic refcounting works as expected + self._forwarder_thread = threading.Thread( + self._forwarder, + args = (weakref.ref(self),) ) + self._forwarder_thread.start() + + @staticmethod + def _forwarder(weak_self): + # grab strong reference + self = weak_self() + if not self: + return + + peer_port = self.peer_port + peer_addr = self.peer_addr + peer_proto= self.peer_proto + + local_port = self.tun_port + local_addr = self.tun_addr + local_proto = self.tun_proto + + stderr = self.stderr + + if local_proto != peer_proto: + raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto) + + udp = local_proto == 'udp' + listen = self.listen + + if (udp or not listen) and (not peer_port or not peer_addr): + raise RuntimeError, "Misconfigured peer for: %s" % (self,) + + if (udp or listen) and (not local_port or not local_addr): + raise RuntimeError, "Misconfigured TUN: %s" % (self,) + + TERMINATE = self._terminate + cipher_key = self.tun_key + tun = self.tun_socket + + if not tun: + raise RuntimeError, "Unconnected TUN channel %s" % (self,) + + if udp: + # listen on udp port + if remaining_args and not remaining_args[0].startswith('-'): + rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + rsock.bind((local_addr,local_port)) + rsock.connect((peer_addr,peer_port)) + remote = os.fdopen(rsock.fileno(), 'r+b', 0) + elif listen: + # accept tcp connections + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + lsock.bind((local_addr,local_port)) + lsock.listen(1) + rsock,raddr = lsock.accept() + remote = os.fdopen(rsock.fileno(), 'r+b', 0) + else: + # connect to tcp server + rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + for i in xrange(30): + try: + rsock.connect((peer_addr,peer_port)) + break + except socket.error: + # wait a while, retry + time.sleep(1) + else: + rsock.connect((peer_addr,peer_port)) + remote = os.fdopen(rsock.fileno(), 'r+b', 0) + + # notify that we're ready + self._connected.set() + + # drop strong reference + del self + + tun_fwd(tun, remote, + with_pi = False, + ether_mode = True, + cipher_key = cipher_key, + udp = udp, + TERMINATE = TERMINATE, + stderr = stderr + ) + + tun.close() + remote.close() +