Refactor a bit more, connect, wait, create and configure functions are also general...
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 16 May 2011 14:21:10 +0000 (16:21 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 16 May 2011 14:21:10 +0000 (16:21 +0200)
Move that code to tunchannel_impl, to separate it from the core tunchannel stuff (which may be required if that generic implementation isn't applicable for some - ie, for tun_connect in PL).

src/nepi/testbeds/ns3/execute.py
src/nepi/testbeds/ns3/factories_metadata_v3_9_RC3.py
src/nepi/testbeds/ns3/metadata_v3_9_RC3.py
src/nepi/util/tunchannel.py

index 2b10a74..94f4335 100644 (file)
@@ -12,9 +12,9 @@ import random
 import socket
 import weakref
 
-from nepi.util.tunchannel import TunChannel
-
 class TestbedController(testbed_impl.TestbedController):
+    from nepi.util.tunchannel_impl import TunChannel
+    
     LOCAL_FACTORIES = {
         'ns3::Nepi::TunChannel' : TunChannel,
     }
@@ -26,9 +26,6 @@ class TestbedController(testbed_impl.TestbedController):
         self._traces = dict()
         self._simulator_thread = None
         self._condition = None
-        
-        # local factories
-        self.TunChannel = TunChannel
 
     @property
     def home_directory(self):
index b8dc66b..2889a1b 100644 (file)
@@ -4,6 +4,11 @@
 from nepi.util.constants import AF_INET, STATUS_NOT_STARTED, STATUS_RUNNING, \
         STATUS_FINISHED, STATUS_UNDETERMINED
 
+from nepi.util.tunchannel_impl import \
+    preconfigure_tunchannel, postconfigure_tunchannel, \
+    wait_tunchannel, create_tunchannel
+
+
 def _get_ipv4_protocol_guid(testbed_instance, node_guid):
     # search for the Ipv4L3Protocol asociated with the device
     protos_guids = testbed_instance.get_connected(node_guid, "protos", "node")
@@ -159,14 +164,6 @@ def create_ipv4protocol(testbed_instance, guid):
     static_routing = testbed_instance.ns3.Ipv4StaticRouting()
     list_routing.AddRoutingProtocol(static_routing, 1)
 
-def create_tunchannel(testbed_instance, guid, devnull = []):
-    if not devnull:
-        # just so it's not open if not needed
-        devnull.append(open("/dev/null","w"))
-    element = testbed_instance.TunChannel()
-    element.stderr = devnull[0] # silence tracing
-    testbed_instance._elements[guid] = element
-
 
 ### Start/Stop functions ###
 
@@ -181,10 +178,6 @@ def stop_application(testbed_instance, guid):
     now = testbed_instance.ns3.Simulator.Now()
     element.SetStopTime(now)
 
-def wait_tunchannel(testbed_instance, guid):
-    element = testbed_instance.elements[guid]
-    element.Wait()
-
 
 ### Status functions ###
 
@@ -254,43 +247,6 @@ def configure_device(testbed_instance, guid):
         ipv4.AddAddress(ifindex, inaddr)
         ipv4.SetMetric(ifindex, 1)
         ipv4.SetUp(ifindex)
-
-def preconfigure_tunchannel(testbed_instance, guid):
-    element = testbed_instance._elements[guid]
-    
-    # Find external interface, if any
-    import os
-    public_addr = os.popen(
-        "/sbin/ifconfig "
-        "| grep $(ip route | grep default | awk '{print $3}' "
-                "| awk -F. '{print $1\"[.]\"$2}') "
-        "| head -1 | awk '{print $2}' "
-        "| awk -F : '{print $2}'").read().rstrip()
-    element.tun_addr = public_addr
-
-    # Set standard TUN attributes
-    if not element.tun_port and element.tun_addr:
-        element.tun_port = 15000 + int(guid)
-
-    # First-phase setup
-    if element.peer_proto:
-        # cross tun
-        if not element.tun_addr or not element.tun_port:
-            listening = True
-        elif not element.peer_addr or not element.peer_port:
-            listening = True
-        else:
-            # both have addresses...
-            # ...the one with the lesser address listens
-            listening = element.tun_addr < element.peer_addr
-        element.listen = listening
-        element.Prepare()
-
-def postconfigure_tunchannel(testbed_instance, guid):
-    element = testbed_instance._elements[guid]
-    
-    # Second-phase setup
-    element.Setup()
     
 def _add_static_route(ns3, static_routing, 
         address, netprefix, nexthop_address, ifindex):
index 0effdb0..96f30e6 100644 (file)
@@ -8,6 +8,11 @@ from nepi.util import validation
 import os.path
 import functools
 
+from nepi.util.tunchannel_impl import \
+    crossconnect_tunchannel_peer_init, \
+    crossconnect_tunchannel_peer_compl
+
+
 ### Connection functions ####
 
 def connect_node_device(testbed_instance, node_guid, device_guid):
@@ -137,28 +142,6 @@ def connect_tunchannel_fd(testbed_instance, tun_guid, fdnd_guid):
     # Send the other endpoint to the TUN channel
     tun.tun_socket = sock2
 
-def connect_tunchannel_peer_init(testbed_instance, tun_guid, cross_data):
-    tun = testbed_instance._elements[tun_guid]
-
-def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data):
-    tun = testbed_instance._elements[tun_guid]
-    tun.peer_addr = peer_data.get("tun_addr")
-    tun.peer_proto = peer_data.get("tun_proto") or proto
-    tun.peer_port = peer_data.get("tun_port")
-    tun.tun_key = min(tun.tun_key, peer_data.get("tun_key"))
-    tun.tun_proto = proto
-    
-    preconfigure_tunchannel(testbed_instance, tun_guid)
-
-def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data):
-    # refresh (refreshable) attributes for second-phase
-    tun = testbed_instance._elements[tun_guid]
-    tun.peer_addr = peer_data.get("tun_addr")
-    tun.peer_proto = peer_data.get("tun_proto") or proto
-    tun.peer_port = peer_data.get("tun_port")
-    
-    postconfigure_tunchannel(testbed_instance, tun_guid)
-    
 
 ### Connector information ###
 
index 9c0be65..c38a3c3 100644 (file)
@@ -234,206 +234,3 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
 
 
 
-class TunChannel(object):
-    """
-    Helper box class that implements most of the required boilerplate
-    for tunnelling cross connections.
-    
-    The class implements a threaded forwarder that runs in the
-    testbed controller process. It takes several parameters that
-    can be given by directly setting attributes:
-    
-        tun_port/addr/proto: information about the local endpoint.
-            The addresses here should be externally-reachable,
-            since when listening or when using the UDP protocol,
-            connections to this address/port will be attempted
-            by remote endpoitns.
-        
-        peer_port/addr/proto: information about the remote endpoint.
-            Usually, you set these when the cross connection 
-            initializer/completion functions are invoked (both).
-        
-        tun_key: the agreed upon encryption key.
-        
-        listen: if set to True (and in TCP mode), it marks a
-            listening endpoint. Be certain that any TCP connection
-            is made between a listening and a non-listening
-            endpoint, or it won't work.
-        
-        with_pi: set if the incoming packet stream (see tun_socket)
-            contains PI headers - if so, they will be stripped.
-        
-        ethernet_mode: set if the incoming packet stream is
-            composed of ethernet frames (as opposed of IP packets).
-        
-        tun_socket: a socket or file object that can be read
-            from and written to. Packets will be read when available,
-            remote packets will be forwarded as writes.
-            A socket should be of type SOCK_SEQPACKET (or SOCK_DGRAM
-            if not possible), a file object should preserve packet
-            boundaries (ie, a pipe or TUN/TAP device file descriptor).
-        
-        trace_target: a file object where trace output will be sent.
-            It cannot be changed after launch.
-            By default, it's sys.stderr
-    """
-    
-    def __init__(self):
-        # These get initialized when the channel is configured
-        # They're part of the TUN standard attribute set
-        self.tun_port = None
-        self.tun_addr = None
-        
-        # These get initialized when the channel is connected to its peer
-        self.peer_proto = None
-        self.peer_addr = None
-        self.peer_port = None
-        
-        # These get initialized when the channel is connected to its iface
-        self.tun_socket = None
-
-        # same as peer proto, but for execute-time standard attribute lookups
-        self.tun_proto = None 
-        
-        # some state
-        self.prepared = False
-        self.listen = False
-        self._terminate = [] # terminate signaller
-        self._connected = threading.Event()
-        self._forwarder_thread = None
-        
-        # trace to stderr
-        self.stderr = sys.stderr
-        
-        # Generate an initial random cryptographic key to use for tunnelling
-        # Upon connection, both endpoints will agree on a common one based on
-        # this one.
-        self.tun_key = ( ''.join(map(chr, [ 
-                    r.getrandbits(8) 
-                    for i in xrange(32) 
-                    for r in (random.SystemRandom(),) ])
-                ).encode("base64").strip() )        
-        
-
-    def __str__(self):
-        return "%s<ip:%s/%s %s%s>" % (
-            self.__class__.__name__,
-            self.address, self.netprefix,
-            " up" if self.up else " down",
-            " snat" if self.snat else "",
-        )
-
-    def Prepare(self):
-        if not self.udp and self.listen and not self._forwarder_thread:
-            if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
-                self._launch()
-    
-    def Setup(self):
-        if not self._forwarder_thread:
-            self._launch()
-    
-    def Cleanup(self):
-        if self._forwarder_thread:
-            self.Kill()
-
-    def Wait(self):
-        if self._forwarder_thread:
-            self._connected.wait()
-
-    def Kill(self):    
-        if self._forwarder_thread:
-            if not self._terminate:
-                self._terminate.append(None)
-            self._forwarder_thread.join()
-
-    def _launch(self):
-        # Launch forwarder thread with a weak reference
-        # to self, so that we don't create any strong cycles
-        # and automatic refcounting works as expected
-        self._forwarder_thread = threading.Thread(
-            self._forwarder,
-            args = (weakref.ref(self),) )
-        self._forwarder_thread.start()
-    
-    @staticmethod
-    def _forwarder(weak_self):
-        # grab strong reference
-        self = weak_self()
-        if not self:
-            return
-        
-        peer_port = self.peer_port
-        peer_addr = self.peer_addr
-        peer_proto= self.peer_proto
-
-        local_port = self.tun_port
-        local_addr = self.tun_addr
-        local_proto = self.tun_proto
-        
-        stderr = self.stderr
-        
-        if local_proto != peer_proto:
-            raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
-        
-        udp = local_proto == 'udp'
-        listen = self.listen
-
-        if (udp or not listen) and (not peer_port or not peer_addr):
-            raise RuntimeError, "Misconfigured peer for: %s" % (self,)
-
-        if (udp or listen) and (not local_port or not local_addr):
-            raise RuntimeError, "Misconfigured TUN: %s" % (self,)
-        
-        TERMINATE = self._terminate
-        cipher_key = self.tun_key
-        tun = self.tun_socket
-        
-        if not tun:
-            raise RuntimeError, "Unconnected TUN channel %s" % (self,)
-        
-        if udp:
-            # listen on udp port
-            if remaining_args and not remaining_args[0].startswith('-'):
-                rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
-                rsock.bind((local_addr,local_port))
-                rsock.connect((peer_addr,peer_port))
-            remote = os.fdopen(rsock.fileno(), 'r+b', 0)
-        elif listen:
-            # accept tcp connections
-            lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-            lsock.bind((local_addr,local_port))
-            lsock.listen(1)
-            rsock,raddr = lsock.accept()
-            remote = os.fdopen(rsock.fileno(), 'r+b', 0)
-        else:
-            # connect to tcp server
-            rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-            for i in xrange(30):
-                try:
-                    rsock.connect((peer_addr,peer_port))
-                    break
-                except socket.error:
-                    # wait a while, retry
-                    time.sleep(1)
-            else:
-                rsock.connect((peer_addr,peer_port))
-            remote = os.fdopen(rsock.fileno(), 'r+b', 0)
-        
-        # notify that we're ready
-        self._connected.set()
-        
-        # drop strong reference
-        del self
-        
-        tun_fwd(tun, remote,
-            with_pi = False, 
-            ether_mode = True, 
-            cipher_key = cipher_key, 
-            udp = udp, 
-            TERMINATE = TERMINATE,
-            stderr = stderr
-        )
-        
-        tun.close()
-        remote.close()
-