TUN/TAP filters, initial version, with tests.
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 12 Aug 2011 14:25:51 +0000 (16:25 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Fri, 12 Aug 2011 14:25:51 +0000 (16:25 +0200)
Still not completely right: the acceptance filter seems to be applied twice.
Maybe ingress and egress acceptance filters should be separate.

src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/scripts/plr50.c [new file with mode: 0644]
src/nepi/testbeds/planetlab/scripts/plr50.py [new file with mode: 0644]
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/tunchannel.py
test/testbeds/planetlab/execute.py
tunbench.py

index 9cd1299..ddee991 100644 (file)
@@ -670,3 +670,6 @@ class TestbedController(testbed_impl.TestbedController):
     def _make_ns3_dependency(self, parameters):
         return self._make_generic(parameters, self._app.NS3Dependency)
 
+    def _make_tun_filter(self, parameters):
+        return self._make_generic(parameters, self._interfaces.TunFilter)
+
index 5b4e3a1..251010d 100644 (file)
@@ -10,6 +10,7 @@ import os
 import os.path
 import random
 import ipaddr
+import functools
 
 import tunproto
 
@@ -135,6 +136,9 @@ class TunIface(object):
         # These get initialized when the iface is connected to its node
         self.node = None
         
+        # These get initialized when the iface is connected to any filter
+        self.filter_module = None
+        
         # These get initialized when the iface is configured
         self.external_iface = None
         
@@ -228,6 +232,10 @@ class TunIface(object):
             raise RuntimeError, "Unsupported tunnelling protocol: %s" % (self.peer_proto,)
         if not self.address or not self.netprefix or not self.netmask:
             raise RuntimeError, "Misconfigured %s iface - missing address" % (self._KIND,)
+        if self.filter_module and self.peer_proto not in ('udp','tcp',None):
+            raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (self,)
+        if self.tun_cipher != 'PLAIN' and self.peer_proto not in ('udp','tcp',None):
+            raise RuntimeError, "Miscofnigured TUN: %s - ciphered tunnels only work with udp or tcp links" % (self,)
     
     def _impl_instance(self, home_path, listening):
         impl = self._PROTO_MAP[self.peer_proto](
@@ -462,3 +470,54 @@ class NetPipe(object):
         
         return local_path
     
+class TunFilter(object):
+    def __init__(self, api=None):
+        if not api:
+            api = plcapi.PLCAPI()
+        self._api = api
+        
+        # Attributes
+        self.module = None
+
+        # These get initialised when the filter is connected
+        self.peer_guid = None
+        self.peer_proto = None
+        self.iface_guid = None
+        self.peer = None
+        self.iface = None
+    
+    def _get(what, self):
+        wref = self.iface
+        if wref:
+            wref = wref()
+        if wref:
+            return getattr(wref, what)
+        else:
+            return None
+
+    def _set(what, self, val):
+        wref = self.iface
+        if wref:
+            wref = wref()
+        if wref:
+            setattr(wref, what, val)
+    
+    tun_proto = property(
+        functools.partial(_get, 'tun_proto'),
+        functools.partial(_set, 'tun_proto') )
+    tun_addr = property(
+        functools.partial(_get, 'tun_addr'),
+        functools.partial(_set, 'tun_addr') )
+    tun_port = property(
+        functools.partial(_get, 'tun_port'),
+        functools.partial(_set, 'tun_port') )
+    tun_key = property(
+        functools.partial(_get, 'tun_key'),
+        functools.partial(_set, 'tun_key') )
+    tun_cipher = property(
+        functools.partial(_get, 'tun_cipher'),
+        functools.partial(_set, 'tun_cipher') )
+    
+    del _get
+    del _set
+
index aac4d7b..2e6adb4 100644 (file)
@@ -16,6 +16,7 @@ from nepi.util.constants import ApplicationStatus as AS, \
 import functools
 import os
 import os.path
+import weakref
 
 NODE = "Node"
 NODEIFACE = "NodeInterface"
@@ -27,6 +28,7 @@ NEPIDEPENDENCY = "NepiDependency"
 NS3DEPENDENCY = "NS3Dependency"
 INTERNET = "Internet"
 NETPIPE = "NetPipe"
+TUNFILTER = "TunFilter"
 
 PL_TESTBED_ID = "planetlab"
 
@@ -103,10 +105,42 @@ def connect_tun_iface_peer(proto, testbed_instance, iface_guid, peer_iface_guid)
     iface = testbed_instance._elements[iface_guid]
     peer_iface = testbed_instance._elements[peer_iface_guid]
     iface.peer_iface = peer_iface
+    peer_iface.peer_iface = iface
     iface.peer_proto = \
-    iface.tun_proto = proto
+    iface.tun_proto = \
+    peer_iface.peer_proto = \
+    peer_iface.tun_proto = proto
     iface.tun_key = peer_iface.tun_key
 
+def connect_tun_iface_filter(testbed_instance, iface_guid, filter_guid):
+    iface = testbed_instance._elements[iface_guid]
+    filt = testbed_instance._elements[filter_guid]
+    iface.filter_module = filt
+    filt.iface_guid = iface_guid
+    filt.iface = weakref.ref(iface)
+    if filt.peer_guid:
+        connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid)
+
+def connect_filter_peer(proto, testbed_instance, filter_guid, peer_guid):
+    peer = testbed_instance._elements[peer_guid]
+    filt = testbed_instance._elements[filter_guid]
+    filt.peer_proto = proto
+    filt.peer_guid = peer_guid
+    if filt.iface_guid:
+        connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid)
+
+def connect_filter_filter(proto, testbed_instance, filter_guid, peer_guid):
+    peer = testbed_instance._elements[peer_guid]
+    filt = testbed_instance._elements[filter_guid]
+    filt.peer_proto = proto
+    peer.peer_proto = proto
+    if filt.iface_guid:
+        peer.peer_guid = filt.iface_guid
+    if peer.iface_guid:
+        filt.peer_guid = peer.iface_guid
+    if filt.iface_guid and filt.peer_guid:
+        connect_tun_iface_peer(filt.peer_proto, testbed_instance, filt.iface_guid, filt.peer_guid)
+
 def crossconnect_tun_iface_peer_init(proto, testbed_instance, iface_guid, peer_iface_data):
     iface = testbed_instance._elements[iface_guid]
     iface.peer_iface = None
@@ -133,6 +167,21 @@ def crossconnect_tun_iface_peer_both(proto, testbed_instance, iface_guid, peer_i
     crossconnect_tun_iface_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
     crossconnect_tun_iface_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
 
+def crossconnect_filter_peer_init(proto, testbed_instance, filter_guid, peer_data):
+    filt = testbed_instance._elements[filter_guid]
+    filt.peer_proto = proto
+    crossconnect_tun_iface_peer_init(filt.peer_proto, testbed_instance, filt.iface_guid, peer_data)
+
+def crossconnect_filter_peer_compl(proto, testbed_instance, filter_guid, peer_data):
+    filt = testbed_instance._elements[filter_guid]
+    filt.peer_proto = proto
+    crossconnect_tun_iface_peer_compl(filt.peer_proto, testbed_instance, filt.iface_guid, peer_data)
+
+def crossconnect_filter_peer_both(proto, testbed_instance, filter_guid, peer_data):
+    crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
+    crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
+
+
 def connect_dep(testbed_instance, node_guid, app_guid):
     node = testbed_instance._elements[node_guid]
     app = testbed_instance._elements[app_guid]
@@ -220,6 +269,12 @@ def create_tapiface(testbed_instance, guid):
     
     testbed_instance.elements[guid] = element
 
+def create_tunfilter(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_tun_filter(parameters)
+    testbed_instance.elements[guid] = element
+
+
 def create_application(testbed_instance, guid):
     parameters = testbed_instance._get_parameters(guid)
     element = testbed_instance._make_application(parameters)
@@ -509,6 +564,12 @@ connector_types = dict({
                 "max": 1, 
                 "min": 0
             }),
+    "->fd": dict({
+                "help": "TUN device file descriptor slot", 
+                "name": "->fd",
+                "max": 1, 
+                "min": 0
+            }),
    })
 
 connections = [
@@ -584,6 +645,24 @@ connections = [
         "init_code": functools.partial(connect_tun_iface_peer,"gre"),
         "can_cross": False
     }),
+    dict({
+        "from": (TESTBED_ID, TUNIFACE, "fd->"),
+        "to":   (TESTBED_ID, TUNFILTER, "->fd"),
+        "init_code": connect_tun_iface_filter,
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "tcp"),
+        "to":   (TESTBED_ID, TUNIFACE, "tcp"),
+        "init_code": functools.partial(connect_filter_peer,"tcp"),
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "udp"),
+        "to":   (TESTBED_ID, TUNIFACE, "udp"),
+        "init_code": functools.partial(connect_filter_peer,"udp"),
+        "can_cross": False
+    }),
     dict({
         "from": (TESTBED_ID, TAPIFACE, "tcp"),
         "to":   (TESTBED_ID, TAPIFACE, "tcp"),
@@ -602,6 +681,36 @@ connections = [
         "init_code": functools.partial(connect_tun_iface_peer,"gre"),
         "can_cross": False
     }),
+    dict({
+        "from": (TESTBED_ID, TAPIFACE, "fd->"),
+        "to":   (TESTBED_ID, TUNFILTER, "->fd"),
+        "init_code": connect_tun_iface_filter,
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "tcp"),
+        "to":   (TESTBED_ID, TAPIFACE, "tcp"),
+        "init_code": functools.partial(connect_filter_peer,"tcp"),
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "udp"),
+        "to":   (TESTBED_ID, TAPIFACE, "udp"),
+        "init_code": functools.partial(connect_filter_peer,"udp"),
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "tcp"),
+        "to":   (TESTBED_ID, TUNFILTER, "tcp"),
+        "init_code": functools.partial(connect_filter_filter,"tcp"),
+        "can_cross": False
+    }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "udp"),
+        "to":   (TESTBED_ID, TUNFILTER, "udp"),
+        "init_code": functools.partial(connect_filter_filter,"udp"),
+        "can_cross": False
+    }),
     dict({
         "from": (TESTBED_ID, TUNIFACE, "tcp"),
         "to":   (None, None, "tcp"),
@@ -656,6 +765,20 @@ connections = [
         "compl_code": functools.partial(crossconnect_tun_iface_peer_both,"gre"),
         "can_cross": True
     }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "tcp"),
+        "to":   (None, None, "tcp"),
+        "init_code": functools.partial(crossconnect_filter_peer_init,"tcp"),
+        "compl_code": functools.partial(crossconnect_filter_peer_compl,"tcp"),
+        "can_cross": True
+    }),
+    dict({
+        "from": (TESTBED_ID, TUNFILTER, "udp"),
+        "to":   (None, None, "udp"),
+        "init_code": functools.partial(crossconnect_filter_peer_init,"udp"),
+        "compl_code": functools.partial(crossconnect_filter_peer_compl,"udp"),
+        "can_cross": True
+    }),
 ]
 
 attributes = dict({
@@ -998,6 +1121,13 @@ attributes = dict({
                 "range": (0,60000),
                 "validation_function": validation.is_integer,
             }),
+    "module": dict({
+                "name": "module",
+                "help": "Path to a .c or .py source for a filter module, or a binary .so",
+                "type": Attribute.STRING,
+                "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+                "validation_function": validation.is_string
+            }),
     })
 
 traces = dict({
@@ -1029,7 +1159,7 @@ traces = dict({
               }),
     })
 
-create_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+create_order = [ INTERNET, NODE, NODEIFACE, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 
 configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 
@@ -1104,6 +1234,16 @@ factories_info = dict({
             "connector_types": ["node","udp","tcp","fd->","gre"],
             "tags": [tags.INTERFACE, tags.ALLOW_ADDRESSES],
         }),
+    TUNFILTER: dict({
+            "help": "TUN/TAP stream filter",
+            "category": FC.CATEGORY_CHANNELS,
+            "create_function": create_tunfilter,
+            "box_attributes": [
+                "module",
+                "tun_proto", "tun_addr", "tun_port", "tun_key", "tun_cipher",
+            ],
+            "connector_types": ["->fd","udp","tcp"],
+        }),
     APPLICATION: dict({
             "help": "Generic executable command line application",
             "category": FC.CATEGORY_APPLICATIONS,
diff --git a/src/nepi/testbeds/planetlab/scripts/plr50.c b/src/nepi/testbeds/planetlab/scripts/plr50.c
new file mode 100644 (file)
index 0000000..34d8f9a
--- /dev/null
@@ -0,0 +1,7 @@
+#include <stdlib.h>
+
+int accept_packet(const char* packet, int direction)
+{
+    return (rand() > (RAND_MAX/2));
+}
+
diff --git a/src/nepi/testbeds/planetlab/scripts/plr50.py b/src/nepi/testbeds/planetlab/scripts/plr50.py
new file mode 100644 (file)
index 0000000..fcfa687
--- /dev/null
@@ -0,0 +1,6 @@
+import random
+
+def accept_packet(packet, direction, rng=random.random):
+    return rng() > 0.5
+
+
index 98fc311..3e987ec 100644 (file)
@@ -117,6 +117,45 @@ parser.add_option(
     default = None,
     help = "If specified, packets won't be logged to standard output, "
            "but dumped to a pcap-formatted trace in the specified file. " )
+parser.add_option(
+    "--filter", dest="filter_module", metavar="PATH",
+    default = None,
+    help = "If specified, it should be either a .py or .so module. "
+           "It will be loaded, and all incoming and outgoing packets "
+           "will be routed through it. The module will not be responsible "
+           "for buffering, packet queueing is performed in tun_connect "
+           "already, so it should not concern itself with it. It should "
+           "not, however, block in one direction if the other is congested.\n"
+           "\n"
+           "Modules are expected to have the following methods:\n"
+           "\taccept_packet(packet, direction):\n"
+           "\t\tDecide whether to drop the packet. Direction is 0 for packets "
+               "coming from the local side to the remote, and 1 is for packets "
+               "coming from the remote side to the local. Return a boolean, "
+               "true if the packet is not to be dropped.\n"
+           "\tfilter_init():\n"
+           "\t\tInitializes a filtering pipe (filter_run). It should "
+               "return two file descriptors to use as a bidirectional "
+               "pipe: local and remote. 'local' is where packets from the "
+               "local side will be written to. After filtering, those packets "
+               "should be written to 'remote', where tun_connect will read "
+               "from, and it will forward them to the remote peer. "
+               "Packets from the remote peer will be written to 'remote', "
+               "where the filter is expected to read from, and eventually "
+               "forward them to the local side. If the file descriptors are "
+               "not nonblocking, they will be set to nonblocking. So it's "
+               "better to set them from the start like that.\n"
+           "\tfilter_run(local, remote):\n"
+           "\t\tIf filter_init is provided, it will be called repeatedly, "
+               "in a separate thread until the process is killed. It should "
+               "sleep at most for a second.\n"
+           "\tfilter_close(local, remote):\n"
+           "\t\tCalled then the process is killed, if filter_init was provided. "
+               "It should, among other things, close the file descriptors.\n"
+           "\n"
+           "Python modules are expected to return a tuple in filter_init, "
+           "either of file descriptors or file objects, while native ones "
+           "will receive two int*.\n" )
 
 (options, remaining_args) = parser.parse_args(sys.argv[1:])
 
@@ -425,7 +464,7 @@ def pl_vif_stop(tun_path, tun_name):
     del lock, lockfile
 
 
-def tun_fwd(tun, remote, reconnect = None):
+def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True):
     global TERMINATE
     
     tunqueue = options.vif_txqueuelen or 1000
@@ -443,7 +482,10 @@ def tun_fwd(tun, remote, reconnect = None):
         reconnect = reconnect,
         tunqueue = tunqueue,
         tunkqueue = tunkqueue,
-        cipher = options.cipher
+        cipher = options.cipher,
+        accept_local = accept_local,
+        accept_remote = accept_remote,
+        slowlocal = slowlocal
     )
 
 
@@ -492,6 +534,40 @@ tun_name = options.tun_name
 
 modeinfo = MODEINFO[options.mode]
 
+# Try to load filter module
+filter_thread = None
+if options.filter_module:
+    if options.filter_module.endswith('.py'):
+        sys.path.append(os.path.dirname(options.filter_module))
+        filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
+    elif options.filter_module.endswith('.so'):
+        filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
+    
+    try:
+        accept_packet = filter_module.accept_packet
+    except:
+        accept_packet = None
+    
+    try:
+        _filter_init = filter_module.filter_init
+        filter_run = filter_module.filter_run
+        filter_close = filter_module.filter_close
+        
+        def filter_init():
+            filter_local = ctypes.c_int(0)
+            filter_remote = ctypes.c_int(0)
+            _filter_init(filter_local, filter_remote)
+            return filter_local, filter_remote
+    except:
+        filter_init = None
+        filter_run = None
+        filter_close = None
+else:
+    accept_packet = None
+    filter_init = None
+    filter_run = None
+    filter_close = None
+
 # be careful to roll back stuff on exceptions
 tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
 try:
@@ -518,6 +594,9 @@ try:
     reconnect = None
     
     if options.pass_fd:
+        if accept_packet or filter_init:
+            raise NotImplementedError, "--pass-fd and --filter are not compatible"
+        
         if options.pass_fd.startswith("base64:"):
             options.pass_fd = base64.b64decode(
                 options.pass_fd[len("base64:"):])
@@ -545,13 +624,20 @@ try:
         
         # just wait forever
         def tun_fwd(tun, remote, **kw):
-            while not TERMINATE:
+            global TERMINATE
+            TERM = TERMINATE
+            while not TERM:
                 time.sleep(1)
         remote = None
     elif options.mode.startswith('pl-gre'):
+        if accept_packet or filter_init:
+            raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
+        
         # just wait forever
         def tun_fwd(tun, remote, **kw):
-            while not TERMINATE:
+            global TERMINATE
+            TERM = TERMINATE
+            while not TERM:
                 time.sleep(1)
         remote = remaining_args[0]
     elif options.udp:
@@ -620,6 +706,22 @@ try:
             ["tcpdump","-l","-n","-i",tun_name, "-s", "4096"]
             + ["-w",options.pcap_capture,"-U"] * bool(options.pcap_capture) )
     
+    if filter_init:
+        filter_local, filter_remote = filter_init()
+        
+        def filter_loop():
+            global TERMINATE
+            TERM = TERMINATE
+            run = filter_run
+            local = filter_local
+            remote = filter_remote
+            while not TERM:
+                run(local, remote)
+            filter_close(local, remote)
+            
+        filter_thread = threading.Thread(target=filter_loop)
+        filter_thread.start()
+    
     print >>sys.stderr, "Connected"
     
     # Try to give us high priority
@@ -630,8 +732,43 @@ try:
         # or perhaps there is no os.nice support in the system
         pass
 
-    tun_fwd(tun, remote,
-        reconnect = reconnect)
+    if not filter_init:
+        tun_fwd(tun, remote,
+            reconnect = reconnect,
+            accept_local = accept_packet,
+            accept_remote = accept_packet,
+            slowlocal = True)
+    else:
+        # Hm...
+        # ...ok, we need to:
+        #  1. Forward packets from tun to filter
+        #  2. Forward packets from remote to filter
+        #
+        # 1. needs TUN rate-limiting, while 
+        # 2. needs reconnection
+        #
+        # 1. needs ONLY TUN-side acceptance checks, while
+        # 2. needs ONLY remote-side acceptance checks
+        if isinstance(filter_local, ctypes.c_int):
+            filter_local_fd = filter_local.value
+        else:
+            filter_local_fd = filter_local
+        if isinstance(filter_remote, ctypes.c_int):
+            filter_remote_fd = filter_remote.value
+        else:
+            filter_remote_fd = filter_remote
+
+        def localside():
+            tun_fwd(tun, filter_local_fd,
+                accept_local = accept_packet,
+                slowlocal = True)
+        
+        def remoteside():
+            tun_fwd(filter_remote_fd, remote,
+                reconnect = reconnect,
+                accept_remote = accept_packet,
+                slowlocal = False)
+        
 
 finally:
     try:
@@ -641,6 +778,13 @@ finally:
         pass
     
     # tidy shutdown in every case - swallow exceptions
+    TERMINATE.append(None)
+    
+    if filter_thread:
+        try:
+            filter_thread.join()
+        except:
+            pass
 
     try:
         if tcpdump:
index 9521f38..3f003ba 100644 (file)
@@ -58,7 +58,7 @@ class TunProtoBase(object):
         # they have to be created for deployment
         # Also remove pidfile, if there is one.
         # Old pidfiles from previous runs can be troublesome.
-        cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid" % {
+        cmd = "mkdir -p %(home)s ; rm -f %(home)s/pid %(home)s/*.so" % {
             'home' : server.shell_escape(self.home_path)
         }
         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
@@ -90,6 +90,13 @@ class TunProtoBase(object):
             os.path.join(os.path.dirname(__file__), 'scripts', 'tunalloc.c'),
             re.sub(r"([.]py)[co]$", r'\1', tunchannel.__file__, 1), # pyc/o files are version-specific
         ]
+        if local.filter_module:
+            filter_sources = filter(bool,map(str.strip,local.filter_module.module.split()))
+            filter_module = filter_sources[0]
+            sources.extend(set(filter_sources))
+        else:
+            filter_module = None
+            filter_sources = None
         dest = "%s@%s:%s" % (
             local.node.slicename, local.node.hostname, 
             os.path.join(self.home_path,'.'),)
@@ -118,6 +125,16 @@ class TunProtoBase(object):
             "python setup.py install --install-lib .. && "
             "cd .. "
             
+            + ( " && "
+                "gcc -fPIC -shared %(sources)s -o %(module)s.so " % {
+                   'module' : os.path.basename(filter_module).rsplit('.',1)[0],
+                   'sources' : ' '.join(map(os.path.basename,filter_sources))
+                }
+                
+                if filter_module is not None and filter_module.endswith('.c')
+                else ""
+            )
+            
             + ( " && "
                 "wget -q -c -O python-passfd-src.tar.gz %(passfd_url)s && "
                 "mkdir -p python-passfd && "
@@ -126,7 +143,8 @@ class TunProtoBase(object):
                 "python setup.py build && "
                 "python setup.py install --install-lib .. "
                 
-                if local.tun_proto == "fd" else ""
+                if local.tun_proto == "fd" 
+                else ""
             ) 
           )
         % {
@@ -190,6 +208,16 @@ class TunProtoBase(object):
 
         if check_proto == 'gre' and local_cipher.lower() != 'plain':
             raise RuntimeError, "Misconfigured TUN: %s - GRE tunnels do not support encryption. Got %s, you MUST use PLAIN" % (local, local_cipher,)
+
+        if local.filter_module:
+            if check_proto not in ('udp', 'tcp'):
+                raise RuntimeError, "Miscofnigured TUN: %s - filtered tunnels only work with udp or tcp links" % (local,)
+            filter_module = filter(bool,map(str.strip,local.filter_module.module.split()))
+            filter_module = os.path.join('.',os.path.basename(filter_module[0]))
+            if filter_module.endswith('.c'):
+                filter_module = filter_module.rsplit('.',1)[0] + '.so'
+        else:
+            filter_module = None
         
         args = ["python", "tun_connect.py", 
             "-m", str(self.mode),
@@ -231,6 +259,8 @@ class TunProtoBase(object):
             args.extend(map(str,extra_args))
         if not listen and check_proto != 'fd':
             args.append(str(peer_addr))
+        if filter_module:
+            args.extend(("--filter", filter_module))
 
         self._logger.info("Starting %s", self)
         
@@ -353,6 +383,31 @@ class TunProtoBase(object):
                     self._logger.warn("if_name: Could not get interface name")
         return self._if_name
     
+    def if_alive(self):
+        name = self.if_name
+        if name:
+            local = self.local()
+            for i in xrange(30):
+                (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
+                    "ip show %s >/dev/null 2>&1 && echo ALIVE || echo DEAD" % (name,),
+                    host = local.node.hostname,
+                    port = None,
+                    user = local.node.slicename,
+                    agent = None,
+                    ident_key = local.node.ident_path,
+                    server_key = local.node.server_key
+                    )
+                
+                if proc.wait():
+                    time.sleep(1)
+                    continue
+                
+                if out.strip() == 'DEAD':
+                    return False
+                elif out.strip() == 'ALIVE':
+                    return True
+        return False
+    
     def async_launch(self, check_proto, listen, extra_args=[]):
         if not self._started and not self._launcher:
             self._launcher = threading.Thread(
@@ -450,6 +505,14 @@ class TunProtoBase(object):
                 break
             time.sleep(interval)
             interval = min(30.0, interval * 1.1)
+
+        if self.if_name:
+            for i in xrange(30):
+                if not self.if_alive():
+                    self._logger.info("Device down %s", self)
+                    break
+                time.sleep(interval)
+                interval = min(30.0, interval * 1.1)
     
     _TRACEMAP = {
         # tracename : (remotename, localname)
index 9e34b45..5804832 100644 (file)
@@ -191,7 +191,7 @@ def nonblock(fd):
         return False
 
 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
-        cipher='AES',
+        cipher='AES', accept_local=None, accept_remote=None, slowlocal=True,
         len=len, max=max, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
     crypto_mode = False
@@ -261,6 +261,22 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
         twrite = os.write
         tread = os.read
     
+    if accept_local is not None:
+        def tread(fd, maxlen, _tread=tread, accept=accept_local):
+            packet = _tread(fd, maxlen)
+            if accept(packet, 0):
+                return packet
+            else:
+                return None
+
+    if accept_remote is not None:
+        def rread(fd, maxlen, _rread=rread, accept=accept_remote):
+            packet = _rread(fd, maxlen)
+            if accept(packet, 1):
+                return packet
+            else:
+                return None
+    
     # Limited frame parsing, to preserve packet boundaries.
     # Which is needed, since /dev/net/tun is unbuffered
     maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
@@ -361,11 +377,12 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
                     # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
                     # don't block either (they're always ready to write), so if we flood the device 
                     # we'll have high packet loss.
-                    if not tnonblock or len(bkbuf) < tunhurry or not packetReady(bkbuf):
+                    if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
                         break
                 else:
-                    # Give some time for the kernel to process the packets
-                    time.sleep(0)
+                    if slowlocal:
+                        # Give some time for the kernel to process the packets
+                        time.sleep(0)
             except OSError,e:
                 # This except handles the entire While block on PURPOSE
                 # as an optimization (setting a try/except block is expensive)
@@ -381,6 +398,8 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
             try:
                 while 1:
                     packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
+                    if packet is None:
+                        continue
                     #rt += 1
                     fwbuf.append(packet)
                     
@@ -397,6 +416,8 @@ def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr
                 try:
                     while 1:
                         packet = rread(remote,2000)
+                        if packet is None:
+                            continue
                         #rr += 1
                         
                         if crypto_mode:
index 755bd24..4d27686 100755 (executable)
@@ -23,6 +23,15 @@ class PlanetLabExecuteTestCase(unittest.TestCase):
     
     port_base = 2000 + (os.getpid() % 1000) * 13
     
+    PLR50_PY = os.path.join(
+        os.path.dirname(planetlab.__file__), 
+        'scripts',
+        'plr50.py')
+    PLR50_C = os.path.join(
+        os.path.dirname(planetlab.__file__), 
+        'scripts',
+        'plr50.c')
+    
     def setUp(self):
         self.root_dir = tempfile.mkdtemp()
         self.__class__.port_base = self.port_base + 100
@@ -315,7 +324,7 @@ echo 'OKIDOKI'
         self.assertTrue(netpipe_stats, "Unavailable netpipe stats")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
-    def _pingtest(self, TunClass, ConnectionProto, Cipher):
+    def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None):
         instance = self.make_instance()
         
         instance.defer_create(2, "Node")
@@ -339,18 +348,38 @@ echo 'OKIDOKI'
         instance.defer_add_trace(8, "packets")
         instance.defer_add_address(8, "192.168.2.3", 24, False)
         instance.defer_connect(3, "devs", 8, "node")
-        instance.defer_connect(7, ConnectionProto, 8, ConnectionProto)
         instance.defer_create(9, "Application")
-        instance.defer_create_set(9, "command", "ping -qc1 {#[GUID-8].addr[0].[Address]#}")
+        instance.defer_create_set(9, "command", "ping -qc10 {#[GUID-8].addr[0].[Address]#}")
         instance.defer_add_trace(9, "stdout")
         instance.defer_add_trace(9, "stderr")
         instance.defer_connect(9, "node", 2, "apps")
+        
+        if Filter1:
+            instance.defer_create(10, "TunFilter")
+            instance.defer_create_set(10, "module", Filter1)
+            instance.defer_connect(7, "fd->", 10, "->fd")
+            
+        if Filter2:
+            instance.defer_create(11, "TunFilter")
+            instance.defer_create_set(11, "module", Filter2)
+            instance.defer_connect(8, "fd->", 11, "->fd")
+
+        if Filter1 and Filter2:
+            plr = "[5-9][0-9]"
+        elif Filter1 or Filter2:
+            plr = "[3-9][0-9]"
+        else:
+            plr = "0"
+       
+        instance.defer_connect(
+            (10 if Filter1 else 7), ConnectionProto, 
+            (11 if Filter2 else 8), ConnectionProto)
 
         comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
 
 --- .* ping statistics ---
-1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
-"""
+10 packets transmitted, [0-9]+ received, %s%% packet loss, time \d*ms.*
+""" % (plr,)
 
         try:
             instance.do_setup()
@@ -410,6 +439,22 @@ echo 'OKIDOKI'
     def test_tap_ping_gre(self):
         self._pingtest("TapInterface", "gre", "PLAIN")
 
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_tap_ping_udp_loss1_py(self):
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY)
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_tap_ping_udp_loss2_py(self):
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_PY, self.PLR50_PY)
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_tap_ping_udp_loss1_c(self):
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_C)
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_tap_ping_udp_loss2_c(self):
+        self._pingtest("TapInterface", "udp", "AES", self.PLR50_C, self.PLR50_C)
+
     @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
     def test_nepi_depends(self):
         instance = self.make_instance()
index b8ac97c..7af0f95 100644 (file)
@@ -21,7 +21,13 @@ def rread(remote, maxlen, remote_fd = remote.fileno(), os_read=os.read):
     bytes += len(rv)
     return rv
 
-def test(cipher, passphrase):
+def test(cipher, passphrase, plr=None):
+   if plr:
+        import random
+        def accept(packet, direction, rng=random.random):
+            return rng() > 0.5
+   else:
+        accept = None
    TERMINATE = []
    def stopme():
        time.sleep(100)
@@ -29,7 +35,8 @@ def test(cipher, passphrase):
    t = threading.Thread(target=stopme)
    t.start()
    tunchannel.tun_fwd(tun, remote, True, True, passphrase, True, TERMINATE, None, tunkqueue=500,
-        rwrite = rwrite, rread = rread, cipher=cipher)
+        rwrite = rwrite, rread = rread, cipher=cipher,
+        accept_local = accept, accept_remote = accept)
 
 # Swallow exceptions on decryption
 def decrypt(packet, crypter, super=tunchannel.decrypt):
@@ -39,6 +46,7 @@ def decrypt(packet, crypter, super=tunchannel.decrypt):
         return packet
 tunchannel.decrypt = decrypt
 
+"""
 for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'):
     if cipher is None:
         passphrase = None
@@ -51,5 +59,14 @@ for cipher in (None, 'AES', 'Blowfish', 'DES', 'DES3'):
     pstats.Stats('tunchannel.%s.profile' % cipher).strip_dirs().sort_stats('time').print_stats()
     
     print "Bandwidth (%s): %.4fMb/s" % ( cipher, bytes / 200.0 * 8 / 2**20, )
+"""
+
+bytes = 0
+cProfile.runctx('test(None,None,0.5)',globals(),locals(),'tunchannel.plr.profile')
+
+print "Profile (50% PLR):"
+pstats.Stats('tunchannel.plr.profile').strip_dirs().sort_stats('time').print_stats()
+
+print "Bandwidth (50%% PLR): %.4fMb/s" % ( bytes / 200.0 * 8 / 2**20, )