Still not completely right: the acceptance filter seems to be applied twice.
Maybe ingress and egress acceptance filters should be separate.
def _make_ns3_dependency(self, parameters):
return self._make_generic(parameters, self._app.NS3Dependency)
+ def _make_tun_filter(self, parameters):
+ return self._make_generic(parameters, self._interfaces.TunFilter)
+
import os.path
import random
import ipaddr
+import functools
import tunproto
# These get initialized when the iface is connected to its node
self.node = None
+ # These get initialized when the iface is connected to any filter
+ self.filter_module = None
+
# These get initialized when the iface is configured
self.external_iface = None
raise RuntimeError, "Unsupported tunnelling protocol: %s" % (self.peer_proto,)
if not self.address or not self.netprefix or not self.netmask:
raise RuntimeError, "Misconfigured %s iface - missing address" % (self._KIND,)
+ if self.filter_module and self.peer_proto not in ('udp','tcp',None):
+ raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (self,)
+ if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None):
+ raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,)
def _impl_instance(self, home_path, listening):
impl = self._PROTO_MAP[self.peer_proto](
return local_path
+class TunFilter(object):
+ def __init__(self, api=None):
+ if not api:
+ api = plcapi.PLCAPI()
+ self._api = api
+
+ # Attributes
+ self.module = None
+
+ # These get initialised when the filter is connected
+ self.peer_guid = None
+ self.peer_proto = None
+ self.iface_guid = None
+ self.peer = None
+ self.iface = None
+
+ def _get(what, self):
+ wref = self.iface
+ if wref:
+ wref = wref()
+ if wref:
+ return getattr(wref, what)
+ else:
+ return None
+
+ def _set(what, self, val):
+ wref = self.iface
+ if wref:
+ wref = wref()
+ if wref:
+ setattr(wref, what, val)
+
+ tun_proto = property(
+ functools.partial(_get, 'tun_proto'),
+ functools.partial(_set, 'tun_proto') )
+ tun_addr = property(
+ functools.partial(_get, 'tun_addr'),
+ functools.partial(_set, 'tun_addr') )
+ tun_port = property(
+ functools.partial(_get, 'tun_port'),
+ functools.partial(_set, 'tun_port') )
+ tun_key = property(
+ functools.partial(_get, 'tun_key'),
+ functools.partial(_set, 'tun_key') )
+ tun_cipher = property(
+ functools.partial(_get, 'tun_cipher'),
+ functools.partial(_set, 'tun_cipher') )
+
+ del _get
+ del _set
+
import functools
import os
import os.path
+import weakref
NODE = "Node"
NODEIFACE = "NodeInterface"
NS3DEPENDENCY = "NS3Dependency"
INTERNET = "Internet"
NETPIPE = "NetPipe"
+TUNFILTER = "TunFilter"
PL_TESTBED_ID = "planetlab"
iface = testbed_instance._elements[iface_guid]
peer_iface = testbed_instance._elements[peer_iface_guid]
iface.peer_iface = peer_iface
+ peer_iface.peer_iface = iface
iface.peer_proto = \
- iface.tun_proto = proto
+ iface.tun_proto = \
+ peer_iface.peer_proto = \
+ peer_iface.tun_proto = proto
iface.tun_key = peer_iface.tun_key
+def connect_tun_iface_filter(testbed_instance, iface_guid, filter_guid):
+ iface = testbed_instance._elements[iface_guid]
+ filt = testbed_instance._elements[filter_guid]
+ iface.filter_module = filt
+ filt.iface_guid = iface_guid
+ filt.iface = weakref.ref(iface)
+ if filt.peer_guid:
+ connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid)
+
+def connect_filter_peer(proto, testbed_instance, filter_guid, peer_guid):
+ peer = testbed_instance._elements[peer_guid]
+ filt = testbed_instance._elements[filter_guid]
+ filt.peer_proto = proto
+ filt.peer_guid = peer_guid
+ if filt.iface_guid:
+ connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid)
+
+def connect_filter_filter(proto, testbed_instance, filter_guid, peer_guid):
+ peer = testbed_instance._elements[peer_guid]
+ filt = testbed_instance._elements[filter_guid]
+ filt.peer_proto = proto
+ peer.peer_proto = proto
+ if filt.iface_guid:
+ peer.peer_guid = filt.iface_guid
+ if peer.iface_guid:
+ filt.peer_guid = peer.iface_guid
+ if filt.iface_guid and filt.peer_guid:
+ connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid)
+
def crossconnect_tun_iface_peer_init(proto, testbed_instance, iface_guid, peer_iface_data):
iface = testbed_instance._elements[iface_guid]
iface.peer_iface = None
crossconnect_tun_iface_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
crossconnect_tun_iface_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
+def crossconnect_filter_peer_init(proto, testbed_instance, filter_guid, peer_data):
+ filt = testbed_instance._elements[filter_guid]
+ filt.peer_proto = proto
+ crossconnect_tun_iface_peer_init(filt.peer_proto, testbed_instance, filt.iface_guid, peer_data)
+
+def crossconnect_filter_peer_compl(proto, testbed_instance, filter_guid, peer_data):
+ filt = testbed_instance._elements[filter_guid]
+ filt.peer_proto = proto
+ crossconnect_tun_iface_peer_compl(filt.peer_proto, testbed_instance, filt.iface_guid, peer_data)
+
+def crossconnect_filter_peer_both(proto, testbed_instance, filter_guid, peer_data):
+ crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
+ crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
+
+
def connect_dep(testbed_instance, node_guid, app_guid):
node = testbed_instance._elements[node_guid]
app = testbed_instance._elements[app_guid]
testbed_instance.elements[guid] = element
+def create_tunfilter(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_tun_filter(parameters)
+ testbed_instance.elements[guid] = element
+
+
def create_application(testbed_instance, guid):
parameters = testbed_instance._get_parameters(guid)
element = testbed_instance._make_application(parameters)
"max": 1,
"min": 0
}),
+ "->fd": dict({
+ "help": "TUN device file descriptor slot",
+ "name": "->fd",
+ "max": 1,
+ "min": 0
+ }),
})
connections = [
"init_code": functools.partial(connect_tun_iface_peer,"gre"),
"can_cross": False
}),
+ dict({
+ "from": (TESTBED_ID, TUNIFACE, "fd->"),
+ "to": (TESTBED_ID, TUNFILTER, "->fd"),
+ "init_code": connect_tun_iface_filter,
+ "can_cross": False
+ }),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "tcp"),
+ "to": (TESTBED_ID, TUNIFACE, "tcp"),
+ "init_code": functools.partial(connect_filter_peer,"tcp"),
+ "can_cross": False
+ }),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "udp"),
+ "to": (TESTBED_ID, TUNIFACE, "udp"),
+ "init_code": functools.partial(connect_filter_peer,"udp"),
+ "can_cross": False
+ }),
dict({
"from": (TESTBED_ID, TAPIFACE, "tcp"),
"to": (TESTBED_ID, TAPIFACE, "tcp"),
"init_code": functools.partial(connect_tun_iface_peer,"gre"),
"can_cross": False
}),
+ dict({
+ "from": (TESTBED_ID, TAPIFACE, "fd->"),
+ "to": (TESTBED_ID, TUNFILTER, "->fd"),
+ "init_code": connect_tun_iface_filter,
+ "can_cross": False
+ }),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "tcp"),
+ "to": (TESTBED_ID, TAPIFACE, "tcp"),
+ "init_code": functools.partial(connect_filter_peer,"tcp"),
+ "can_cross": False
+ }),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "udp"),
+ "to": (TESTBED_ID, TAPIFACE, "udp"),
+ "init_code": functools.partial(connect_filter_peer,"udp"),
+ "can_cross": False
+ }),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "tcp"),
+ "to": (TESTBED_ID, TUNFILTER, "tcp"),
+ "init_code": functools.partial(connect_filter_filter,"tcp"),
+ "can_cross": False
+ }),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "udp"),
+ "to": (TESTBED_ID, TUNFILTER, "udp"),
+ "init_code": functools.partial(connect_filter_filter,"udp"),
+ "can_cross": False
+ }),
dict({
"from": (TESTBED_ID, TUNIFACE, "tcp"),
"to": (None, None, "tcp"),
"compl_code": functools.partial(crossconnect_tun_iface_peer_both,"gre"),
"can_cross": True
}),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "tcp"),
+ "to": (None, None, "tcp"),
+ "init_code": functools.partial(crossconnect_filter_peer_init,"tcp"),
+ "compl_code": functools.partial(crossconnect_filter_peer_compl,"tcp"),
+ "can_cross": True
+ }),
+ dict({
+ "from": (TESTBED_ID, TUNFILTER, "udp"),
+ "to": (None, None, "udp"),
+ "init_code": functools.partial(crossconnect_filter_peer_init,"udp"),
+ "compl_code": functools.partial(crossconnect_filter_peer_compl,"udp"),
+ "can_cross": True
+ }),
]
attributes = dict({
"range": (0,60000),
"validation_function": validation.is_integer,
}),
+ "module": dict({
+ "name": "module",
+ "help": "Path to a .c or .py source for a filter module, or a binary .so",
+ "type": Attribute.STRING,
+ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+ "validation_function": validation.is_string
+ }),
})
traces = dict({
}),
})
-create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+create_order = [ INTERNET, NODE, NODEIFACE, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
"connector_types": ["node","udp","tcp","fd->","gre"],
"tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES],
}),
+ TUNFILTER: dict({
+ "help": "TUN/TAP stream filter",
+ "category": FC.CATEGORY_CHANNELS,
+ "create_function": create_tunfilter,
+ "box_attributes": [
+ "module",
+ "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
+ ],
+ "connector_types": ["->fd","udp","tcp"],
+ }),
APPLICATION: dict({
"help": "Generic executable command line application",
"category": FC.CATEGORY_APPLICATIONS,
--- /dev/null
+#include <stdlib.h>
+
+int accept_packet(const char* packet, int direction)
+{
+ return (rand() > (RAND_MAX/2));
+}
+
--- /dev/null
+import random
+
+def accept_packet(packet, direction, rng=random.random):
+ return rng() > 0.5
+
+
default = None,
help = "If specified, packets won't be logged to standard output, "
"but dumped to a pcap-formatted trace in the specified file. " )
+parser.add_option(
+ "--filter", dest="filter_module", metavar="PATH",
+ default = None,
+ help = "If specified, it should be either a .py or .so module. "
+ "It will be loaded, and all incoming and outgoing packets "
+ "will be routed through it. The module will not be responsible "
+ "for buffering, packet queueing is performed in tun_connect "
+ "already, so it should not concern itself with it. It should "
+ "not, however, block in one direction if the other is congested.\n"
+ "\n"
+ "Modules are expected to have the following methods:\n"
+ "\taccept_packet(packet, direction):\n"
+ "\t\tDecide whether to drop the packet. Direction is 0 for packets "
+ "coming from the local side to the remote, and 1 is for packets "
+ "coming from the remote side to the local. Return a boolean, "
+ "true if the packet is not to be dropped.\n"
+ "\tfilter_init():\n"
+ "\t\tInitializes a filtering pipe (filter_run). It should "
+ "return two file descriptors to use as a bidirectional "
+ "pipe: local and remote. 'local' is where packets from the "
+ "local side will be written to. After filtering, those packets "
+ "should be written to 'remote', where tun_connect will read "
+ "from, and it will forward them to the remote peer. "
+ "Packets from the remote peer will be written to 'remote', "
+ "where the filter is expected to read from, and eventually "
+ "forward them to the local side. If the file descriptors are "
+ "not nonblocking, they will be set to nonblocking. So it's "
+ "better to set them from the start like that.\n"
+ "\tfilter_run(local, remote):\n"
+ "\t\tIf filter_init is provided, it will be called repeatedly, "
+ "in a separate thread until the process is killed. It should "
+ "sleep at most for a second.\n"
+ "\tfilter_close(local, remote):\n"
+ "\t\tCalled then the process is killed, if filter_init was provided. "
+ "It should, among other things, close the file descriptors.\n"
+ "\n"
+ "Python modules are expected to return a tuple in filter_init, "
+ "either of file descriptors or file objects, while native ones "
+ "will receive two int*.\n" )
(options, remaining_args) = parser.parse_args(sys.argv[1:])
del lock, lockfile
-def tun_fwd(tun, remote, reconnect = None):
+def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True):
global TERMINATE
tunqueue = options.vif_txqueuelen or 1000
reconnect = reconnect,
tunqueue = tunqueue,
tunkqueue = tunkqueue,
- cipher = options.cipher
+ cipher = options.cipher,
+ accept_local = accept_local,
+ accept_remote = accept_remote,
+ slowlocal = slowlocal
)
modeinfo = MODEINFO[options.mode]
+# Try to load filter module
+filter_thread = None
+if options.filter_module:
+ if options.filter_module.endswith('.py'):
+ sys.path.append(os.path.dirname(options.filter_module))
+ filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
+ elif options.filter_module.endswith('.so'):
+ filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
+
+ try:
+ accept_packet = filter_module.accept_packet
+ except:
+ accept_packet = None
+
+ try:
+ _filter_init = filter_module.filter_init
+ filter_run = filter_module.filter_run
+ filter_close = filter_module.filter_close
+
+ def filter_init():
+ filter_local = ctypes.c_int(0)
+ filter_remote = ctypes.c_int(0)
+ _filter_init(filter_local, filter_remote)
+ return filter_local, filter_remote
+ except:
+ filter_init = None
+ filter_run = None
+ filter_close = None
+else:
+ accept_packet = None
+ filter_init = None
+ filter_run = None
+ filter_close = None
+
# be careful to roll back stuff on exceptions
tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
try:
reconnect = None
if options.pass_fd:
+ if accept_packet or filter_init:
+ raise NotImplementedError, "--pass-fd and --filter are not compatible"
+
if options.pass_fd.startswith("base64:"):
options.pass_fd = base64.b64decode(
options.pass_fd[len("base64:"):])
# just wait forever
def tun_fwd(tun, remote, **kw):
- while not TERMINATE:
+ global TERMINATE
+ TERM = TERMINATE
+ while not TERM:
time.sleep(1)
remote = None
elif options.mode.startswith('pl-gre'):
+ if accept_packet or filter_init:
+ raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
+
# just wait forever
def tun_fwd(tun, remote, **kw):
- while not TERMINATE:
+ global TERMINATE
+ TERM = TERMINATE
+ while not TERM:
time.sleep(1)
remote = remaining_args[0]
elif options.udp:
["tcpdump","-l","-n","-i",tun_name, "-s", "4096"]
+ ["-w",options.pcap_capture,"-U"] * bool(options.pcap_capture) )
+ if filter_init:
+ filter_local, filter_remote = filter_init()
+
+ def filter_loop():
+ global TERMINATE
+ TERM = TERMINATE
+ run = filter_run
+ local = filter_local
+ remote = filter_remote
+ while not TERM:
+ run(local, remote)
+ filter_close(local, remote)
+
+ filter_thread = threading.Thread(target=filter_loop)
+ filter_thread.start()
+
print >>sys.stderr, "Connected"
# Try to give us high priority
# or perhaps there is no os.nice support in the system
pass
- tun_fwd(tun, remote,
- reconnect = reconnect)
+ if not filter_init:
+ tun_fwd(tun, remote,
+ reconnect = reconnect,
+ accept_local = accept_packet,
+ accept_remote = accept_packet,
+ slowlocal = True)
+ else:
+ # Hm...
+ # ...ok, we need to:
+ # 1. Forward packets from tun to filter
+ # 2. Forward packets from remote to filter
+ #
+ # 1. needs TUN rate-limiting, while
+ # 2. needs reconnection
+ #
+ # 1. needs ONLY TUN-side acceptance checks, while
+ # 2. needs ONLY remote-side acceptance checks
+ if isinstance(filter_local, ctypes.c_int):
+ filter_local_fd = filter_local.value
+ else:
+ filter_local_fd = filter_local
+ if isinstance(filter_remote, ctypes.c_int):
+ filter_remote_fd = filter_remote.value
+ else:
+ filter_remote_fd = filter_remote
+
+ def localside():
+ tun_fwd(tun, filter_local_fd,
+ accept_local = accept_packet,
+ slowlocal = True)
+
+ def remoteside():
+ tun_fwd(filter_remote_fd, remote,
+ reconnect = reconnect,
+ accept_remote = accept_packet,
+ slowlocal = False)
+
finally:
try:
pass
# tidy shutdown in every case - swallow exceptions
+ TERMINATE.append(None)
+
+ if filter_thread:
+ try:
+ filter_thread.join()
+ except:
+ pass
try:
if tcpdump:
# they have to be created for deployment
# Also remove pidfile, if there is one.
# Old pidfiles from previous runs can be troublesome.
- cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
+ cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
'home' : server.shell_escape(self.home_path)
}
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
]
+ if local.filter_module:
+ filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
+ filter_module = filter_sources[0]
+ sources.extend(set(filter_sources))
+ else:
+ filter_module = None
+ filter_sources = None
dest = "%s@%s:%s" % (
local.node.slicename, local.node.hostname,
os.path.join(self.home_path,'.'),)
"python setup.py install --install-lib .. && "
"cd .. "
+ + ( " && "
+ "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
+ 'module' : os.path.basename(filter_module).rsplit('.',1)[0],
+ 'sources' : ' '.join(map(os.path.basename,filter_sources))
+ }
+
+ if filter_module is not None and filter_module.endswith('.c')
+ else ""
+ )
+
+ ( " && "
"wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
"mkdir -p python-passfd && "
"python setup.py build && "
"python setup.py install --install-lib .. "
- if local.tun_proto == "fd" else ""
+ if local.tun_proto == "fd"
+ else ""
)
)
% {
if check_proto == 'gre' and local_cipher.lower() != 'plain':
raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
+
+ if local.filter_module:
+ if check_proto not in ('udp', 'tcp'):
+ raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
+ filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
+ filter_module = os.path.join('.',os.path.basename(filter_module[0]))
+ if filter_module.endswith('.c'):
+ filter_module = filter_module.rsplit('.',1)[0] + '.so'
+ else:
+ filter_module = None
args = ["python", "tun_connect.py",
"-m", str(self.mode),
args.extend(map(str,extra_args))
if not listen and check_proto != 'fd':
args.append(str(peer_addr))
+ if filter_module:
+ args.extend(("--filter", filter_module))
self._logger.info("Starting %s", self)
self._logger.warn("if_name: Could not get interface name")
return self._if_name
+ def if_alive(self):
+ name = self.if_name
+ if name:
+ local = self.local()
+ for i in xrange(30):
+ (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
+ "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
+ host = local.node.hostname,
+ port = None,
+ user = local.node.slicename,
+ agent = None,
+ ident_key = local.node.ident_path,
+ server_key = local.node.server_key
+ )
+
+ if proc.wait():
+ time.sleep(1)
+ continue
+
+ if out.strip() == 'DEAD':
+ return False
+ elif out.strip() == 'ALIVE':
+ return True
+ return False
+
def async_launch(self, check_proto, listen, extra_args=[]):
if not self._started and not self._launcher:
self._launcher = threading.Thread(
break
time.sleep(interval)
interval = min(30.0, interval * 1.1)
+
+ if self.if_name:
+ for i in xrange(30):
+ if not self.if_alive():
+ self._logger.info("Device down %s", self)
+ break
+ time.sleep(interval)
+ interval = min(30.0, interval * 1.1)
_TRACEMAP = {
# tracename : (remotename, localname)
return False
def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
- cipher='AES',
+ cipher='AES', accept_local=None, accept_remote=None, slowlocal=True,
len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
crypto_mode = False
twrite = os.write
tread = os.read
+ if accept_local is not None:
+ def tread(fd, maxlen, _tread=tread, accept=accept_local):
+ packet = _tread(fd, maxlen)
+ if accept(packet, 0):
+ return packet
+ else:
+ return None
+
+ if accept_remote is not None:
+ def rread(fd, maxlen, _rread=rread, accept=accept_remote):
+ packet = _rread(fd, maxlen)
+ if accept(packet, 1):
+ return packet
+ else:
+ return None
+
# Limited frame parsing, to preserve packet boundaries.
# Which is needed, since /dev/net/tun is unbuffered
maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
# behind. TUN devices discard packets if their queue is full (tunkqueue), but they
# don't block either (they're always ready to write), so if we flood the device
# we'll have high packet loss.
- if not tnonblock or len(bkbuf) < tunhurry or not packetReady(bkbuf):
+ if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
break
else:
- # Give some time for the kernel to process the packets
- time.sleep(0)
+ if slowlocal:
+ # Give some time for the kernel to process the packets
+ time.sleep(0)
except OSError,e:
# This except handles the entire While block on PURPOSE
# as an optimization (setting a try/except block is expensive)
try:
while 1:
packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
+ if packet is None:
+ continue
#rt += 1
fwbuf.append(packet)
try:
while 1:
packet = rread(remote,2000)
+ if packet is None:
+ continue
#rr += 1
if crypto_mode:
port_base = 2000 + (os.getpid() % 1000) * 13
+ PLR50_PY = os.path.join(
+ os.path.dirname(planetlab.__file__),
+ 'scripts',
+ 'plr50.py')
+ PLR50_C = os.path.join(
+ os.path.dirname(planetlab.__file__),
+ 'scripts',
+ 'plr50.c')
+
def setUp(self):
self.root_dir = tempfile.mkdtemp()
self.__class__.port_base = self.port_base + 100
self.assertTrue(netpipe_stats, "Unavailable netpipe stats")
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
- def _pingtest(self, TunClass, ConnectionProto, Cipher):
+ def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None):
instance = self.make_instance()
instance.defer_create(2, "Node")
instance.defer_add_trace(8, "packets")
instance.defer_add_address(8, "192.168.2.3", 24, False)
instance.defer_connect(3, "devs", 8, "node")
- instance.defer_connect(7, ConnectionProto, 8, ConnectionProto)
instance.defer_create(9, "Application")
- instance.defer_create_set(9, "command", "ping -qc1 {#[GUID-8].addr[0].[Address]#}")
+ instance.defer_create_set(9, "command", "ping -qc10 {#[GUID-8].addr[0].[Address]#}")
instance.defer_add_trace(9, "stdout")
instance.defer_add_trace(9, "stderr")
instance.defer_connect(9, "node", 2, "apps")
+
+ if Filter1:
+ instance.defer_create(10, "TunFilter")
+ instance.defer_create_set(10, "module", Filter1)
+ instance.defer_connect(7, "fd->", 10, "->fd")
+
+ if Filter2:
+ instance.defer_create(11, "TunFilter")
+ instance.defer_create_set(11, "module", Filter2)
+ instance.defer_connect(8, "fd->", 11, "->fd")
+
+ if Filter1 and Filter2:
+ plr = "[5-9][0-9]"
+ elif Filter1 or Filter2:
+ plr = "[3-9][0-9]"
+ else:
+ plr = "0"
+
+ instance.defer_connect(
+ (10 if Filter1 else 7), ConnectionProto,
+ (11 if Filter2 else 8), ConnectionProto)
comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
--- .* ping statistics ---
-1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
-"""
+10 packets transmitted, [0-9]+ received, %s%% packet loss, time \d*ms.*
+""" % (plr,)
try:
instance.do_setup()
def test_tap_ping_gre(self):
self._pingtest("TapInterface", "gre", "PLAIN")
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_tap_ping_udp_loss1_py(self):
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY)
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_tap_ping_udp_loss2_py(self):
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY)
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_tap_ping_udp_loss1_c(self):
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_C)
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_tap_ping_udp_loss2_c(self):
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C)
+
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_nepi_depends(self):
instance = self.make_instance()
bytes += len(rv)
return rv
-def test(cipher, passphrase):
+def test(cipher, passphrase, plr=None):
+ if plr:
+ import random
+ def accept(packet, direction, rng=random.random):
+ return rng() > 0.5
+ else:
+ accept = None
TERMINATE = []
def stopme():
time.sleep(100)
t = threading.Thread(target=stopme)
t.start()
tunchannel.tun_fwd(tun, remote, True, True, passphrase, True, TERMINATE, None, tunkqueue=500,
- rwrite = rwrite, rread = rread, cipher=cipher)
+ rwrite = rwrite, rread = rread, cipher=cipher,
+ accept_local = accept, accept_remote = accept)
# Swallow exceptions on decryption
def decrypt(packet, crypter, super=tunchannel.decrypt):
return packet
tunchannel.decrypt = decrypt
+"""
for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'):
if cipher is None:
passphrase = None
pstats.Stats('tunchannel.%s.profile' % cipher).strip_dirs().sort_stats('time').print_stats()
print "Bandwidth (%s): %.4fMb/s" % ( cipher, bytes / 200.0 * 8 / 2**20, )
+"""
+
+bytes = 0
+cProfile.runctx('test(None,None,0.5)',globals(),locals(),'tunchannel.plr.profile')
+
+print "Profile (50% PLR):"
+pstats.Stats('tunchannel.plr.profile').strip_dirs().sort_stats('time').print_stats()
+
+print "Bandwidth (50%% PLR): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )