From: Claudio-Daniel Freire Date: Wed, 17 Aug 2011 17:19:32 +0000 (+0200) Subject: Custom queues and a new and shiny TOS queue X-Git-Tag: nepi-3.0.0~296 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=ed0007db1dc032ac9729d4f71a34d5a86ab1e813;p=nepi.git Custom queues and a new and shiny TOS queue --- diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index 2e6adb46..d357e9b1 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -1128,6 +1128,13 @@ attributes = dict({ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, "validation_function": validation.is_string }), + "args": dict({ + "name": "args", + "help": "Module arguments - comma-separated list of name=value pairs", + "type": Attribute.STRING, + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_string + }), }) traces = dict({ @@ -1235,11 +1242,57 @@ factories_info = dict({ "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES], }), TUNFILTER: dict({ - "help": "TUN/TAP stream filter", + "help": "TUN/TAP stream filter\n\n" + "If specified, it should be either a .py or .so module. " + "It will be loaded, and all incoming and outgoing packets " + "will be routed through it. The filter will not be responsible " + "for buffering, packet queueing is performed in tun_connect " + "already, so it should not concern itself with it. It should " + "not, however, block in one direction if the other is congested.\n" + "\n" + "Modules are expected to have the following methods:\n" + "\tinit(**args)\n" + "\t\tIf arguments are given, this method will be called with the\n" + "\t\tgiven arguments (as keyword args in python modules, or a single\n" + "\taccept_packet(packet, direction):\n" + "\t\tDecide whether to drop the packet. Direction is 0 for packets " + "coming from the local side to the remote, and 1 is for packets " + "coming from the remote side to the local. Return a boolean, " + "true if the packet is not to be dropped.\n" + "\tfilter_init():\n" + "\t\tInitializes a filtering pipe (filter_run). It should " + "return two file descriptors to use as a bidirectional " + "pipe: local and remote. 'local' is where packets from the " + "local side will be written to. After filtering, those packets " + "should be written to 'remote', where tun_connect will read " + "from, and it will forward them to the remote peer. " + "Packets from the remote peer will be written to 'remote', " + "where the filter is expected to read from, and eventually " + "forward them to the local side. If the file descriptors are " + "not nonblocking, they will be set to nonblocking. So it's " + "better to set them from the start like that.\n" + "\tfilter_run(local, remote):\n" + "\t\tIf filter_init is provided, it will be called repeatedly, " + "in a separate thread until the process is killed. It should " + "sleep at most for a second.\n" + "\tfilter_close(local, remote):\n" + "\t\tCalled then the process is killed, if filter_init was provided. " + "It should, among other things, close the file descriptors.\n" + "\n" + "Python modules are expected to return a tuple in filter_init, " + "either of file descriptors or file objects, while native ones " + "will receive two int*.\n" + "\n" + "Python modules can additionally contain a custom queue class " + "that will replace the FIFO used by default. The class should " + "be named 'queueclass' and contain an interface compatible with " + "collections.deque. That is, indexing (especiall for q[0]), " + "bool(q), popleft, appendleft, pop (right), append (right), " + "len(q) and clear.", "category": FC.CATEGORY_CHANNELS, "create_function": create_tunfilter, "box_attributes": [ - "module", + "module", "args", "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher", ], "connector_types": ["->fd","udp","tcp"], diff --git a/src/nepi/testbeds/planetlab/scripts/plr50.c b/src/nepi/testbeds/planetlab/scripts/plr50.c index a6fc4f01..73327837 100644 --- a/src/nepi/testbeds/planetlab/scripts/plr50.c +++ b/src/nepi/testbeds/planetlab/scripts/plr50.c @@ -1,7 +1,15 @@ #include +#include + +static int plr = 50; + +int init(const char* args) +{ + sscanf(args, "plr=%d", &plr); +} int accept_packet(const char* packet, int direction) { - return (direction != 0) || (rand() > (RAND_MAX/2)); + return (direction != 0) || (rand() > (RAND_MAX*100/plr)); } diff --git a/src/nepi/testbeds/planetlab/scripts/plr50.py b/src/nepi/testbeds/planetlab/scripts/plr50.py index 734c2a98..b7dde178 100644 --- a/src/nepi/testbeds/planetlab/scripts/plr50.py +++ b/src/nepi/testbeds/planetlab/scripts/plr50.py @@ -1,8 +1,14 @@ import random +_plr = 0.5 + random.seed(1234) +def init(plr): + global _plr + _plr = float(plr) / 100.0 + def accept_packet(packet, direction, rng=random.random): - return direction or rng() > 0.5 + return direction or rng() > _plr diff --git a/src/nepi/testbeds/planetlab/scripts/tosqueue.py b/src/nepi/testbeds/planetlab/scripts/tosqueue.py new file mode 100644 index 00000000..b7b0463b --- /dev/null +++ b/src/nepi/testbeds/planetlab/scripts/tosqueue.py @@ -0,0 +1,110 @@ +import collections +import itertools + +_size = 1000 + +class TOSQueue(object): + def __init__(self): + self.size = _size + self.queues = collections.defaultdict(collections.deque) + self.retries = collections.deque() + self.len = 0 + + # Prepare collection order + self.order = [ + (precedence << 5) | (thoughput << 3) | (reliability << 2) + for precedence in xrange(7,-1,-1) + for thoughput in (0,1,1) + for reliability in (0,1) + ] + self.cycle = None + self.cyclelen = None + self.cycle_update = True + self.classes = set() + + def __nonzero__(self): + return self.len > 0 + + def __len__(self): + return self.len + + def clear(self): + self.classes.clear() + self.cycle = None + self.cyclelen = None + self.cycle_update = True + self.len = 0 + self.queues.clear() + self.retries = collections.deque() + + def queuefor(self, packet, ord=ord, len=len, classmask=0xEC): + if len(packet) >= 2: + tos = ord(packet[1]) + return (tos & classmask, tos & 0x10) + else: + return (0,0) + + def append(self, packet, len=len): + qi,urgent = self.queuefor(packet) + q = self.queues[qi] + if len(q) < _size: + classes = self.classes + if qi not in classes: + classes.add(qi) + self.cycle_update = True + if urgent: + q.appendleft(packet) + else: + q.append(packet) + self.len += 1 + + def appendleft(self, packet): + self.retries.append(packet) + self.len += 1 + + def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop): + return self.popleft(pop=pop) + + def popleft(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.popleft): + if self.retries: + rv = pop(self.retries) + self.len -= 1 + return rv + + queues = self.queues + classes = self.classes + + if len(classes)==1: + # shortcut for non-tos traffic + rv = pop(queues[iter(classes).next()]) + self.len -= 1 + return rv + + if self.cycle_update: + cycle = filter(classes.__contains__, self.order) + self.cycle = itertools.cycle(cycle) + self.cyclelen = len(cycle) + self.cycle_update = False + + cycle = self.cycle.next + for i in xrange(self.cyclelen): + qi = cycle() + if qi in classes: + q = queues[qi] + if q: + rv = pop(q) + self.len -= 1 + return rv + else: + # Needs to update the cycle + classes.remove(qi) + self.cycle_update = True + else: + raise IndexError, "pop from an empty queue" + +queueclass = TOSQueue + +def init(size): + global _size + _size = size + diff --git a/src/nepi/testbeds/planetlab/scripts/tun_connect.py b/src/nepi/testbeds/planetlab/scripts/tun_connect.py index 28356f7a..b917f52c 100644 --- a/src/nepi/testbeds/planetlab/scripts/tun_connect.py +++ b/src/nepi/testbeds/planetlab/scripts/tun_connect.py @@ -122,12 +122,16 @@ parser.add_option( default = None, help = "If specified, it should be either a .py or .so module. " "It will be loaded, and all incoming and outgoing packets " - "will be routed through it. The module will not be responsible " + "will be routed through it. The filter will not be responsible " "for buffering, packet queueing is performed in tun_connect " "already, so it should not concern itself with it. It should " "not, however, block in one direction if the other is congested.\n" "\n" "Modules are expected to have the following methods:\n" + "\tinit(**args)\n" + "\t\tIf arguments are given, this method will be called with the\n" + "\t\tgiven arguments (as keyword args in python modules, or a single\n" + "\t\tstring in c modules).\n" "\taccept_packet(packet, direction):\n" "\t\tDecide whether to drop the packet. Direction is 0 for packets " "coming from the local side to the remote, and 1 is for packets " @@ -155,7 +159,21 @@ parser.add_option( "\n" "Python modules are expected to return a tuple in filter_init, " "either of file descriptors or file objects, while native ones " - "will receive two int*.\n" ) + "will receive two int*.\n" + "\n" + "Python modules can additionally contain a custom queue class " + "that will replace the FIFO used by default. The class should " + "be named 'queueclass' and contain an interface compatible with " + "collections.deque. That is, indexing (especiall for q[0]), " + "bool(q), popleft, appendleft, pop (right), append (right), " + "len(q) and clear. When using a custom queue, queue size will " + "have no effect, pass an effective queue size to the module " + "by using filter_args" ) +parser.add_option( + "--filter-args", dest="filter_args", metavar="FILE", + default = None, + help = "If specified, packets won't be logged to standard output, " + "but dumped to a pcap-formatted trace in the specified file. " ) (options, remaining_args) = parser.parse_args(sys.argv[1:]) @@ -542,14 +560,29 @@ if options.filter_module: if options.filter_module.endswith('.py'): sys.path.append(os.path.dirname(options.filter_module)) filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0]) + if options.filter_args: + try: + filter_args = dict(map(lambda x:x.split('=',1),options.filter_args.split(','))) + filter_module.init(**filter_args) + except: + pass elif options.filter_module.endswith('.so'): filter_module = ctypes.cdll.LoadLibrary(options.filter_module) - + if options.filter_args: + try: + filter_module.init(options.filter_args) + except: + pass try: accept_packet = filter_module.accept_packet except: accept_packet = None + try: + queueclass = filter_module.queueclass + except: + queueclass = None + try: _filter_init = filter_module.filter_init filter_run = filter_module.filter_run diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index ae0b069e..779476cd 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -217,8 +217,10 @@ class TunProtoBase(object): filter_module = os.path.join('.',os.path.basename(filter_module[0])) if filter_module.endswith('.c'): filter_module = filter_module.rsplit('.',1)[0] + '.so' + filter_args = local.filter_module.args else: filter_module = None + filter_args = None args = ["python", "tun_connect.py", "-m", str(self.mode), @@ -262,6 +264,8 @@ class TunProtoBase(object): args.append(str(peer_addr)) if filter_module: args.extend(("--filter", filter_module)) + if filter_args: + args.extend(("--filter-args", filter_args)) self._logger.info("Starting %s", self) diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index d6221708..5ccba6d8 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -192,7 +192,7 @@ def nonblock(fd): return False def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000, - cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, + cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket, retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ): crypto_mode = False @@ -290,22 +290,25 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr else: return None - # Limited frame parsing, to preserve packet boundaries. - # Which is needed, since /dev/net/tun is unbuffered maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue) tunhurry = max(0,maxbkbuf/2) - fwbuf = collections.deque() - bkbuf = collections.deque() + + if queueclass is None: + queueclass = collections.deque + maxfwbuf = maxbkbuf = 2000000000 + + fwbuf = queueclass() + bkbuf = queueclass() nfwbuf = 0 nbkbuf = 0 - if ether_mode: + if ether_mode or udp: packetReady = bool - pullPacket = collections.deque.popleft - reschedule = collections.deque.appendleft + pullPacket = queueclass.popleft + reschedule = queueclass.appendleft else: packetReady = _packetReady pullPacket = _pullPacket - reschedule = collections.deque.appendleft + reschedule = queueclass.appendleft tunfd = tun.fileno() os_read = os.read os_write = os.write diff --git a/test/testbeds/planetlab/execute.py b/test/testbeds/planetlab/execute.py index 5cf8398f..4790c3f3 100755 --- a/test/testbeds/planetlab/execute.py +++ b/test/testbeds/planetlab/execute.py @@ -348,7 +348,7 @@ echo 'OKIDOKI' self.assertTrue(netpipe_stats, "Unavailable netpipe stats") @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") - def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None): + def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None, Filter1args=None, Filter2args=None): instance = self.make_instance() instance.defer_create(2, "Node") @@ -381,11 +381,15 @@ echo 'OKIDOKI' if Filter1: instance.defer_create(10, "TunFilter") instance.defer_create_set(10, "module", Filter1) + if Filter1args: + instance.defer_create_set(10, "args", Filter1args) instance.defer_connect(7, "fd->", 10, "->fd") if Filter2: instance.defer_create(11, "TunFilter") instance.defer_create_set(11, "module", Filter2) + if Filter2args: + instance.defer_create_set(11, "args", Filter2args) instance.defer_connect(8, "fd->", 11, "->fd") if Filter1 and Filter2: @@ -468,19 +472,19 @@ echo 'OKIDOKI' @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") def test_tap_ping_udp_loss1_py(self): - self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY) + self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, None, "plr=50") @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") def test_tap_ping_udp_loss2_py(self): - self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY) + self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY, "plr=50", "plr=50") @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") def test_tap_ping_udp_loss1_c(self): - self._pingtest("TapInterface", "udp", "AES", self.PLR50_C) + self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, None, "plr=50") @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") def test_tap_ping_udp_loss2_c(self): - self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C) + self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C, "plr=50", "plr=50") @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") def test_nepi_depends(self): diff --git a/tunbench.py b/tunbench.py index 97daba78..46bb7670 100644 --- a/tunbench.py +++ b/tunbench.py @@ -21,13 +21,20 @@ def rread(remote, maxlen, remote_fd = remote.fileno(), os_read=os.read): bytes += len(rv) return rv -def test(cipher, passphrase, plr=None): +def test(cipher, passphrase, plr=None, queuemodule=None): if plr: import random def accept(packet, direction, rng=random.random): return rng() > 0.5 else: accept = None + if queuemodule: + import os, os.path + sys.path.append(os.path.join( + os.path.dirname(__file__), + 'src','nepi','testbeds','planetlab','scripts')) + queuemodule = __import__(queuemodule) + queueclass = queuemodule.queueclass TERMINATE = [] def stopme(): time.sleep(100) @@ -35,7 +42,7 @@ def test(cipher, passphrase, plr=None): t = threading.Thread(target=stopme) t.start() tunchannel.tun_fwd(tun, remote, True, True, passphrase, True, TERMINATE, None, tunkqueue=500, - rwrite = rwrite, rread = rread, cipher=cipher, + rwrite = rwrite, rread = rread, cipher=cipher, queueclass=queueclass, accept_local = accept, accept_remote = accept) # Swallow exceptions on decryption @@ -45,7 +52,7 @@ def decrypt(packet, crypter, super=tunchannel.decrypt): except: return packet tunchannel.decrypt = decrypt - +""" for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'): if cipher is None: passphrase = None @@ -66,5 +73,13 @@ print "Profile (50% PLR):" pstats.Stats('tunchannel.plr.profile').strip_dirs().sort_stats('time').print_stats() print "Bandwidth (50%% PLR): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, ) +""" + +bytes = 0 +cProfile.runctx('test(None,None,None,"tosqueue")',globals(),locals(),'tunchannel.tos.profile') + +print "Profile (TOS):" +pstats.Stats('tunchannel.tos.profile').strip_dirs().sort_stats('time').print_stats() +print "Bandwidth (TOS): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )