Custom queues and a new and shiny TOS queue
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 17 Aug 2011 17:19:32 +0000 (19:19 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Wed, 17 Aug 2011 17:19:32 +0000 (19:19 +0200)
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/scripts/plr50.c
src/nepi/testbeds/planetlab/scripts/plr50.py
src/nepi/testbeds/planetlab/scripts/tosqueue.py [new file with mode: 0644]
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/tunchannel.py
test/testbeds/planetlab/execute.py
tunbench.py

index 2e6adb4..d357e9b 100644 (file)
@@ -1128,6 +1128,13 @@ attributes = dict({
                 "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
                 "validation_function": validation.is_string
             }),
+    "args": dict({
+                "name": "args",
+                "help": "Module arguments - comma-separated list of name=value pairs",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
     })
 
 traces = dict({
@@ -1235,11 +1242,57 @@ factories_info = dict({
             "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES],
         }),
     TUNFILTER: dict({
-            "help": "TUN/TAP stream filter",
+            "help": "TUN/TAP stream filter\n\n"
+                    "If specified, it should be either a .py or .so module. "
+                    "It will be loaded, and all incoming and outgoing packets "
+                    "will be routed through it. The filter will not be responsible "
+                    "for buffering, packet queueing is performed in tun_connect "
+                    "already, so it should not concern itself with it. It should "
+                    "not, however, block in one direction if the other is congested.\n"
+                    "\n"
+                    "Modules are expected to have the following methods:\n"
+                    "\tinit(**args)\n"
+                    "\t\tIf arguments are given, this method will be called with the\n"
+                    "\t\tgiven arguments (as keyword args in python modules, or a single\n"
+                    "\taccept_packet(packet, direction):\n"
+                    "\t\tDecide whether to drop the packet. Direction is 0 for packets "
+                        "coming from the local side to the remote, and 1 is for packets "
+                        "coming from the remote side to the local. Return a boolean, "
+                        "true if the packet is not to be dropped.\n"
+                    "\tfilter_init():\n"
+                    "\t\tInitializes a filtering pipe (filter_run). It should "
+                        "return two file descriptors to use as a bidirectional "
+                        "pipe: local and remote. 'local' is where packets from the "
+                        "local side will be written to. After filtering, those packets "
+                        "should be written to 'remote', where tun_connect will read "
+                        "from, and it will forward them to the remote peer. "
+                        "Packets from the remote peer will be written to 'remote', "
+                        "where the filter is expected to read from, and eventually "
+                        "forward them to the local side. If the file descriptors are "
+                        "not nonblocking, they will be set to nonblocking. So it's "
+                        "better to set them from the start like that.\n"
+                    "\tfilter_run(local, remote):\n"
+                    "\t\tIf filter_init is provided, it will be called repeatedly, "
+                        "in a separate thread until the process is killed. It should "
+                        "sleep at most for a second.\n"
+                    "\tfilter_close(local, remote):\n"
+                    "\t\tCalled then the process is killed, if filter_init was provided. "
+                        "It should, among other things, close the file descriptors.\n"
+                    "\n"
+                    "Python modules are expected to return a tuple in filter_init, "
+                    "either of file descriptors or file objects, while native ones "
+                    "will receive two int*.\n"
+                    "\n"
+                    "Python modules can additionally contain a custom queue class "
+                    "that will replace the FIFO used by default. The class should "
+                    "be named 'queueclass' and contain an interface compatible with "
+                    "collections.deque. That is, indexing (especiall for q[0]), "
+                    "bool(q), popleft, appendleft, pop (right), append (right), "
+                    "len(q) and clear.",
             "category": FC.CATEGORY_CHANNELS,
             "create_function": create_tunfilter,
             "box_attributes": [
-                "module",
+                "module", "args",
                 "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
             ],
             "connector_types": ["->fd","udp","tcp"],
index a6fc4f0..7332783 100644 (file)
@@ -1,7 +1,15 @@
 #include <stdlib.h>
+#include <stdio.h>
+
+static int plr = 50;
+
+int init(const char* args)
+{
+    sscanf(args, "plr=%d", &plr);
+}
 
 int accept_packet(const char* packet, int direction)
 {
-    return (direction != 0) || (rand() > (RAND_MAX/2));
+    return (direction != 0) || (rand() > (RAND_MAX*100/plr));
 }
 
index 734c2a9..b7dde17 100644 (file)
@@ -1,8 +1,14 @@
 import random
 
+_plr = 0.5
+
 random.seed(1234)
 
+def init(plr):
+    global _plr
+    _plr = float(plr) / 100.0
+
 def accept_packet(packet, direction, rng=random.random):
-    return direction or rng() > 0.5
+    return direction or rng() > _plr
 
 
diff --git a/src/nepi/testbeds/planetlab/scripts/tosqueue.py b/src/nepi/testbeds/planetlab/scripts/tosqueue.py
new file mode 100644 (file)
index 0000000..b7b0463
--- /dev/null
@@ -0,0 +1,110 @@
+import collections
+import itertools
+
+_size = 1000
+
+class TOSQueue(object):
+    def __init__(self):
+        self.size = _size
+        self.queues = collections.defaultdict(collections.deque)
+        self.retries = collections.deque()
+        self.len = 0
+        
+        # Prepare collection order
+        self.order = [
+            (precedence << 5) | (thoughput << 3) | (reliability << 2)
+            for precedence in xrange(7,-1,-1) 
+            for thoughput in (0,1,1)
+            for reliability in (0,1)
+        ]
+        self.cycle = None
+        self.cyclelen = None
+        self.cycle_update = True
+        self.classes = set()
+    
+    def __nonzero__(self):
+        return self.len > 0
+    
+    def __len__(self):
+        return self.len
+    
+    def clear(self):
+        self.classes.clear()
+        self.cycle = None
+        self.cyclelen = None
+        self.cycle_update = True
+        self.len = 0
+        self.queues.clear()
+        self.retries = collections.deque()
+    
+    def queuefor(self, packet, ord=ord, len=len, classmask=0xEC):
+        if len(packet) >= 2:
+            tos = ord(packet[1])
+            return (tos & classmask, tos & 0x10)
+        else:
+            return (0,0)
+    
+    def append(self, packet, len=len):
+        qi,urgent = self.queuefor(packet)
+        q = self.queues[qi]
+        if len(q) < _size:
+            classes = self.classes
+            if qi not in classes:
+                classes.add(qi)
+                self.cycle_update = True
+            if urgent:
+                q.appendleft(packet)
+            else:
+                q.append(packet)
+            self.len += 1
+
+    def appendleft(self, packet):
+        self.retries.append(packet)
+        self.len += 1
+    
+    def pop(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.pop):
+        return self.popleft(pop=pop)
+    
+    def popleft(self, xrange=xrange, len=len, iter=iter, pop=collections.deque.popleft):
+        if self.retries:
+            rv = pop(self.retries)
+            self.len -= 1
+            return rv
+        
+        queues = self.queues
+        classes = self.classes
+        
+        if len(classes)==1:
+            # shortcut for non-tos traffic
+            rv = pop(queues[iter(classes).next()])
+            self.len -= 1
+            return rv
+        
+        if self.cycle_update:
+            cycle = filter(classes.__contains__, self.order)
+            self.cycle = itertools.cycle(cycle)
+            self.cyclelen = len(cycle)
+            self.cycle_update = False
+
+        cycle = self.cycle.next
+        for i in xrange(self.cyclelen):
+            qi = cycle()
+            if qi in classes:
+                q = queues[qi]
+                if q:
+                    rv = pop(q)
+                    self.len -= 1
+                    return rv
+                else:
+                    # Needs to update the cycle
+                    classes.remove(qi)
+                    self.cycle_update = True
+        else:
+            raise IndexError, "pop from an empty queue"
+
+queueclass = TOSQueue
+
+def init(size):
+    global _size
+    _size = size
+
index 28356f7..b917f52 100644 (file)
@@ -122,12 +122,16 @@ parser.add_option(
     default = None,
     help = "If specified, it should be either a .py or .so module. "
            "It will be loaded, and all incoming and outgoing packets "
-           "will be routed through it. The module will not be responsible "
+           "will be routed through it. The filter will not be responsible "
            "for buffering, packet queueing is performed in tun_connect "
            "already, so it should not concern itself with it. It should "
            "not, however, block in one direction if the other is congested.\n"
            "\n"
            "Modules are expected to have the following methods:\n"
+           "\tinit(**args)\n"
+           "\t\tIf arguments are given, this method will be called with the\n"
+           "\t\tgiven arguments (as keyword args in python modules, or a single\n"
+           "\t\tstring in c modules).\n"
            "\taccept_packet(packet, direction):\n"
            "\t\tDecide whether to drop the packet. Direction is 0 for packets "
                "coming from the local side to the remote, and 1 is for packets "
@@ -155,7 +159,21 @@ parser.add_option(
            "\n"
            "Python modules are expected to return a tuple in filter_init, "
            "either of file descriptors or file objects, while native ones "
-           "will receive two int*.\n" )
+           "will receive two int*.\n"
+           "\n"
+           "Python modules can additionally contain a custom queue class "
+           "that will replace the FIFO used by default. The class should "
+           "be named 'queueclass' and contain an interface compatible with "
+           "collections.deque. That is, indexing (especiall for q[0]), "
+           "bool(q), popleft, appendleft, pop (right), append (right), "
+           "len(q) and clear. When using a custom queue, queue size will "
+           "have no effect, pass an effective queue size to the module "
+           "by using filter_args" )
+parser.add_option(
+    "--filter-args", dest="filter_args", metavar="FILE",
+    default = None,
+    help = "If specified, packets won't be logged to standard output, "
+           "but dumped to a pcap-formatted trace in the specified file. " )
 
 (options, remaining_args) = parser.parse_args(sys.argv[1:])
 
@@ -542,14 +560,29 @@ if options.filter_module:
     if options.filter_module.endswith('.py'):
         sys.path.append(os.path.dirname(options.filter_module))
         filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
+        if options.filter_args:
+            try:
+                filter_args = dict(map(lambda x:x.split('=',1),options.filter_args.split(',')))
+                filter_module.init(**filter_args)
+            except:
+                pass
     elif options.filter_module.endswith('.so'):
         filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
-    
+        if options.filter_args:
+            try:
+                filter_module.init(options.filter_args)
+            except:
+                pass
     try:
         accept_packet = filter_module.accept_packet
     except:
         accept_packet = None
     
+    try:
+        queueclass = filter_module.queueclass
+    except:
+        queueclass = None
+    
     try:
         _filter_init = filter_module.filter_init
         filter_run = filter_module.filter_run
index ae0b069..779476c 100644 (file)
@@ -217,8 +217,10 @@ class TunProtoBase(object):
             filter_module = os.path.join('.',os.path.basename(filter_module[0]))
             if filter_module.endswith('.c'):
                 filter_module = filter_module.rsplit('.',1)[0] + '.so'
+            filter_args = local.filter_module.args
         else:
             filter_module = None
+            filter_args = None
         
         args = ["python", "tun_connect.py", 
             "-m", str(self.mode),
@@ -262,6 +264,8 @@ class TunProtoBase(object):
             args.append(str(peer_addr))
         if filter_module:
             args.extend(("--filter", filter_module))
+        if filter_args:
+            args.extend(("--filter-args", filter_args))
 
         self._logger.info("Starting %s", self)
         
index d622170..5ccba6d 100644 (file)
@@ -192,7 +192,7 @@ def nonblock(fd):
         return False
 
 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
-        cipher='AES', accept_local=None, accept_remote=None, slowlocal=True,
+        cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None,
         len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
     crypto_mode = False
@@ -290,22 +290,25 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
                 else:
                     return None
     
-    # Limited frame parsing, to preserve packet boundaries.
-    # Which is needed, since /dev/net/tun is unbuffered
     maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
     tunhurry = max(0,maxbkbuf/2)
-    fwbuf = collections.deque()
-    bkbuf = collections.deque()
+    
+    if queueclass is None:
+        queueclass = collections.deque
+        maxfwbuf = maxbkbuf = 2000000000
+    
+    fwbuf = queueclass()
+    bkbuf = queueclass()
     nfwbuf = 0
     nbkbuf = 0
-    if ether_mode:
+    if ether_mode or udp:
         packetReady = bool
-        pullPacket = collections.deque.popleft
-        reschedule = collections.deque.appendleft
+        pullPacket = queueclass.popleft
+        reschedule = queueclass.appendleft
     else:
         packetReady = _packetReady
         pullPacket = _pullPacket
-        reschedule = collections.deque.appendleft
+        reschedule = queueclass.appendleft
     tunfd = tun.fileno()
     os_read = os.read
     os_write = os.write
index 5cf8398..4790c3f 100755 (executable)
@@ -348,7 +348,7 @@ echo 'OKIDOKI'
         self.assertTrue(netpipe_stats, "Unavailable netpipe stats")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
-    def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None):
+    def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None, Filter1args=None, Filter2args=None):
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
@@ -381,11 +381,15 @@ echo 'OKIDOKI'
         if Filter1:
             instance.defer_create(10, "TunFilter")
             instance.defer_create_set(10, "module", Filter1)
+            if Filter1args:
+                instance.defer_create_set(10, "args", Filter1args)
             instance.defer_connect(7, "fd->", 10, "->fd")
             
         if Filter2:
             instance.defer_create(11, "TunFilter")
             instance.defer_create_set(11, "module", Filter2)
+            if Filter2args:
+                instance.defer_create_set(11, "args", Filter2args)
             instance.defer_connect(8, "fd->", 11, "->fd")
 
         if Filter1 and Filter2:
@@ -468,19 +472,19 @@ echo 'OKIDOKI'
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_tap_ping_udp_loss1_py(self):
-        self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY)
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, None, "plr=50")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_tap_ping_udp_loss2_py(self):
-        self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY)
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY, "plr=50", "plr=50")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_tap_ping_udp_loss1_c(self):
-        self._pingtest("TapInterface", "udp", "AES", self.PLR50_C)
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, None, "plr=50")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_tap_ping_udp_loss2_c(self):
-        self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C)
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C, "plr=50", "plr=50")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_nepi_depends(self):
index 97daba7..46bb767 100644 (file)
@@ -21,13 +21,20 @@ def rread(remote, maxlen, remote_fd = remote.fileno(), os_read=os.read):
     bytes += len(rv)
     return rv
 
-def test(cipher, passphrase, plr=None):
+def test(cipher, passphrase, plr=None, queuemodule=None):
    if plr:
         import random
         def accept(packet, direction, rng=random.random):
             return rng() > 0.5
    else:
         accept = None
+   if queuemodule:
+        import os, os.path
+        sys.path.append(os.path.join(
+            os.path.dirname(__file__), 
+            'src','nepi','testbeds','planetlab','scripts'))
+        queuemodule = __import__(queuemodule)
+        queueclass = queuemodule.queueclass
    TERMINATE = []
    def stopme():
        time.sleep(100)
@@ -35,7 +42,7 @@ def test(cipher, passphrase, plr=None):
    t = threading.Thread(target=stopme)
    t.start()
    tunchannel.tun_fwd(tun, remote, True, True, passphrase, True, TERMINATE, None, tunkqueue=500,
-        rwrite = rwrite, rread = rread, cipher=cipher,
+        rwrite = rwrite, rread = rread, cipher=cipher, queueclass=queueclass,
         accept_local = accept, accept_remote = accept)
 
 # Swallow exceptions on decryption
@@ -45,7 +52,7 @@ def decrypt(packet, crypter, super=tunchannel.decrypt):
     except:
         return packet
 tunchannel.decrypt = decrypt
-
+"""
 for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'):
     if cipher is None:
         passphrase = None
@@ -66,5 +73,13 @@ print "Profile (50% PLR):"
 pstats.Stats('tunchannel.plr.profile').strip_dirs().sort_stats('time').print_stats()
 
 print "Bandwidth (50%% PLR): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )
+"""
+
+bytes = 0
+cProfile.runctx('test(None,None,None,"tosqueue")',globals(),locals(),'tunchannel.tos.profile')
+
+print "Profile (TOS):"
+pstats.Stats('tunchannel.tos.profile').strip_dirs().sort_stats('time').print_stats()
 
+print "Bandwidth (TOS): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )