import os
import sys
import threading
+import random
+import socket
+import weakref
+
+class TunChannel(object):
+ def __init__(self):
+ # These get initialized when the channel is configured
+ self.external_addr = None
+
+ # These get initialized when the channel is configured
+ # They're part of the TUN standard attribute set
+ self.tun_port = None
+ self.tun_addr = None
+
+ # These get initialized when the channel is connected to its peer
+ self.peer_proto = None
+ self.peer_addr = None
+ self.peer_port = None
+
+ # These get initialized when the channel is connected to its iface
+ self.tun_socket = None
+
+ # same as peer proto, but for execute-time standard attribute lookups
+ self.tun_proto = None
+
+ # some state
+ self.prepared = False
+ self.listen = False
+ self._terminate = [] # terminate signaller
+ self._connected = threading.Event()
+ self._forwarder_thread = None
+
+ # Generate an initial random cryptographic key to use for tunnelling
+ # Upon connection, both endpoints will agree on a common one based on
+ # this one.
+ self.tun_key = ( ''.join(map(chr, [
+ r.getrandbits(8)
+ for i in xrange(32)
+ for r in (random.SystemRandom(),) ])
+ ).encode("base64").strip() )
+
+
+ def __str__(self):
+ return "%s<ip:%s/%s %s%s>" % (
+ self.__class__.__name__,
+ self.address, self.netprefix,
+ " up" if self.up else " down",
+ " snat" if self.snat else "",
+ )
+
+ def Prepare(self):
+ if not self.udp and self.listen and not self._forwarder_thread:
+ if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
+ self._launch()
+
+ def Setup(self):
+ if not self._forwarder_thread:
+ self._launch()
+
+ def Cleanup(self):
+ if self._forwarder_thread:
+ self.Kill()
+
+ def Wait(self):
+ if self._forwarder_thread:
+ self._connected.wait()
+
+ def Kill(self):
+ if self._forwarder_thread:
+ if not self._terminate:
+ self._terminate.append(None)
+ self._forwarder_thread.join()
+
+ def _launch(self):
+ # Launch forwarder thread with a weak reference
+ # to self, so that we don't create any strong cycles
+ # and automatic refcounting works as expected
+ self._forwarder_thread = threading.Thread(
+ self._forwarder,
+ args = (weakref.ref(self),) )
+ self._forwarder_thread.start()
+
+ @staticmethod
+ def _forwarder(weak_self):
+ import tunchannel
+
+ # grab strong reference
+ self = weak_self()
+ if not self:
+ return
+
+ peer_port = self.peer_port
+ peer_addr = self.peer_addr
+ peer_proto= self.peer_proto
+
+ local_port = self.tun_port
+ local_addr = self.tun_addr
+ local_proto = self.tun_proto
+
+ if local_proto != peer_proto:
+ raise RuntimeError, "Peering protocol mismatch: %s != %s" % (local_proto, peer_proto)
+
+ udp = local_proto == 'udp'
+ listen = self.listen
+
+ if (udp or not listen) and (not peer_port or not peer_addr):
+ raise RuntimeError, "Misconfigured peer for: %s" % (self,)
+
+ if (udp or listen) and (not local_port or not local_addr):
+ raise RuntimeError, "Misconfigured TUN: %s" % (self,)
+
+ TERMINATE = self._terminate
+ cipher_key = self.tun_key
+ tun = self.tun_socket
+
+ if not tun:
+ raise RuntimeError, "Unconnected TUN channel %s" % (self,)
+
+ if udp:
+ # listen on udp port
+ if remaining_args and not remaining_args[0].startswith('-'):
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+ rsock.bind((local_addr,local_port))
+ rsock.connect((peer_addr,peer_port))
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+ elif listen:
+ # accept tcp connections
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ lsock.bind((local_addr,local_port))
+ lsock.listen(1)
+ rsock,raddr = lsock.accept()
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+ else:
+ # connect to tcp server
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ for i in xrange(30):
+ try:
+ rsock.connect((peer_addr,peer_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ time.sleep(1)
+ else:
+ rsock.connect((peer_addr,peer_port))
+ remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+
+ # notify that we're ready
+ self._connected.set()
+
+ # drop strong reference
+ del self
+
+ tunchannel.tun_fwd(tun, remote,
+ with_pi = False,
+ ether_mode = True,
+ cipher_key = cipher_key,
+ udp = udp,
+ TERMINATE = TERMINATE,
+ stderr = open("/dev/null","w") # silence logging
+ )
+
+ tun.close()
+ remote.close()
+
class TestbedController(testbed_impl.TestbedController):
+ LOCAL_FACTORIES = {
+ 'ns3::Nepi::TunChannel' : TunChannel,
+ }
+
def __init__(self, testbed_version):
super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
self._ns3 = None
self._traces = dict()
self._simulator_thread = None
self._condition = None
+
+ # local factories
+ self.TunChannel = TunChannel
@property
def home_directory(self):
factory.box_attributes.is_attribute_invisible(name):
return
element = self._elements[guid]
- ns3_value = self._to_ns3_value(guid, name, value)
- element.SetAttribute(name, ns3_value)
+ if factory_id in self.LOCAL_FACTORIES:
+ setattr(element, name, value)
+ else:
+ ns3_value = self._to_ns3_value(guid, name, value)
+ element.SetAttribute(name, ns3_value)
def get(self, guid, name, time = TIME_NOW):
value = super(TestbedController, self).get(guid, name, time)
if factory.box_attributes.is_attribute_design_only(name) or \
factory.box_attributes.is_attribute_invisible(name):
return value
+ if factory_id in self.LOCAL_FACTORIES:
+ return getattr(element, name)
TypeId = self.ns3.TypeId()
typeid = TypeId.LookupByName(factory_id)
info = TypeId.AttributeInfo()
construct_params[name] = value
return construct_params
+
+
static_routing = testbed_instance.ns3.Ipv4StaticRouting()
list_routing.AddRoutingProtocol(static_routing, 1)
+def create_tunchannel(testbed_instance, guid):
+ element = testbed_instance.TunChannel()
+ testbed_instance._elements[guid] = element
+
+
### Start/Stop functions ###
def start_application(testbed_instance, guid):
now = testbed_instance.ns3.Simulator.Now()
element.SetStopTime(now)
+def wait_tunchannel(testbed_instance, guid):
+ element = testbed_instance.elements[guid]
+ element.Wait()
+
+
### Status functions ###
def status_application(testbed_instance, guid):
ipv4.SetMetric(ifindex, 1)
ipv4.SetUp(ifindex)
+def preconfigure_tunchannel(testbed_instance, guid):
+ element = testbed_instance._elements[guid]
+
+ # Find external interface, if any
+ import os
+ public_addr = os.popen(
+ "/sbin/ifconfig "
+ "| grep $(ip route | grep default | awk '{print $3}' "
+ "| awk -F. '{print $1\"[.]\"$2}') "
+ "| head -1 | awk '{print $2}' "
+ "| awk -F : '{print $2}'").read().rstrip()
+ element.external_addr = public_addr
+
+ # Set standard TUN attributes
+ if (not element.tun_addr or not element.tun_port) and element.external_addr:
+ element.tun_addr = element.external_addr
+ element.tun_port = 15000 + int(guid)
+
+ # First-phase setup
+ if element.peer_proto:
+ # cross tun
+ if not element.tun_addr or not element.tun_port:
+ listening = True
+ elif not element.peer_addr or not element.peer_port:
+ listening = True
+ else:
+ # both have addresses...
+ # ...the one with the lesser address listens
+ listening = element.tun_addr < element.peer_addr
+ element.listen = listening
+ element.Prepare()
+
+def postconfigure_tunchannel(testbed_instance, guid):
+ element = testbed_instance._elements[guid]
+
+ # Second-phase setup
+ element.Setup()
+
def _add_static_route(ns3, static_routing,
address, netprefix, nexthop_address, ifindex):
if netprefix == 0:
"LinuxSocketAddress",
"tun_proto", "tun_addr", "tun_port", "tun_key"],
"traces": ["fdpcap"]
+ }),
+ "ns3::Nepi::TunChannel": dict({
+ "category": "Channel",
+ "create_function": create_tunchannel,
+ "preconfigure_function": preconfigure_tunchannel,
+ "configure_function": postconfigure_tunchannel,
+ "start_function": wait_tunchannel,
+ "help": "Channel to forward FileDescriptorNetDevice data to "
+ "other TAP interfaces supporting the NEPI tunneling protocol.",
+ "connector_types": ["fd->", "udp", "tcp"],
+ "allow_addresses": False,
+ "box_attributes": ["tun_proto", "tun_addr", "tun_port", "tun_key"]
}),
"ns3::CsmaNetDevice": dict({
"category": "Device",
from nepi.core.attributes import Attribute
from nepi.util import validation
import os.path
+import functools
### Connection functions ####
testbed_instance.set(fdnd_guid, "tun_port", 0)
testbed_instance.set(fdnd_guid, "tun_key", "\xfa"*32) # unimportant, fds aren't encrypted
+def connect_tunchannel_fd(testbed_instance, tun_guid, fdnd_guid):
+ fdnd = testbed_instance._elements[fdnd_guid]
+ tun = testbed_instance._elements[tun_guid]
+
+ # XXX: check the method StringToBuffer of ns3::FileDescriptorNetDevice
+ # to see how the address should be decoded
+ endpoint = fdnd.GetEndpoint()
+ address = endpoint.replace(":", "").decode('hex')[2:]
+ testbed_instance.set(fdnd_guid, "LinuxSocketAddress", address)
+
+ # Create socket pair to connect the FDND and the TunChannel with it
+ import socket
+ sock1, sock2 = socket.socketpair(
+ socket.AF_UNIX, socket.SOCK_SEQPACKET)
+
+ # Send one endpoint to the FDND
+ import passfd
+ import socket
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ sock.connect(address)
+ passfd.sendfd(sock, sock1.fileno(), '0')
+
+ # Store a reference to the endpoint to keep the socket alive
+ fdnd._endpoint_socket = sock1
+
+ # Send the other endpoint to the TUN channel
+ tun.tun_socket = sock2
+
+def connect_tunchannel_peer_init(testbed_instance, tun_guid, cross_data):
+ tun = testbed_instance._elements[tun_guid]
+
+def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data):
+ tun = testbed_instance._elements[tun_guid]
+ tun.peer_addr = peer_data.get("tun_addr")
+ tun.peer_proto = peer_data.get("tun_proto") or proto
+ tun.peer_port = peer_data.get("tun_port")
+ tun.tun_key = min(tun.tun_key, peer_data.get("tun_key"))
+ tun.tun_proto = proto
+
+ preconfigure_tunchannel(testbed_instance, tun_guid)
+
+def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data):
+ # refresh (refreshable) attributes for second-phase
+ tun = testbed_instance._elements[tun_guid]
+ tun.peer_addr = peer_data.get("tun_addr")
+ tun.peer_proto = peer_data.get("tun_proto") or proto
+ tun.peer_port = peer_data.get("tun_port")
+
+ postconfigure_tunchannel(testbed_instance, tun_guid)
+
+
### Connector information ###
connector_types = dict({
"max": 1,
"min": 0
}),
+ "tcp": dict({
+ "help": "ip-ip tunneling over TCP link",
+ "name": "tcp",
+ "max": 1,
+ "min": 0
+ }),
+ "udp": dict({
+ "help": "ip-ip tunneling over UDP datagrams",
+ "name": "udp",
+ "max": 1,
+ "min": 0
+ }),
})
connections = [
"init_code": connect_fd,
"can_cross": True
}),
+ dict({
+ "from": ( "ns3", "ns3::Nepi::TunChannel", "fd->" ),
+ "to": ( "ns3", "ns3::FileDescriptorNetDevice", "->fd" ),
+ "init_code": connect_tunchannel_fd,
+ "can_cross": False
+ }),
+ dict({
+ "from": ( "ns3", "ns3::Nepi::TunChannel", "tcp"),
+ "to": (None, None, "tcp"),
+ "init_code": functools.partial(crossconnect_tunchannel_peer_init,"tcp"),
+ "compl_code": functools.partial(crossconnect_tunchannel_peer_compl,"tcp"),
+ "can_cross": True
+ }),
+ dict({
+ "from": ( "ns3", "ns3::Nepi::TunChannel", "udp"),
+ "to": (None, None, "udp"),
+ "init_code": functools.partial(crossconnect_tunchannel_peer_init,"udp"),
+ "compl_code": functools.partial(crossconnect_tunchannel_peer_compl,"udp"),
+ "can_cross": True
+ }),
]
traces = dict({
"ns3::OnOffApplication",
"ns3::VirtualNetDevice",
"ns3::FileDescriptorNetDevice",
+ "ns3::Nepi::TunChannel",
"ns3::TapBridge",
"ns3::BridgeChannel",
"ns3::BridgeNetDevice",