import socket
import weakref
-class TunChannel(object):
- def __init__(self):
- # These get initialized when the channel is configured
- self.external_addr = None
-
- # These get initialized when the channel is configured
- # They're part of the TUN standard attribute set
- self.tun_port = None
- self.tun_addr = None
-
- # These get initialized when the channel is connected to its peer
- self.peer_proto = None
- self.peer_addr = None
- self.peer_port = None
-
- # These get initialized when the channel is connected to its iface
- self.tun_socket = None
-
- # same as peer proto, but for execute-time standard attribute lookups
- self.tun_proto = None
-
- # some state
- self.prepared = False
- self.listen = False
- self._terminate = [] # terminate signaller
- self._connected = threading.Event()
- self._forwarder_thread = None
-
- # Generate an initial random cryptographic key to use for tunnelling
- # Upon connection, both endpoints will agree on a common one based on
- # this one.
- self.tun_key = ( ''.join(map(chr, [
- r.getrandbits(8)
- for i in xrange(32)
- for r in (random.SystemRandom(),) ])
- ).encode("base64").strip() )
-
-
- def __str__(self):
- return "%s<ip:%s/%s %s%s>" % (
- self.__class__.__name__,
- self.address, self.netprefix,
- " up" if self.up else " down",
- " snat" if self.snat else "",
- )
-
- def Prepare(self):
- if not self.udp and self.listen and not self._forwarder_thread:
- if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
- self._launch()
-
- def Setup(self):
- if not self._forwarder_thread:
- self._launch()
-
- def Cleanup(self):
- if self._forwarder_thread:
- self.Kill()
-
- def Wait(self):
- if self._forwarder_thread:
- self._connected.wait()
-
- def Kill(self):
- if self._forwarder_thread:
- if not self._terminate:
- self._terminate.append(None)
- self._forwarder_thread.join()
-
- def _launch(self):
- # Launch forwarder thread with a weak reference
- # to self, so that we don't create any strong cycles
- # and automatic refcounting works as expected
- self._forwarder_thread = threading.Thread(
- self._forwarder,
- args = (weakref.ref(self),) )
- self._forwarder_thread.start()
-
- @staticmethod
- def _forwarder(weak_self):
- import tunchannel
-
- # grab strong reference
- self = weak_self()
- if not self:
- return
-
- peer_port = self.peer_port
- peer_addr = self.peer_addr
- peer_proto= self.peer_proto
-
- local_port = self.tun_port
- local_addr = self.tun_addr
- local_proto = self.tun_proto
-
- if local_proto != peer_proto:
- raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
-
- udp = local_proto == 'udp'
- listen = self.listen
-
- if (udp or not listen) and (not peer_port or not peer_addr):
- raise RuntimeError, "Misconfigured peer for: %s" % (self,)
-
- if (udp or listen) and (not local_port or not local_addr):
- raise RuntimeError, "Misconfigured TUN: %s" % (self,)
-
- TERMINATE = self._terminate
- cipher_key = self.tun_key
- tun = self.tun_socket
-
- if not tun:
- raise RuntimeError, "Unconnected TUN channel %s" % (self,)
-
- if udp:
- # listen on udp port
- if remaining_args and not remaining_args[0].startswith('-'):
- rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
- rsock.bind((local_addr,local_port))
- rsock.connect((peer_addr,peer_port))
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
- elif listen:
- # accept tcp connections
- lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- lsock.bind((local_addr,local_port))
- lsock.listen(1)
- rsock,raddr = lsock.accept()
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
- else:
- # connect to tcp server
- rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- for i in xrange(30):
- try:
- rsock.connect((peer_addr,peer_port))
- break
- except socket.error:
- # wait a while, retry
- time.sleep(1)
- else:
- rsock.connect((peer_addr,peer_port))
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
-
- # notify that we're ready
- self._connected.set()
-
- # drop strong reference
- del self
-
- tunchannel.tun_fwd(tun, remote,
- with_pi = False,
- ether_mode = True,
- cipher_key = cipher_key,
- udp = udp,
- TERMINATE = TERMINATE,
- stderr = open("/dev/null","w") # silence logging
- )
-
- tun.close()
- remote.close()
-
+from nepi.util.tunchannel import TunChannel
class TestbedController(testbed_impl.TestbedController):
LOCAL_FACTORIES = {
import os
import struct
import socket
+import threading
def ipfmt(ip):
ipbytes = map(ord,ip.decode("hex"))
raise
bkbuf += packet
+
+
+class TunChannel(object):
+ """
+ Helper box class that implements most of the required boilerplate
+ for tunnelling cross connections.
+
+ The class implements a threaded forwarder that runs in the
+ testbed controller process. It takes several parameters that
+ can be given by directly setting attributes:
+
+ tun_port/addr/proto: information about the local endpoint.
+ The addresses here should be externally-reachable,
+ since when listening or when using the UDP protocol,
+ connections to this address/port will be attempted
+ by remote endpoitns.
+
+ peer_port/addr/proto: information about the remote endpoint.
+ Usually, you set these when the cross connection
+ initializer/completion functions are invoked (both).
+
+ tun_key: the agreed upon encryption key.
+
+ listen: if set to True (and in TCP mode), it marks a
+ listening endpoint. Be certain that any TCP connection
+ is made between a listening and a non-listening
+ endpoint, or it won't work.
+
+ with_pi: set if the incoming packet stream (see tun_socket)
+ contains PI headers - if so, they will be stripped.
+
+ ethernet_mode: set if the incoming packet stream is
+ composed of ethernet frames (as opposed of IP packets).
+
+ tun_socket: a socket or file object that can be read
+ from and written to. Packets will be read when available,
+ remote packets will be forwarded as writes.
+ A socket should be of type SOCK_SEQPACKET (or SOCK_DGRAM
+ if not possible), a file object should preserve packet
+ boundaries (ie, a pipe or TUN/TAP device file descriptor).
+
+ trace_target: a file object where trace output will be sent.
+ It cannot be changed after launch.
+ By default, it's sys.stderr
+ """
+
+ def __init__(self):
+ # These get initialized when the channel is configured
+ # They're part of the TUN standard attribute set
+ self.tun_port = None
+ self.tun_addr = None
+
+ # These get initialized when the channel is connected to its peer
+ self.peer_proto = None
+ self.peer_addr = None
+ self.peer_port = None
+
+ # These get initialized when the channel is connected to its iface
+ self.tun_socket = None
+
+ # same as peer proto, but for execute-time standard attribute lookups
+ self.tun_proto = None
+
+ # some state
+ self.prepared = False
+ self.listen = False
+ self._terminate = [] # terminate signaller
+ self._connected = threading.Event()
+ self._forwarder_thread = None
+
+ # trace to stderr
+ self.stderr = sys.stderr
+
+ # Generate an initial random cryptographic key to use for tunnelling
+ # Upon connection, both endpoints will agree on a common one based on
+ # this one.
+ self.tun_key = ( ''.join(map(chr, [
+ r.getrandbits(8)
+ for i in xrange(32)
+ for r in (random.SystemRandom(),) ])
+ ).encode("base64").strip() )
+
+
+ def __str__(self):
+ return "%s<ip:%s/%s %s%s>" % (
+ self.__class__.__name__,
+ self.address, self.netprefix,
+ " up" if self.up else " down",
+ " snat" if self.snat else "",
+ )
+
+ def Prepare(self):
+ if not self.udp and self.listen and not self._forwarder_thread:
+ if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
+ self._launch()
+
+ def Setup(self):
+ if not self._forwarder_thread:
+ self._launch()
+
+ def Cleanup(self):
+ if self._forwarder_thread:
+ self.Kill()
+
+ def Wait(self):
+ if self._forwarder_thread:
+ self._connected.wait()
+
+ def Kill(self):
+ if self._forwarder_thread:
+ if not self._terminate:
+ self._terminate.append(None)
+ self._forwarder_thread.join()
+
+ def _launch(self):
+ # Launch forwarder thread with a weak reference
+ # to self, so that we don't create any strong cycles
+ # and automatic refcounting works as expected
+ self._forwarder_thread = threading.Thread(
+ self._forwarder,
+ args = (weakref.ref(self),) )
+ self._forwarder_thread.start()
+
+ @staticmethod
+ def _forwarder(weak_self):
+ # grab strong reference
+ self = weak_self()
+ if not self:
+ return
+
+ peer_port = self.peer_port
+ peer_addr = self.peer_addr
+ peer_proto= self.peer_proto
+
+ local_port = self.tun_port
+ local_addr = self.tun_addr
+ local_proto = self.tun_proto
+
+ stderr = self.stderr
+
+ if local_proto != peer_proto:
+ raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
+
+ udp = local_proto == 'udp'
+ listen = self.listen
+
+ if (udp or not listen) and (not peer_port or not peer_addr):
+ raise RuntimeError, "Misconfigured peer for: %s" % (self,)
+
+ if (udp or listen) and (not local_port or not local_addr):
+ raise RuntimeError, "Misconfigured TUN: %s" % (self,)
+
+ TERMINATE = self._terminate
+ cipher_key = self.tun_key
+ tun = self.tun_socket
+
+ if not tun:
+ raise RuntimeError, "Unconnected TUN channel %s" % (self,)
+
+ if udp:
+ # listen on udp port
+ if remaining_args and not remaining_args[0].startswith('-'):
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+ rsock.bind((local_addr,local_port))
+ rsock.connect((peer_addr,peer_port))
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+ elif listen:
+ # accept tcp connections
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ lsock.bind((local_addr,local_port))
+ lsock.listen(1)
+ rsock,raddr = lsock.accept()
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+ else:
+ # connect to tcp server
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ for i in xrange(30):
+ try:
+ rsock.connect((peer_addr,peer_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ time.sleep(1)
+ else:
+ rsock.connect((peer_addr,peer_port))
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+
+ # notify that we're ready
+ self._connected.set()
+
+ # drop strong reference
+ del self
+
+ tun_fwd(tun, remote,
+ with_pi = False,
+ ether_mode = True,
+ cipher_key = cipher_key,
+ udp = udp,
+ TERMINATE = TERMINATE,
+ stderr = stderr
+ )
+
+ tun.close()
+ remote.close()
+