From ef3794f8cbedf1622fe0181d841185138d80a0a2 Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Mon, 16 May 2011 16:21:10 +0200 Subject: [PATCH] Refactor a bit more, connect, wait, create and configure functions are also general enough to be put in a shared utility module. Move that code to tunchannel_impl, to separate it from the core tunchannel stuff (which may be required if that generic implementation isn't applicable for some - ie, for tun_connect in PL). --- src/nepi/testbeds/ns3/execute.py | 7 +- .../ns3/factories_metadata_v3_9_RC3.py | 54 +---- src/nepi/testbeds/ns3/metadata_v3_9_RC3.py | 27 +-- src/nepi/util/tunchannel.py | 203 ------------------ 4 files changed, 12 insertions(+), 279 deletions(-) diff --git a/src/nepi/testbeds/ns3/execute.py b/src/nepi/testbeds/ns3/execute.py index 2b10a740..94f43355 100644 --- a/src/nepi/testbeds/ns3/execute.py +++ b/src/nepi/testbeds/ns3/execute.py @@ -12,9 +12,9 @@ import random import socket import weakref -from nepi.util.tunchannel import TunChannel - class TestbedController(testbed_impl.TestbedController): + from nepi.util.tunchannel_impl import TunChannel + LOCAL_FACTORIES = { 'ns3::Nepi::TunChannel' : TunChannel, } @@ -26,9 +26,6 @@ class TestbedController(testbed_impl.TestbedController): self._traces = dict() self._simulator_thread = None self._condition = None - - # local factories - self.TunChannel = TunChannel @property def home_directory(self): diff --git a/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py b/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py index b8dc66bf..2889a1b6 100644 --- a/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py +++ b/src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py @@ -4,6 +4,11 @@ from nepi.util.constants import AF_INET, STATUS_NOT_STARTED, STATUS_RUNNING, \ STATUS_FINISHED, STATUS_UNDETERMINED +from nepi.util.tunchannel_impl import \ + preconfigure_tunchannel, postconfigure_tunchannel, \ + wait_tunchannel, create_tunchannel + + def _get_ipv4_protocol_guid(testbed_instance, node_guid): # search for the Ipv4L3Protocol asociated with the device protos_guids = testbed_instance.get_connected(node_guid, "protos", "node") @@ -159,14 +164,6 @@ def create_ipv4protocol(testbed_instance, guid): static_routing = testbed_instance.ns3.Ipv4StaticRouting() list_routing.AddRoutingProtocol(static_routing, 1) -def create_tunchannel(testbed_instance, guid, devnull = []): - if not devnull: - # just so it's not open if not needed - devnull.append(open("/dev/null","w")) - element = testbed_instance.TunChannel() - element.stderr = devnull[0] # silence tracing - testbed_instance._elements[guid] = element - ### Start/Stop functions ### @@ -181,10 +178,6 @@ def stop_application(testbed_instance, guid): now = testbed_instance.ns3.Simulator.Now() element.SetStopTime(now) -def wait_tunchannel(testbed_instance, guid): - element = testbed_instance.elements[guid] - element.Wait() - ### Status functions ### @@ -254,43 +247,6 @@ def configure_device(testbed_instance, guid): ipv4.AddAddress(ifindex, inaddr) ipv4.SetMetric(ifindex, 1) ipv4.SetUp(ifindex) - -def preconfigure_tunchannel(testbed_instance, guid): - element = testbed_instance._elements[guid] - - # Find external interface, if any - import os - public_addr = os.popen( - "/sbin/ifconfig " - "| grep $(ip route | grep default | awk '{print $3}' " - "| awk -F. '{print $1\"[.]\"$2}') " - "| head -1 | awk '{print $2}' " - "| awk -F : '{print $2}'").read().rstrip() - element.tun_addr = public_addr - - # Set standard TUN attributes - if not element.tun_port and element.tun_addr: - element.tun_port = 15000 + int(guid) - - # First-phase setup - if element.peer_proto: - # cross tun - if not element.tun_addr or not element.tun_port: - listening = True - elif not element.peer_addr or not element.peer_port: - listening = True - else: - # both have addresses... - # ...the one with the lesser address listens - listening = element.tun_addr < element.peer_addr - element.listen = listening - element.Prepare() - -def postconfigure_tunchannel(testbed_instance, guid): - element = testbed_instance._elements[guid] - - # Second-phase setup - element.Setup() def _add_static_route(ns3, static_routing, address, netprefix, nexthop_address, ifindex): diff --git a/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py b/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py index 0effdb09..96f30e62 100644 --- a/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py +++ b/src/nepi/testbeds/ns3/metadata_v3_9_RC3.py @@ -8,6 +8,11 @@ from nepi.util import validation import os.path import functools +from nepi.util.tunchannel_impl import \ + crossconnect_tunchannel_peer_init, \ + crossconnect_tunchannel_peer_compl + + ### Connection functions #### def connect_node_device(testbed_instance, node_guid, device_guid): @@ -137,28 +142,6 @@ def connect_tunchannel_fd(testbed_instance, tun_guid, fdnd_guid): # Send the other endpoint to the TUN channel tun.tun_socket = sock2 -def connect_tunchannel_peer_init(testbed_instance, tun_guid, cross_data): - tun = testbed_instance._elements[tun_guid] - -def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data): - tun = testbed_instance._elements[tun_guid] - tun.peer_addr = peer_data.get("tun_addr") - tun.peer_proto = peer_data.get("tun_proto") or proto - tun.peer_port = peer_data.get("tun_port") - tun.tun_key = min(tun.tun_key, peer_data.get("tun_key")) - tun.tun_proto = proto - - preconfigure_tunchannel(testbed_instance, tun_guid) - -def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data): - # refresh (refreshable) attributes for second-phase - tun = testbed_instance._elements[tun_guid] - tun.peer_addr = peer_data.get("tun_addr") - tun.peer_proto = peer_data.get("tun_proto") or proto - tun.peer_port = peer_data.get("tun_port") - - postconfigure_tunchannel(testbed_instance, tun_guid) - ### Connector information ### diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index 9c0be650..c38a3c39 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -234,206 +234,3 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr -class TunChannel(object): - """ - Helper box class that implements most of the required boilerplate - for tunnelling cross connections. - - The class implements a threaded forwarder that runs in the - testbed controller process. It takes several parameters that - can be given by directly setting attributes: - - tun_port/addr/proto: information about the local endpoint. - The addresses here should be externally-reachable, - since when listening or when using the UDP protocol, - connections to this address/port will be attempted - by remote endpoitns. - - peer_port/addr/proto: information about the remote endpoint. - Usually, you set these when the cross connection - initializer/completion functions are invoked (both). - - tun_key: the agreed upon encryption key. - - listen: if set to True (and in TCP mode), it marks a - listening endpoint. Be certain that any TCP connection - is made between a listening and a non-listening - endpoint, or it won't work. - - with_pi: set if the incoming packet stream (see tun_socket) - contains PI headers - if so, they will be stripped. - - ethernet_mode: set if the incoming packet stream is - composed of ethernet frames (as opposed of IP packets). - - tun_socket: a socket or file object that can be read - from and written to. Packets will be read when available, - remote packets will be forwarded as writes. - A socket should be of type SOCK_SEQPACKET (or SOCK_DGRAM - if not possible), a file object should preserve packet - boundaries (ie, a pipe or TUN/TAP device file descriptor). - - trace_target: a file object where trace output will be sent. - It cannot be changed after launch. - By default, it's sys.stderr - """ - - def __init__(self): - # These get initialized when the channel is configured - # They're part of the TUN standard attribute set - self.tun_port = None - self.tun_addr = None - - # These get initialized when the channel is connected to its peer - self.peer_proto = None - self.peer_addr = None - self.peer_port = None - - # These get initialized when the channel is connected to its iface - self.tun_socket = None - - # same as peer proto, but for execute-time standard attribute lookups - self.tun_proto = None - - # some state - self.prepared = False - self.listen = False - self._terminate = [] # terminate signaller - self._connected = threading.Event() - self._forwarder_thread = None - - # trace to stderr - self.stderr = sys.stderr - - # Generate an initial random cryptographic key to use for tunnelling - # Upon connection, both endpoints will agree on a common one based on - # this one. - self.tun_key = ( ''.join(map(chr, [ - r.getrandbits(8) - for i in xrange(32) - for r in (random.SystemRandom(),) ]) - ).encode("base64").strip() ) - - - def __str__(self): - return "%s" % ( - self.__class__.__name__, - self.address, self.netprefix, - " up" if self.up else " down", - " snat" if self.snat else "", - ) - - def Prepare(self): - if not self.udp and self.listen and not self._forwarder_thread: - if self.listen or (self.peer_addr and self.peer_port and self.peer_proto): - self._launch() - - def Setup(self): - if not self._forwarder_thread: - self._launch() - - def Cleanup(self): - if self._forwarder_thread: - self.Kill() - - def Wait(self): - if self._forwarder_thread: - self._connected.wait() - - def Kill(self): - if self._forwarder_thread: - if not self._terminate: - self._terminate.append(None) - self._forwarder_thread.join() - - def _launch(self): - # Launch forwarder thread with a weak reference - # to self, so that we don't create any strong cycles - # and automatic refcounting works as expected - self._forwarder_thread = threading.Thread( - self._forwarder, - args = (weakref.ref(self),) ) - self._forwarder_thread.start() - - @staticmethod - def _forwarder(weak_self): - # grab strong reference - self = weak_self() - if not self: - return - - peer_port = self.peer_port - peer_addr = self.peer_addr - peer_proto= self.peer_proto - - local_port = self.tun_port - local_addr = self.tun_addr - local_proto = self.tun_proto - - stderr = self.stderr - - if local_proto != peer_proto: - raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto) - - udp = local_proto == 'udp' - listen = self.listen - - if (udp or not listen) and (not peer_port or not peer_addr): - raise RuntimeError, "Misconfigured peer for: %s" % (self,) - - if (udp or listen) and (not local_port or not local_addr): - raise RuntimeError, "Misconfigured TUN: %s" % (self,) - - TERMINATE = self._terminate - cipher_key = self.tun_key - tun = self.tun_socket - - if not tun: - raise RuntimeError, "Unconnected TUN channel %s" % (self,) - - if udp: - # listen on udp port - if remaining_args and not remaining_args[0].startswith('-'): - rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - rsock.bind((local_addr,local_port)) - rsock.connect((peer_addr,peer_port)) - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - elif listen: - # accept tcp connections - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - lsock.bind((local_addr,local_port)) - lsock.listen(1) - rsock,raddr = lsock.accept() - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - else: - # connect to tcp server - rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - for i in xrange(30): - try: - rsock.connect((peer_addr,peer_port)) - break - except socket.error: - # wait a while, retry - time.sleep(1) - else: - rsock.connect((peer_addr,peer_port)) - remote = os.fdopen(rsock.fileno(), 'r+b', 0) - - # notify that we're ready - self._connected.set() - - # drop strong reference - del self - - tun_fwd(tun, remote, - with_pi = False, - ether_mode = True, - cipher_key = cipher_key, - udp = udp, - TERMINATE = TERMINATE, - stderr = stderr - ) - - tun.close() - remote.close() - -- 2.47.0