def add_ns3_fdnd(self, node, ns3_desc):
fdnd = ns3_desc.create("ns3::FdNetDevice")
node.connector("devs").connect(fdnd.connector("node"))
- fdnd.enable_trace("FileDescriptorPcapTrace")
+ fdnd.enable_trace("FdPcapTrace")
return fdnd
def add_ns3_node(self, ns3_desc):
cross_connector_type_name,
True)
if connect_code:
- self._logger.debug("Cross-connect: guid: %d, connect_code: %s " % (
- guid, connect_code.func.__name__))
+ if hasattr(connect_code, "func"):
+ func_name = connect_code.func.__name__
+ elif hasattr(connect_code, "__name__"):
+ func_name = connect_code.__name__
+ else:
+ func_name = repr(connect_code)
+ self._logger.debug("Cross-connect - guid: %d, connect_code: %s " % (
+ guid, func_name))
elem_cross_data = cross_data[cross_testbed_guid][cross_guid]
connect_code(self, guid, elem_cross_data)
trace.close()
for guid, element in self._elements.iteritems():
if isinstance(element, self.TunChannel):
- element.Cleanup()
+ element.cleanup()
else:
factory_id = self._create[guid]
if factory_id == "Node":
from nepi.util.tunchannel_impl import \
preconfigure_tunchannel, postconfigure_tunchannel, \
- wait_tunchannel, create_tunchannel, \
+ prestart_tunchannel, create_tunchannel, \
crossconnect_tunchannel_peer_init, \
crossconnect_tunchannel_peer_compl
"create_function": create_tunchannel,
"preconfigure_function": preconfigure_tunchannel,
"configure_function": postconfigure_tunchannel,
- "prestart_function": wait_tunchannel,
+ "prestart_function": prestart_tunchannel,
"help": "Channel to forward "+TAPIFACE+" data to "
"other TAP interfaces supporting the NEPI tunneling protocol.",
"connector_types": ["->fd", "udp", "tcp"],
# Store a reference to the endpoint to keep the socket alive
fdnd._endpoint_socket = sock1
fdnd.SetFileDescriptor(sock1.fileno())
-
+
# Send the other endpoint to the TUN channel
tun.tun_socket = sock2
# the default presence of PI headers)
tun.with_pi = True
+
### Connector information ###
connector_types = dict({
for element in self._elements.itervalues():
if isinstance(element, self.LOCAL_TYPES):
# graceful shutdown of locally-implemented objects
- element.Cleanup()
+ element.cleanup()
if self.ns3:
if not self.ns3.Simulator.IsFinished():
self.stop()
FactoryCategories as FC
from nepi.util.tunchannel_impl import \
preconfigure_tunchannel, postconfigure_tunchannel, \
- wait_tunchannel, create_tunchannel
+ prestart_tunchannel, create_tunchannel
import re
wifi_standards = dict({
"create_function": create_tunchannel,
"preconfigure_function": preconfigure_tunchannel,
"configure_function": postconfigure_tunchannel,
- "prestart_function": wait_tunchannel,
+ "prestart_function": prestart_tunchannel,
"help": "Channel to forward FdNetDevice data to "
"other TAP interfaces supporting the NEPI tunneling protocol.",
"connector_types": ["fd->", "udp", "tcp"],
if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None):
raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,)
- def _impl_instance(self, home_path, listening):
+ def _impl_instance(self, home_path):
impl = self._PROTO_MAP[self.peer_proto](
- self, self.peer_iface, home_path, self.tun_key, listening)
+ self, self.peer_iface, home_path, self.tun_key)
impl.port = self.tun_port
return impl
else:
self._delay_recover = True
- def prepare(self, home_path, listening):
- if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
+ def prepare(self, home_path):
+ if not self.peer_iface and (self.peer_proto and self.peer_addr and self.peer_port):
# Ad-hoc peer_iface
self.peer_iface = _CrossIface(
self.peer_proto,
self.peer_cipher)
if self.peer_iface:
if not self.peer_proto_impl:
- self.peer_proto_impl = self._impl_instance(home_path, listening)
+ self.peer_proto_impl = self._impl_instance(home_path)
if self._delay_recover:
self.peer_proto_impl.recover()
- else:
- self.peer_proto_impl.prepare()
- def setup(self):
+ def launch(self):
if self.peer_proto_impl:
- self.peer_proto_impl.setup()
+ self.peer_proto_impl.launch()
def cleanup(self):
if self.peer_proto_impl:
self.peer_proto_impl.destroy()
self.peer_proto_impl = None
- def async_launch_wait(self):
+ def wait(self):
if self.peer_proto_impl:
- self.peer_proto_impl.async_launch_wait()
+ self.peer_proto_impl.wait()
def sync_trace(self, local_dir, whichtrace, tracemap = None):
if self.peer_proto_impl:
element.validate()
# First-phase setup
- if element.peer_proto:
- if element.peer_iface and isinstance(element.peer_iface, testbed_instance._interfaces.TunIface):
- # intra tun
- listening = id(element) < id(element.peer_iface)
- else:
- # cross tun
- if not element.tun_addr or not element.tun_port:
- listening = True
- elif not element.peer_addr or not element.peer_port:
- listening = True
- else:
- # both have addresses...
- # ...the one with the lesser address listens
- listening = element.tun_addr < element.peer_addr
- element.prepare(
- 'tun-%s' % (guid,),
- listening)
+ element.prepare('tun-%s' % (guid,))
def postconfigure_tuniface(testbed_instance, guid):
element = testbed_instance._elements[guid]
# Second-phase setup
- element.setup()
+ element.launch()
-def wait_tuniface(testbed_instance, guid):
+def prestart_tuniface(testbed_instance, guid):
element = testbed_instance._elements[guid]
# Second-phase setup
- element.async_launch_wait()
-
+ element.wait()
def configure_node(testbed_instance, guid):
node = testbed_instance._elements[guid]
"create_function": create_tuniface,
"preconfigure_function": preconfigure_tuniface,
"configure_function": postconfigure_tuniface,
- "prestart_function": wait_tuniface,
+ "prestart_function": prestart_tuniface,
"box_attributes": [
"up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
"txqueuelen",
"create_function": create_tapiface,
"preconfigure_function": preconfigure_tuniface,
"configure_function": postconfigure_tuniface,
- "prestart_function": wait_tuniface,
+ "prestart_function": prestart_tuniface,
"box_attributes": [
"up", "if_name", "mtu", "snat", "pointopoint", "multicast", "bwlimit",
"txqueuelen",
tun_path = '/dev/net/tun'
hostaddr = socket.gethostbyname(socket.gethostname())
-usage = "usage: %prog [options] <remote-endpoint>"
+usage = "usage: %prog [options]"
parser = optparse.OptionParser(usage=usage)
default = "/dev/net/tun",
help = "TUN/TAP device file path or file descriptor number")
parser.add_option(
- "-p", "--port", dest="port", metavar="PORT", type="int",
+ "-p", "--peer-port", dest="peer-port", metavar="PEER_PORT", type="int",
default = 15000,
- help = "Peering TCP port to connect or listen to.")
+ help = "Remote TCP/UDP port to connect to.")
parser.add_option(
"--pass-fd", dest="pass_fd", metavar="UNIX_SOCKET",
default = None,
"If given, all other connectivity options are ignored, tun_connect will "
"simply wait to be killed after passing the file descriptor, and it will be "
"the receiver's responsability to handle the tunneling.")
-
parser.add_option(
"-m", "--mode", dest="mode", metavar="MODE",
default = "none",
"by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), "
"and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have "
"to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).")
+parser.add_option(
+ "-t", "--protocol", dest="protocol", metavar="PROTOCOL",
+ default = None,
+ help =
+ "Set protocol. One of tcp, udp, fd, gre. In any mode except none, a TUN/TAP will be created.")
parser.add_option(
"-A", "--vif-address", dest="vif_addr", metavar="VIF_ADDRESS",
default = None,
help =
"See mode. This specifies the VIF_MASK, "
"a number indicating the network type (ie: 24 for a C-class network).")
+parser.add_option(
+ "-P", "--port", dest="port", type="int", metavar="PORT",
+ default = None,
+ help =
+ "This specifies the LOCAL_PORT. This will be the local bind port for UDP/TCP.")
parser.add_option(
"-S", "--vif-snat", dest="vif_snat",
action = "store_true",
default = False,
help = "See mode. This specifies whether SNAT will be enabled for the virtual interface. " )
parser.add_option(
- "-P", "--vif-pointopoint", dest="vif_pointopoint", metavar="DST_ADDR",
+ "-Z", "--vif-pointopoint", dest="vif_pointopoint", metavar="DST_ADDR",
default = None,
help =
"See mode. This specifies the remote endpoint's virtual address, "
help =
"This specifies the interface's emulated bandwidth in bytes per second." )
parser.add_option(
- "-u", "--udp", dest="udp", metavar="PORT", type="int",
+ "-a", "--peer-address", dest="peer_addr", metavar="PEER_ADDRESS",
default = None,
help =
- "Bind to the specified UDP port locally, and send UDP datagrams to the "
- "remote endpoint, creating a tunnel through UDP rather than TCP." )
+ "This specifies the PEER_ADDRESS, "
+ "the IP address of the remote interface.")
parser.add_option(
"-k", "--key", dest="cipher_key", metavar="KEY",
default = None,
help = "If specified, packets won't be logged to standard output, "
"but dumped to a pcap-formatted trace in the specified file. " )
-(options, remaining_args) = parser.parse_args(sys.argv[1:])
+(options,) = parser.parse_args(sys.argv[1:])
options.cipher = {
'aes' : 'AES',
with_pi = options.mode.startswith('pl-'),
ether_mode = tun_name.startswith('tap'),
cipher_key = options.cipher_key,
- udp = options.udp,
+ udp = options.protocol == 'udp',
TERMINATE = TERMINATE,
stderr = None,
reconnect = reconnect,
reconnect = None
mcastthread = None
- if options.pass_fd:
+ if options.protocol == 'fd':
if accept_packet or filter_init:
raise NotImplementedError, "--pass-fd and --filter are not compatible"
while not TERM:
time.sleep(1)
remote = None
- elif options.mode.startswith('pl-gre'):
+ elif options.protocol == "gre":
if accept_packet or filter_init:
raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
TERM = TERMINATE
while not TERM:
time.sleep(1)
- remote = remaining_args[0]
- elif options.udp:
+ remote = options.peer_addr
+ elif options.protocol == "udp":
# connect to remote endpoint
- if remaining_args and not remaining_args[0].startswith('-'):
- print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
- print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
- rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
- retrydelay = 1.0
- for i in xrange(30):
- if TERMINATE:
- raise OSError, "Killed"
- try:
- rsock.bind((hostaddr,options.udp))
- break
- except socket.error:
- # wait a while, retry
- print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
- time.sleep(min(30.0,retrydelay))
- retrydelay *= 1.1
- else:
- rsock.bind((hostaddr,options.udp))
- rsock.connect((remaining_args[0],options.port))
+ if options.peer_addr and options.peer_port:
+ remote = tunchannel.udp_establish(TERMINATE, hostaddr,
+ options.port, options.peer_addr, options.peer_port)
else:
print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
raise AssertionError, "Error: need a remote endpoint in UDP mode"
-
- # Wait for other peer
- tunchannel.udp_handshake(TERMINATE, rsock)
-
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
- else:
+ elif options.protocol == "tcp":
# connect to remote endpoint
- if remaining_args and not remaining_args[0].startswith('-'):
- print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
- rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- retrydelay = 1.0
- for i in xrange(30):
- if TERMINATE:
- raise OSError, "Killed"
- try:
- rsock.connect((remaining_args[0],options.port))
- break
- except socket.error:
- # wait a while, retry
- print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
- time.sleep(min(30.0,retrydelay))
- retrydelay *= 1.1
- else:
- rsock.connect((remaining_args[0],options.port))
+ if options.peer_addr and options.peer_port:
+ remote = tunchannel.tcp_establish(TERMINATE, hostaddr,
+ options.port, options.peer_addr, options.peer_port)
else:
- print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
- lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- retrydelay = 1.0
- for i in xrange(30):
- if TERMINATE:
- raise OSError, "Killed"
- try:
- lsock.bind((hostaddr,options.port))
- break
- except socket.error:
- # wait a while, retry
- print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
- time.sleep(min(30.0,retrydelay))
- retrydelay *= 1.1
- else:
- lsock.bind((hostaddr,options.port))
- lsock.listen(1)
- rsock,raddr = lsock.accept()
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
+ print >>sys.stderr, "Error: need a remote endpoint in TCP mode"
+ raise AssertionError, "Error: need a remote endpoint in TCP mode"
+ else:
+ msg = "Error: Invalid protocol %s" % options.protocol
+ print >>sys.stderr, msg
+ raise AssertionError, msg
if filter_init:
filter_local, filter_remote = filter_init()
self.key = key
self.home_path = home_path
-
- self._launcher = None
+
self._started = False
- self._started_listening = False
- self._starting = False
+
self._pid = None
self._ppid = None
self._if_name = None
if proc.wait():
raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
-
def _install_scripts(self):
local = self.local()
if proc.wait():
raise RuntimeError, "Failed to set up TUN forwarder: %s %s" % (out,err,)
- def launch(self, check_proto, listen, extra_args=[]):
- if self._starting:
- raise AssertionError, "Double start"
-
- self._starting = True
-
+ def launch(self, check_proto):
peer = self.peer()
local = self.local()
peer_port = peer.tun_port
peer_addr = peer.tun_addr
- peer_proto= peer.tun_proto
- peer_cipher=peer.tun_cipher
+ peer_proto = peer.tun_proto
+ peer_cipher = peer.tun_cipher
local_port = self.port
local_cap = local.capture
local_snat = local.snat
local_txq = local.txqueuelen
local_p2p = local.pointopoint
- local_cipher=local.tun_cipher
- local_mcast= local.multicast
- local_bwlim= local.bwlimit
+ local_cipher = local.tun_cipher
+ local_mcast = local.multicast
+ local_bwlim = local.bwlimit
if not local_p2p and hasattr(peer, 'address'):
local_p2p = peer.address
if local_cipher != peer_cipher:
raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
- if not listen and ((peer_proto != 'fd' and not peer_port) or not peer_addr):
- raise RuntimeError, "Misconfigured peer: %s" % (peer,)
-
- if listen and ((peer_proto != 'fd' and not local_port) or not local_addr or not local_mask):
- raise RuntimeError, "Misconfigured TUN: %s" % (local,)
-
if check_proto == 'gre' and local_cipher.lower() != 'plain':
raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
args = ["python", "tun_connect.py",
"-m", str(self.mode),
+ "-t", str(check_proto),
"-A", str(local_addr),
"-M", str(local_mask),
"-C", str(local_cipher)]
])
elif check_proto == 'gre':
args.extend([
- "-K", str(min(local_port, peer_port))
+ "-K", str(min(local_port, peer_port)),
+ "-a", str(peer_addr),
])
+ # both udp and tcp
else:
args.extend([
- "-p", str(local_port if listen else peer_port),
+ "-P", str(local_port),
+ "-p", str(peer_port),
+ "-a", str(peer_addr),
"-k", str(self.key)
])
if local_snat:
args.append("-S")
if local_p2p:
- args.extend(("-P",str(local_p2p)))
+ args.extend(("-Z",str(local_p2p)))
if local_txq:
args.extend(("-Q",str(local_txq)))
if not local_cap:
args.append("--multicast")
if local_bwlim:
args.extend(("-b",str(local_bwlim*1024)))
- if extra_args:
- args.extend(map(str,extra_args))
- if not listen and check_proto != 'fd':
- args.append(str(peer_addr))
if filter_module:
args.extend(("--filter", filter_module))
if filter_args:
if proc.wait():
raise RuntimeError, "Failed to set up TUN: %s %s" % (out,err,)
-
+
self._started = True
def recover(self):
# Tunnel should be still running in its node
# Just check its pidfile and we're done
self._started = True
- self._started_listening = True
self.checkpid()
- def _launch_and_wait(self, *p, **kw):
- try:
- self.__launch_and_wait(*p, **kw)
- except:
- if self._launcher:
- import sys
- self._launcher._exc.append(sys.exc_info())
- else:
- raise
-
- def __launch_and_wait(self, *p, **kw):
+ def wait(self):
local = self.local()
- self.launch(*p, **kw)
-
- # Wait for the process to be started
- while self.status() == rspawn.NOT_STARTED:
- time.sleep(1.0)
-
# Wait for the connection to be established
retrytime = 2.0
for spin in xrange(30):
)
proc.wait()
- if out.strip() == '1':
- self._started_listening = True
-
time.sleep(min(30.0, retrytime))
retrytime *= 1.1
else:
return True
return False
- def async_launch(self, check_proto, listen, extra_args=[]):
- if not self._started and not self._launcher:
- self._launcher = threading.Thread(
- target = self._launch_and_wait,
- args = (check_proto, listen, extra_args))
- self._launcher._exc = []
- self._launcher.start()
-
- def async_launch_wait(self):
- if self._launcher:
- self._launcher.join()
-
- if self._launcher._exc:
- exctyp,exval,exctrace = self._launcher._exc[0]
- raise exctyp,exval,exctrace
- elif not self._started:
- raise RuntimeError, "Failed to launch TUN forwarder"
- elif not self._started:
- self.launch()
-
- def async_launch_wait_listening(self):
- if self._launcher:
- for x in xrange(180):
- if self._launcher._exc:
- exctyp,exval,exctrace = self._launcher._exc[0]
- raise exctyp,exval,exctrace
- elif self._started and self._started_listening:
- break
- time.sleep(1)
- elif not self._started:
- self.launch()
-
def checkpid(self):
local = self.local()
def remote_trace_path(self, whichtrace, tracemap = None):
tracemap = self._TRACEMAP if not tracemap else tracemap
-
if whichtrace not in tracemap:
return None
return local_path
-
- def prepare(self):
- """
- First-phase setup
-
- eg: set up listening ports
- """
- raise NotImplementedError
-
- def setup(self):
- """
- Second-phase setup
-
- eg: connect to peer
- """
- raise NotImplementedError
-
def shutdown(self):
- """
- Cleanup
- """
- raise NotImplementedError
+ self.kill()
def destroy(self):
- """
- Second-phase cleanup
- """
- pass
-
+ self.waitkill()
class TunProtoUDP(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoUDP, self).__init__(local, peer, home_path, key)
- self.listening = listening
-
- def prepare(self):
- pass
-
- def setup(self):
- self.async_launch('udp', False, ("-u",str(self.port)))
- def shutdown(self):
- self.kill()
-
- def destroy(self):
- self.waitkill()
-
- def launch(self, check_proto='udp', listen=False, extra_args=None):
- if extra_args is None:
- extra_args = ("-u",str(self.port))
- super(TunProtoUDP, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoUDP, self).launch('udp')
class TunProtoFD(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoFD, self).__init__(local, peer, home_path, key)
- self.listening = listening
-
- def prepare(self):
- pass
-
- def setup(self):
- self.async_launch('fd', False)
- def shutdown(self):
- self.kill()
-
- def destroy(self):
- self.waitkill()
-
- def launch(self, check_proto='fd', listen=False, extra_args=[]):
- super(TunProtoFD, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoFD, self).launch('fd')
class TunProtoGRE(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoGRE, self).__init__(local, peer, home_path, key)
- self.listening = listening
self.mode = 'pl-gre-ip'
-
- def prepare(self):
- pass
-
- def setup(self):
- self.async_launch('gre', False)
-
- def shutdown(self):
- self.kill()
- def destroy(self):
- self.waitkill()
-
- def launch(self, check_proto='gre', listen=False, extra_args=[]):
- super(TunProtoGRE, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoGRE, self).launch('gre')
class TunProtoTCP(TunProtoBase):
- def __init__(self, local, peer, home_path, key, listening):
+ def __init__(self, local, peer, home_path, key):
super(TunProtoTCP, self).__init__(local, peer, home_path, key)
- self.listening = listening
- def prepare(self):
- if self.listening:
- self.async_launch('tcp', True)
-
- def setup(self):
- if not self.listening:
- # make sure our peer is ready
- peer = self.peer()
- if peer and peer.peer_proto_impl:
- peer.peer_proto_impl.async_launch_wait_listening()
-
- if not self._started:
- self.async_launch('tcp', False)
-
- self.checkpid()
-
- def shutdown(self):
- self.kill()
-
- def destroy(self):
- self.waitkill()
-
- def launch(self, check_proto='tcp', listen=None, extra_args=[]):
- if listen is None:
- listen = self.listening
- super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
+ def launch(self):
+ super(TunProtoTCP, self).launch('tcp')
class TapProtoUDP(TunProtoUDP):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoUDP, self).__init__(local, peer, home_path, key)
self.mode = 'pl-tap'
class TapProtoTCP(TunProtoTCP):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoTCP, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoTCP, self).__init__(local, peer, home_path, key)
self.mode = 'pl-tap'
class TapProtoFD(TunProtoFD):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoFD, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoFD, self).__init__(local, peer, home_path, key)
self.mode = 'pl-tap'
class TapProtoGRE(TunProtoGRE):
- def __init__(self, local, peer, home_path, key, listening):
- super(TapProtoGRE, self).__init__(local, peer, home_path, key, listening)
+ def __init__(self, local, peer, home_path, key):
+ super(TapProtoGRE, self).__init__(local, peer, home_path, key)
self.mode = 'pl-gre-eth'
-
-
TUN_PROTO_MAP = {
'tcp' : TunProtoTCP,
'udp' : TunProtoUDP,
'gre' : TapProtoGRE,
}
-
import threading
import errno
import fcntl
+import random
import traceback
import functools
import collections
retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
crypto_mode = False
crypter = None
+
try:
if cipher_key and cipher:
import Crypto.Cipher
if rread is None:
def rread(remote, maxlen, os_read=os.read):
return os_read(remote_fd, maxlen)
-
+
rnonblock = nonblock(remote)
tnonblock = nonblock(tun)
remoteok = True
+
while not TERMINATE:
wset = []
if packetReady(bkbuf):
if e.args[0] == errno.EINTR:
# just retry
continue
+ else:
+ raise
# check for errors
if errs:
try:
for x in xrange(maxbatch):
packet = rread(remote,2000)
+
#rr += 1
if crypto_mode:
#print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
+def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
+ retrydelay = 1.0
+ for i in xrange(30):
+ # TERMINATE is a array. An item can be added to TERMINATE, from
+ # outside this function to force termination of the loop
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ rsock.bind((local_addr, local_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ rsock.bind((local_addr, local_port))
+ print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
+ print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
+ rsock.connect((peer_addr, peer_port))
+ return rsock
def udp_handshake(TERMINATE, rsock):
endme = False
endme = True
keepalive_thread.join()
+def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
+ peer_port)
+ udp_handshake(TERMINATE, rsock)
+ return rsock
+
+def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
+ sock = None
+ retrydelay = 1.0
+ # The peer has a firewall that prevents a response to the connect, we
+ # will be forever blocked in the connect, so we put a reasonable timeout.
+ sock.settimeout(10)
+ # We wait for
+ for i in xrange(30):
+ if stop:
+ break
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ rsock.connect((peer_addr, peer_port))
+ sock = rsock
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ rsock.connect((peer_addr, peer_port))
+ sock = rsock
+ if sock:
+ sock.settimeout(0)
+ return sock
+
+def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
+ sock = None
+ retrydelay = 1.0
+ # We try to bind to the local virtual interface.
+ # It might not exist yet so we wait in a loop.
+ for i in xrange(30):
+ if stop:
+ break
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ lsock.bind((local_addr, local_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ lsock.bind((local_addr, local_port))
+
+ # Now we wait until the other side connects.
+ # The other side might not be ready yet, so we also wait in a loop for timeouts.
+ timeout = 1
+ lsock.listen(1)
+ for i in xrange(30):
+ if TERMINATE:
+ raise OSError, "Killed"
+ rlist, wlist, xlist = select.select([lsock], [], [], timeout)
+ if stop:
+ break
+ if lsock in rlist:
+ sock,raddr = lsock.accept()
+ break
+ timeout += 5
+ return sock
+
+def tcp_handshake(TERMINATE, sock, listen, dice):
+ # we are going to use a barrier algorithm to decide wich side listen.
+ # each side will "roll a dice" and send the resulting value to the other
+ # side.
+ result = None
+ sock.settimeout(5)
+ for i in xrange(100):
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ hand = dice.read()
+ sock.send(hand)
+ peer_hand = sock.recv(1)
+ if hand < peer_hand:
+ if listen:
+ result = sock
+ break
+ elif hand > peer_hand:
+ if not listen:
+ result = sock
+ break
+ else:
+ dice.release()
+ dice.throw()
+ except socket.error:
+ dice.release()
+ break
+ if result:
+ sock.settimeout(0)
+ return result
+
+def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ def listen(stop, result, lsock, dice):
+ lsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
+ if lsock:
+ lsock = tcp_handshake(TERMINATE, lsock, True, dice)
+ if lsock:
+ stop[0] = True
+ result[0] = lsock
+
+ def connect(stop, result, rsock, dice):
+ rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
+ if rsock:
+ rsock = tcp_handshake(TERMINATE, rsock, False, dice)
+ if sock:
+ stop[0] = True
+ result[0] = rsock
+
+ dice = Dice()
+ dice.throw()
+ stop = []
+ result = []
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ connect_thread = threading.Thread(target=connect, args=(stop, result, rsock, dice))
+ listen_thread = threading.Thread(target=listen, args=(stop, result, lsock, dice))
+ connect_thread.start()
+ listen_thread.start()
+ connect_thread.join()
+ listen_thread.join()
+ if not result:
+ raise OSError, "Error: tcp_establish could not establish connection."
+ sock = result[0]
+ if sock == lsock:
+ rsock.close()
+ else:
+ lsock.close()
+ return sock
+
+class Dice(object):
+ def __init__(self):
+ self._condition = threading.Condition(threading.Lock())
+ self._readers = 0
+ self._value = None
+
+ def read(self):
+ self._condition.acquire()
+ try:
+ self._readers += 1
+ finally:
+ self._condition.release()
+ return self._value
+
+ def release(self):
+ self._condition.acquire()
+ try:
+ if self._readers > 0:
+ self._readers -= 1
+ if self._readers == 0:
+ self._condition.notifyAll()
+ finally:
+ self._condition.release()
+
+ def throw(self):
+ self._condition.acquire()
+ try:
+ while self._readers > 0:
+ self._condition.wait()
+ self._value = random.randint(1, 6)
+ finally:
+ self._condition.release()
+
import weakref
import time
-from tunchannel import tun_fwd, udp_handshake
+from tunchannel import tun_fwd, udp_establish, tcp_establish
class TunChannel(object):
"""
tun_key: the agreed upon encryption key.
- listen: if set to True (and in TCP mode), it marks a
- listening endpoint. Be certain that any TCP connection
- is made between a listening and a non-listening
- endpoint, or it won't work.
-
with_pi: set if the incoming packet stream (see tun_socket)
contains PI headers - if so, they will be stripped.
ethernet_mode: set if the incoming packet stream is
composed of ethernet frames (as opposed of IP packets).
- udp: set to use UDP datagrams instead of TCP connections.
-
tun_socket: a socket or file object that can be read
from and written to. Packets will be read when available,
remote packets will be forwarded as writes.
def __init__(self):
# Some operational attributes
- self.listen = False
self.ethernet_mode = True
self.with_pi = False
self._exc = [] # exception store, to relay exceptions from the forwarder thread
self._connected = threading.Event()
self._forwarder_thread = None
-
+
# trace to stderr
self.stderr = sys.stderr
self.tun_cipher,
)
- def Prepare(self):
- if self.tun_proto:
- udp = self.tun_proto == "udp"
- if not udp and self.listen and not self._forwarder_thread:
- if self.listen or (self.peer_addr and self.peer_port and self.peer_proto):
- self._launch()
-
- def Setup(self):
+ def launch(self):
+ # self.tun_proto is only set if the channel is connected
+ # launch has to be a no-op in unconnected channels because
+ # it is called at configuration time, which for cross connections
+ # happens before connection.
if self.tun_proto:
if not self._forwarder_thread:
self._launch()
- def Cleanup(self):
+ def cleanup(self):
if self._forwarder_thread:
- self.Kill()
+ self.kill()
- def Wait(self):
+ def wait(self):
if self._forwarder_thread:
self._connected.wait()
for exc in self._exc:
eTyp, eVal, eLoc = exc
raise eTyp, eVal, eLoc
- def Kill(self):
+ def kill(self):
if self._forwarder_thread:
if not self._terminate:
self._terminate.append(None)
if local_cipher != peer_cipher:
raise RuntimeError, "Peering cipher mismatch: %s != %s" % (local_cipher, peer_cipher)
- udp = local_proto == 'udp'
- listen = self.listen
-
- if (udp or not listen) and (not peer_port or not peer_addr):
+ if not peer_port or not peer_addr:
raise RuntimeError, "Misconfigured peer for: %s" % (self,)
- if (udp or listen) and (not local_port or not local_addr):
+ if not local_port or not local_addr:
raise RuntimeError, "Misconfigured TUN: %s" % (self,)
TERMINATE = self._terminate
cipher_key = self.tun_key
tun = self.tun_socket
-
+ udp = local_proto == 'udp'
+
if not tun:
raise RuntimeError, "Unconnected TUN channel %s" % (self,)
-
- if udp:
- # listen on udp port
- rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
- for i in xrange(30):
- try:
- rsock.bind((local_addr,local_port))
- break
- except socket.error:
- # wait a while, retry
- time.sleep(1)
- else:
- rsock.bind((local_addr,local_port))
- rsock.connect((peer_addr,peer_port))
- udp_handshake(TERMINATE, rsock)
+
+ if local_proto == 'udp':
+ rsock = udp_establish(TERMINATE, local_addr, local_port,
+ peer_addr, peer_port)
remote = os.fdopen(rsock.fileno(), 'r+b', 0)
- elif listen:
- # accept tcp connections
- lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- for i in xrange(30):
- try:
- lsock.bind((local_addr,local_port))
- break
- except socket.error:
- # wait a while, retry
- time.sleep(1)
- else:
- lsock.bind((local_addr,local_port))
- lsock.listen(1)
- rsock,raddr = lsock.accept()
+ elif local_proto == 'tcp':
+ rsock = tcp_establish(TERMINATE, local_addr, local_port,
+ peer_addr, peer_port)
remote = os.fdopen(rsock.fileno(), 'r+b', 0)
else:
- # connect to tcp server
- rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- for i in xrange(30):
- try:
- rsock.connect((peer_addr,peer_port))
- break
- except socket.error:
- # wait a while, retry
- time.sleep(1)
- else:
- rsock.connect((peer_addr,peer_port))
- remote = os.fdopen(rsock.fileno(), 'r+b', 0)
-
+ raise RuntimeError, "Bad protocol for %s: %r" % (self,local_proto)
+
# notify that we're ready
self._connected.set()
if not element.tun_port and element.tun_addr:
element.tun_port = 15000 + int(guid)
- # First-phase setup
- if element.peer_proto:
- # cross tun
- if not element.tun_addr or not element.tun_port:
- listening = True
- elif not element.peer_addr or not element.peer_port:
- listening = True
- else:
- # both have addresses...
- # ...the one with the lesser address listens
- listening = element.tun_addr < element.peer_addr
- element.listen = listening
- element.Prepare()
-
def postconfigure_tunchannel(testbed_instance, guid):
"""
TunChannel preconfiguration.
Should be adequate for most implementations.
"""
element = testbed_instance._elements[guid]
-
- # Second-phase setup
- element.Setup()
-
+
+ element.launch()
def crossconnect_tunchannel_peer_init(proto, testbed_instance, tun_guid, peer_data,
preconfigure_tunchannel = preconfigure_tunchannel):
tun.peer_cipher = peer_data.get("tun_cipher")
tun.tun_key = min(tun.tun_key, peer_data.get("tun_key"))
tun.tun_proto = proto
-
+
preconfigure_tunchannel(testbed_instance, tun_guid)
def crossconnect_tunchannel_peer_compl(proto, testbed_instance, tun_guid, peer_data,
tun.peer_proto = peer_data.get("tun_proto") or proto
tun.peer_port = peer_data.get("tun_port")
tun.peer_cipher = peer_data.get("tun_cipher")
-
+
postconfigure_tunchannel(testbed_instance, tun_guid)
-
-
-def wait_tunchannel(testbed_instance, guid):
+def prestart_tunchannel(testbed_instance, guid):
"""
Wait for the channel forwarder to be up and running.
be certain to start TunChannels before applications that might require them.
"""
element = testbed_instance.elements[guid]
- element.Wait()
+ element.wait()
ns1.connector("protos").connect(arp1.connector("node"))
ns1.connector("protos").connect(icmp1.connector("node"))
ns1if = ns3.create("ns3::FdNetDevice")
- ns1if.enable_trace("FileDescriptorPcapTrace")
+ ns1if.enable_trace("FdPcapTrace")
ns1if.set_attribute_value("label", "ns1if")
ns1tc = ns3.create("ns3::Nepi::TunChannel")
ns1.connector("devs").connect(ns1if.connector("node"))
ns1.connector("protos").connect(arp1.connector("node"))
ns1.connector("protos").connect(icmp1.connector("node"))
ns1if = ns3_desc.create("ns3::FdNetDevice")
- ns1if.enable_trace("FileDescriptorPcapTrace")
+ ns1if.enable_trace("FdPcapTrace")
ns1if.set_attribute_value("label", "ns1if")
ns1.connector("devs").connect(ns1if.connector("node"))
tap1.connector("fd->").connect(ns1if.connector("->fd"))
ns1.connector("protos").connect(arp1.connector("node"))
ns1.connector("protos").connect(icmp1.connector("node"))
ns1if = ns3_desc.create("ns3::FdNetDevice")
- ns1if.enable_trace("FileDescriptorPcapTrace")
+ ns1if.enable_trace("FdPcapTrace")
ns1if.set_attribute_value("label", "ns1if")
ns1.connector("devs").connect(ns1if.connector("node"))
tap1.connector("fd->").connect(ns1if.connector("->fd"))
ns1.connector("protos").connect(arp1.connector("node"))
ns1.connector("protos").connect(icmp1.connector("node"))
ns1if = ns3_desc.create("ns3::FdNetDevice")
- ns1if.enable_trace("FileDescriptorPcapTrace")
+ ns1if.enable_trace("FdPcapTrace")
ns1if.set_attribute_value("label", "ns1if")
ns1.connector("devs").connect(ns1if.connector("node"))
tap0.connector("fd->").connect(ns1if.connector("->fd"))