"flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
"validation_function": validation.is_string
}),
+ "args": dict({
+ "name": "args",
+ "help": "Module arguments - comma-separated list of name=value pairs",
+ "type": Attribute.STRING,
+ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+ "validation_function": validation.is_string
+ }),
})
traces = dict({
"tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES],
}),
TUNFILTER: dict({
- "help": "TUN/TAP stream filter",
+ "help": "TUN/TAP stream filter\n\n"
+ "If specified, it should be either a .py or .so module. "
+ "It will be loaded, and all incoming and outgoing packets "
+ "will be routed through it. The filter will not be responsible "
+ "for buffering, packet queueing is performed in tun_connect "
+ "already, so it should not concern itself with it. It should "
+ "not, however, block in one direction if the other is congested.\n"
+ "\n"
+ "Modules are expected to have the following methods:\n"
+ "\tinit(**args)\n"
+ "\t\tIf arguments are given, this method will be called with the\n"
+ "\t\tgiven arguments (as keyword args in python modules, or a single\n"
+ "\taccept_packet(packet, direction):\n"
+ "\t\tDecide whether to drop the packet. Direction is 0 for packets "
+ "coming from the local side to the remote, and 1 is for packets "
+ "coming from the remote side to the local. Return a boolean, "
+ "true if the packet is not to be dropped.\n"
+ "\tfilter_init():\n"
+ "\t\tInitializes a filtering pipe (filter_run). It should "
+ "return two file descriptors to use as a bidirectional "
+ "pipe: local and remote. 'local' is where packets from the "
+ "local side will be written to. After filtering, those packets "
+ "should be written to 'remote', where tun_connect will read "
+ "from, and it will forward them to the remote peer. "
+ "Packets from the remote peer will be written to 'remote', "
+ "where the filter is expected to read from, and eventually "
+ "forward them to the local side. If the file descriptors are "
+ "not nonblocking, they will be set to nonblocking. So it's "
+ "better to set them from the start like that.\n"
+ "\tfilter_run(local, remote):\n"
+ "\t\tIf filter_init is provided, it will be called repeatedly, "
+ "in a separate thread until the process is killed. It should "
+ "sleep at most for a second.\n"
+ "\tfilter_close(local, remote):\n"
+ "\t\tCalled then the process is killed, if filter_init was provided. "
+ "It should, among other things, close the file descriptors.\n"
+ "\n"
+ "Python modules are expected to return a tuple in filter_init, "
+ "either of file descriptors or file objects, while native ones "
+ "will receive two int*.\n"
+ "\n"
+ "Python modules can additionally contain a custom queue class "
+ "that will replace the FIFO used by default. The class should "
+ "be named 'queueclass' and contain an interface compatible with "
+ "collections.deque. That is, indexing (especiall for q[0]), "
+ "bool(q), popleft, appendleft, pop (right), append (right), "
+ "len(q) and clear.",
"category": FC.CATEGORY_CHANNELS,
"create_function": create_tunfilter,
"box_attributes": [
- "module",
+ "module", "args",
"tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
],
"connector_types": ["->fd","udp","tcp"],
#include <stdlib.h>
+#include <stdio.h>
+
+static int plr = 50;
+
+int init(const char* args)
+{
+ sscanf(args, "plr=%d", &plr);
+}
int accept_packet(const char* packet, int direction)
{
- return (direction != 0) || (rand() > (RAND_MAX/2));
+ return (direction != 0) || (rand() > (RAND_MAX*100/plr));
}
import random
+_plr = 0.5
+
random.seed(1234)
+def init(plr):
+ global _plr
+ _plr = float(plr) / 100.0
+
def accept_packet(packet, direction, rng=random.random):
- return direction or rng() > 0.5
+ return direction or rng() > _plr
--- /dev/null
+import collections
+import itertools
+
+_size = 1000
+
+class TOSQueue(object):
+ def __init__(self):
+ self.size = _size
+ self.queues = collections.defaultdict(collections.deque)
+ self.retries = collections.deque()
+ self.len = 0
+
+ # Prepare collection order
+ self.order = [
+ (precedence << 5) | (thoughput << 3) | (reliability << 2)
+ for precedence in xrange(7,-1,-1)
+ for thoughput in (0,1,1)
+ for reliability in (0,1)
+ ]
+ self.cycle = None
+ self.cyclelen = None
+ self.cycle_update = True
+ self.classes = set()
+
+ def __nonzero__(self):
+ return self.len > 0
+
+ def __len__(self):
+ return self.len
+
+ def clear(self):
+ self.classes.clear()
+ self.cycle = None
+ self.cyclelen = None
+ self.cycle_update = True
+ self.len = 0
+ self.queues.clear()
+ self.retries = collections.deque()
+
+ def queuefor(self, packet, ord=ord, len=len, classmask=0xEC):
+ if len(packet) >= 2:
+ tos = ord(packet[1])
+ return (tos & classmask, tos & 0x10)
+ else:
+ return (0,0)
+
+ def append(self, packet, len=len):
+ qi,urgent = self.queuefor(packet)
+ q = self.queues[qi]
+ if len(q) < _size:
+ classes = self.classes
+ if qi not in classes:
+ classes.add(qi)
+ self.cycle_update = True
+ if urgent:
+ q.appendleft(packet)
+ else:
+ q.append(packet)
+ self.len += 1
+
+ def appendleft(self, packet):
+ self.retries.append(packet)
+ self.len += 1
+
+ def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
+ return self.popleft(pop=pop)
+
+ def popleft(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.popleft):
+ if self.retries:
+ rv = pop(self.retries)
+ self.len -= 1
+ return rv
+
+ queues = self.queues
+ classes = self.classes
+
+ if len(classes)==1:
+ # shortcut for non-tos traffic
+ rv = pop(queues[iter(classes).next()])
+ self.len -= 1
+ return rv
+
+ if self.cycle_update:
+ cycle = filter(classes.__contains__, self.order)
+ self.cycle = itertools.cycle(cycle)
+ self.cyclelen = len(cycle)
+ self.cycle_update = False
+
+ cycle = self.cycle.next
+ for i in xrange(self.cyclelen):
+ qi = cycle()
+ if qi in classes:
+ q = queues[qi]
+ if q:
+ rv = pop(q)
+ self.len -= 1
+ return rv
+ else:
+ # Needs to update the cycle
+ classes.remove(qi)
+ self.cycle_update = True
+ else:
+ raise IndexError, "pop from an empty queue"
+
+queueclass = TOSQueue
+
+def init(size):
+ global _size
+ _size = size
+
default = None,
help = "If specified, it should be either a .py or .so module. "
"It will be loaded, and all incoming and outgoing packets "
- "will be routed through it. The module will not be responsible "
+ "will be routed through it. The filter will not be responsible "
"for buffering, packet queueing is performed in tun_connect "
"already, so it should not concern itself with it. It should "
"not, however, block in one direction if the other is congested.\n"
"\n"
"Modules are expected to have the following methods:\n"
+ "\tinit(**args)\n"
+ "\t\tIf arguments are given, this method will be called with the\n"
+ "\t\tgiven arguments (as keyword args in python modules, or a single\n"
+ "\t\tstring in c modules).\n"
"\taccept_packet(packet, direction):\n"
"\t\tDecide whether to drop the packet. Direction is 0 for packets "
"coming from the local side to the remote, and 1 is for packets "
"\n"
"Python modules are expected to return a tuple in filter_init, "
"either of file descriptors or file objects, while native ones "
- "will receive two int*.\n" )
+ "will receive two int*.\n"
+ "\n"
+ "Python modules can additionally contain a custom queue class "
+ "that will replace the FIFO used by default. The class should "
+ "be named 'queueclass' and contain an interface compatible with "
+ "collections.deque. That is, indexing (especiall for q[0]), "
+ "bool(q), popleft, appendleft, pop (right), append (right), "
+ "len(q) and clear. When using a custom queue, queue size will "
+ "have no effect, pass an effective queue size to the module "
+ "by using filter_args" )
+parser.add_option(
+ "--filter-args", dest="filter_args", metavar="FILE",
+ default = None,
+ help = "If specified, packets won't be logged to standard output, "
+ "but dumped to a pcap-formatted trace in the specified file. " )
(options, remaining_args) = parser.parse_args(sys.argv[1:])
if options.filter_module.endswith('.py'):
sys.path.append(os.path.dirname(options.filter_module))
filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
+ if options.filter_args:
+ try:
+ filter_args = dict(map(lambda x:x.split('=',1),options.filter_args.split(',')))
+ filter_module.init(**filter_args)
+ except:
+ pass
elif options.filter_module.endswith('.so'):
filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
-
+ if options.filter_args:
+ try:
+ filter_module.init(options.filter_args)
+ except:
+ pass
try:
accept_packet = filter_module.accept_packet
except:
accept_packet = None
+ try:
+ queueclass = filter_module.queueclass
+ except:
+ queueclass = None
+
try:
_filter_init = filter_module.filter_init
filter_run = filter_module.filter_run
filter_module = os.path.join('.',os.path.basename(filter_module[0]))
if filter_module.endswith('.c'):
filter_module = filter_module.rsplit('.',1)[0] + '.so'
+ filter_args = local.filter_module.args
else:
filter_module = None
+ filter_args = None
args = ["python", "tun_connect.py",
"-m", str(self.mode),
args.append(str(peer_addr))
if filter_module:
args.extend(("--filter", filter_module))
+ if filter_args:
+ args.extend(("--filter-args", filter_args))
self._logger.info("Starting %s", self)
return False
def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
- cipher='AES', accept_local=None, accept_remote=None, slowlocal=True,
+ cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None,
len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
crypto_mode = False
else:
return None
- # Limited frame parsing, to preserve packet boundaries.
- # Which is needed, since /dev/net/tun is unbuffered
maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
tunhurry = max(0,maxbkbuf/2)
- fwbuf = collections.deque()
- bkbuf = collections.deque()
+
+ if queueclass is None:
+ queueclass = collections.deque
+ maxfwbuf = maxbkbuf = 2000000000
+
+ fwbuf = queueclass()
+ bkbuf = queueclass()
nfwbuf = 0
nbkbuf = 0
- if ether_mode:
+ if ether_mode or udp:
packetReady = bool
- pullPacket = collections.deque.popleft
- reschedule = collections.deque.appendleft
+ pullPacket = queueclass.popleft
+ reschedule = queueclass.appendleft
else:
packetReady = _packetReady
pullPacket = _pullPacket
- reschedule = collections.deque.appendleft
+ reschedule = queueclass.appendleft
tunfd = tun.fileno()
os_read = os.read
os_write = os.write
self.assertTrue(netpipe_stats, "Unavailable netpipe stats")
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
- def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None):
+ def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None, Filter1args=None, Filter2args=None):
instance = self.make_instance()
instance.defer_create(2, "Node")
if Filter1:
instance.defer_create(10, "TunFilter")
instance.defer_create_set(10, "module", Filter1)
+ if Filter1args:
+ instance.defer_create_set(10, "args", Filter1args)
instance.defer_connect(7, "fd->", 10, "->fd")
if Filter2:
instance.defer_create(11, "TunFilter")
instance.defer_create_set(11, "module", Filter2)
+ if Filter2args:
+ instance.defer_create_set(11, "args", Filter2args)
instance.defer_connect(8, "fd->", 11, "->fd")
if Filter1 and Filter2:
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_tap_ping_udp_loss1_py(self):
- self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY)
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, None, "plr=50")
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_tap_ping_udp_loss2_py(self):
- self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY)
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY, "plr=50", "plr=50")
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_tap_ping_udp_loss1_c(self):
- self._pingtest("TapInterface", "udp", "AES", self.PLR50_C)
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, None, "plr=50")
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_tap_ping_udp_loss2_c(self):
- self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C)
+ self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C, "plr=50", "plr=50")
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_nepi_depends(self):
bytes += len(rv)
return rv
-def test(cipher, passphrase, plr=None):
+def test(cipher, passphrase, plr=None, queuemodule=None):
if plr:
import random
def accept(packet, direction, rng=random.random):
return rng() > 0.5
else:
accept = None
+ if queuemodule:
+ import os, os.path
+ sys.path.append(os.path.join(
+ os.path.dirname(__file__),
+ 'src','nepi','testbeds','planetlab','scripts'))
+ queuemodule = __import__(queuemodule)
+ queueclass = queuemodule.queueclass
TERMINATE = []
def stopme():
time.sleep(100)
t = threading.Thread(target=stopme)
t.start()
tunchannel.tun_fwd(tun, remote, True, True, passphrase, True, TERMINATE, None, tunkqueue=500,
- rwrite = rwrite, rread = rread, cipher=cipher,
+ rwrite = rwrite, rread = rread, cipher=cipher, queueclass=queueclass,
accept_local = accept, accept_remote = accept)
# Swallow exceptions on decryption
except:
return packet
tunchannel.decrypt = decrypt
-
+"""
for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'):
if cipher is None:
passphrase = None
pstats.Stats('tunchannel.plr.profile').strip_dirs().sort_stats('time').print_stats()
print "Bandwidth (50%% PLR): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )
+"""
+
+bytes = 0
+cProfile.runctx('test(None,None,None,"tosqueue")',globals(),locals(),'tunchannel.tos.profile')
+
+print "Profile (TOS):"
+pstats.Stats('tunchannel.tos.profile').strip_dirs().sort_stats('time').print_stats()
+print "Bandwidth (TOS): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )