Merge with HEAD, close aly's branch.
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Sun, 4 Sep 2011 17:31:46 +0000 (19:31 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Sun, 4 Sep 2011 17:31:46 +0000 (19:31 +0200)
setup.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/interfaces.py
src/nepi/testbeds/planetlab/metadata.py
src/nepi/testbeds/planetlab/multicast.py [new file with mode: 0644]
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/scripts/mcastfwd.py [new file with mode: 0644]
src/nepi/testbeds/planetlab/scripts/mrouted-3.9.5-pl.patch [new file with mode: 0644]
src/nepi/testbeds/planetlab/scripts/tun_connect.py
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/tunchannel.py

index 80b9299..9e4719a 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -22,6 +22,8 @@ setup(
             "nepi.util.graphtools",
             "nepi.util" ],
         package_dir = {"": "src"},
-        package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"],
+        package_data = {"nepi.testbeds.planetlab" : [
+                                "scripts/*.py", "scripts/*.c", "scripts/*.patch", 
+                        ],
                         "nepi.util" : ["*.tpl"] },
     )
index 90028f6..27ed23d 100644 (file)
@@ -37,10 +37,11 @@ class TestbedController(testbed_impl.TestbedController):
         self.slicename = None
         self._traces = dict()
 
-        import node, interfaces, application
+        import node, interfaces, application, multicast
         self._node = node
         self._interfaces = interfaces
         self._app = application
+        self._multicast = multicast
         
         self._blacklist = set()
         self._just_provisioned = set()
@@ -718,4 +719,13 @@ class TestbedController(testbed_impl.TestbedController):
     def _make_tos_queue_filter(self, parameters):
         return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
 
+    def _make_multicast_forwarder(self, parameters):
+        return self._make_generic(parameters, self._multicast.MulticastForwarder)
+
+    def _make_multicast_announcer(self, parameters):
+        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
+
+    def _make_multicast_router(self, parameters):
+        return self._make_generic(parameters, self._multicast.MulticastRouter)
+
 
index 1f313c1..56720a3 100644 (file)
@@ -139,6 +139,7 @@ class TunIface(object):
         
         # These get initialized when the iface is connected to any filter
         self.filter_module = None
+        self.multicast_forwarder = None
         
         # These get initialized when the iface is configured
         self.external_iface = None
@@ -242,6 +243,7 @@ class TunIface(object):
         impl = self._PROTO_MAP[self.peer_proto](
             self, self.peer_iface, home_path, self.tun_key)
         impl.port = self.tun_port
+        impl.cross_slice = not self.peer_iface or isinstance(self.peer_iface, _CrossIface)
         return impl
     
     def recover(self):
index 7754c0a..f5d72f1 100644 (file)
@@ -31,6 +31,9 @@ NETPIPE = "NetPipe"
 TUNFILTER = "TunFilter"
 CLASSQUEUEFILTER = "ClassQueueFilter"
 TOSQUEUEFILTER = "TosQueueFilter"
+MULTICASTFORWARDER = "MulticastForwarder"
+MULTICASTANNOUNCER = "MulticastAnnouncer"
+MULTICASTROUTER = "MulticastRouter"
 
 TUNFILTERS = (TUNFILTER, CLASSQUEUEFILTER, TOSQUEUEFILTER)
 TAPFILTERS = (TUNFILTER, )
@@ -192,9 +195,9 @@ def crossconnect_filter_peer_both(proto, testbed_instance, filter_guid, peer_dat
     crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
     crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
 
-def connect_dep(testbed_instance, node_guid, app_guid):
-    node = testbed_instance._elements[node_guid]
-    app = testbed_instance._elements[app_guid]
+def connect_dep(testbed_instance, node_guid, app_guid, node=None, app=None):
+    node = node or testbed_instance._elements[node_guid]
+    app = app or testbed_instance._elements[app_guid]
     app.node = node
     
     if app.depends:
@@ -213,6 +216,24 @@ def connect_dep(testbed_instance, node_guid, app_guid):
     if app.rpmFusion:
         node.rpmFusion = True
 
+def connect_forwarder(testbed_instance, node_guid, fwd_guid):
+    node = testbed_instance._elements[node_guid]
+    fwd = testbed_instance._elements[fwd_guid]
+    node.multicast_forwarder = fwd
+    
+    if fwd.router:
+        connect_dep(testbed_instance, node_guid, None, app=fwd.router)
+
+    connect_dep(testbed_instance, node_guid, fwd_guid)
+
+def connect_router(testbed_instance, fwd_guid, router_guid):
+    fwd = testbed_instance._elements[fwd_guid]
+    router = testbed_instance._elements[router_guid]
+    fwd.router = router
+    
+    if fwd.node:
+        connect_dep(testbed_instance, None, router_guid, node=fwd.node)
+
 def connect_node_netpipe(testbed_instance, node_guid, netpipe_guid):
     node = testbed_instance._elements[node_guid]
     netpipe = testbed_instance._elements[netpipe_guid]
@@ -330,6 +351,33 @@ def create_ns3_dependency(testbed_instance, guid):
     
     testbed_instance.elements[guid] = element
 
+def create_multicast_forwarder(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_multicast_forwarder(parameters)
+    
+    # Just inject configuration stuff
+    element.home_path = "nepi-mcfwd-%s" % (guid,)
+    
+    testbed_instance.elements[guid] = element
+
+def create_multicast_announcer(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_multicast_announcer(parameters)
+    
+    # Just inject configuration stuff
+    element.home_path = "nepi-mcann-%s" % (guid,)
+    
+    testbed_instance.elements[guid] = element
+
+def create_multicast_router(testbed_instance, guid):
+    parameters = testbed_instance._get_parameters(guid)
+    element = testbed_instance._make_multicast_router(parameters)
+    
+    # Just inject configuration stuff
+    element.home_path = "nepi-mcrt-%s" % (guid,)
+    
+    testbed_instance.elements[guid] = element
+
 def create_internet(testbed_instance, guid):
     parameters = testbed_instance._get_parameters(guid)
     element = testbed_instance._make_internet(parameters)
@@ -490,6 +538,41 @@ def configure_dependency(testbed_instance, guid):
     # Install stuff
     dep.async_setup()
 
+def configure_announcer(testbed_instance, guid):
+    # Link ifaces
+    fwd = testbed_instance._elements[guid]
+    fwd.ifaces = [ dev
+        for node_guid in testbed_instance.get_connected(guid, "node", "apps")
+        for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
+        for dev in ( testbed_instance._elements.get(dev_guid) ,)
+        if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
+            and dev.multicast ]
+    
+    # Install stuff
+    configure_dependency(testbed_instance, guid)
+
+def configure_forwarder(testbed_instance, guid):
+    configure_announcer(testbed_instance, guid)
+    
+    # Link ifaces to forwarder
+    fwd = testbed_instance._elements[guid]
+    for iface in fwd.ifaces:
+        iface.multicast_forwarder = '/var/run/mcastfwd'
+
+def configure_router(testbed_instance, guid):
+    # Link ifaces
+    rt = testbed_instance._elements[guid]
+    rt.nonifaces = [ dev
+        for fwd_guid in testbed_instance.get_connected(guid, "fwd", "router")
+        for node_guid in testbed_instance.get_connected(fwd_guid, "node", "apps")
+        for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
+        for dev in ( testbed_instance._elements.get(dev_guid) ,)
+        if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
+            and not dev.multicast ]
+    
+    # Install stuff
+    configure_dependency(testbed_instance, guid)
+
 def configure_netpipe(testbed_instance, guid):
     netpipe = testbed_instance._elements[guid]
     
@@ -536,6 +619,18 @@ connector_types = dict({
                 "max": 1, 
                 "min": 1
             }),
+    "router": dict({
+                "help": "Connector to a routing daemon", 
+                "name": "router",
+                "max": 1, 
+                "min": 1
+            }),
+    "fwd": dict({
+                "help": "Forwarder this routing daemon communicates with", 
+                "name": "fwd",
+                "max": 1, 
+                "min": 1
+            }),
     "pipes": dict({
                 "help": "Connector to a NetPipe", 
                 "name": "pipes",
@@ -602,32 +697,32 @@ connections = [
     }),
     dict({
         "from": (TESTBED_ID, NODE, "apps"),
-        "to":   (TESTBED_ID, APPLICATION, "node"),
+        "to":   (TESTBED_ID, (APPLICATION, MULTICASTANNOUNCER), "node"),
         "init_code": connect_dep,
         "can_cross": False
     }),
     dict({
         "from": (TESTBED_ID, NODE, "deps"),
-        "to":   (TESTBED_ID, DEPENDENCY, "node"),
+        "to":   (TESTBED_ID, (DEPENDENCY, NEPIDEPENDENCY, NS3DEPENDENCY), "node"),
         "init_code": connect_dep,
         "can_cross": False
     }),
     dict({
-        "from": (TESTBED_ID, NODE, "deps"),
-        "to":   (TESTBED_ID, NEPIDEPENDENCY, "node"),
-        "init_code": connect_dep,
+        "from": (TESTBED_ID, NODE, "pipes"),
+        "to":   (TESTBED_ID, NETPIPE, "node"),
+        "init_code": connect_node_netpipe,
         "can_cross": False
     }),
     dict({
-        "from": (TESTBED_ID, NODE, "deps"),
-        "to":   (TESTBED_ID, NS3DEPENDENCY, "node"),
-        "init_code": connect_dep,
+        "from": (TESTBED_ID, NODE, "apps"),
+        "to":   (TESTBED_ID, MULTICASTFORWARDER, "node"),
+        "init_code": connect_forwarder,
         "can_cross": False
     }),
     dict({
-        "from": (TESTBED_ID, NODE, "pipes"),
-        "to":   (TESTBED_ID, NETPIPE, "node"),
-        "init_code": connect_node_netpipe,
+        "from": (TESTBED_ID, MULTICASTFORWARDER, "router"),
+        "to":   (TESTBED_ID, MULTICASTROUTER, "fwd"),
+        "init_code": connect_router,
         "can_cross": False
     }),
     dict({
@@ -1167,6 +1262,15 @@ attributes = dict({
                 "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
                 "validation_function": validation.is_string
             }),
+    "routing_algorithm": dict({      
+            "name": "algorithm",
+            "help": "Routing algorithm.",
+            "value": "dvmrp",
+            "type": Attribute.ENUM, 
+            "allowed": ["dvmrp"],
+            "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+            "validation_function": validation.is_enum,
+        }),
     })
 
 traces = dict({
@@ -1206,15 +1310,34 @@ traces = dict({
             }),
     })
 
-create_order = [ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+create_order = [ 
+    INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, 
+    MULTICASTANNOUNCER, MULTICASTFORWARDER, MULTICASTROUTER, 
+    TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, 
+    NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
 
-configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
+configure_order = [ 
+    INTERNET, Parallel(NODE), 
+    NODEIFACE, 
+    Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER), 
+    Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, 
+    Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 
 # Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
-start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
+start_order = [ INTERNET, 
+    NODEIFACE, 
+    Parallel(TAPIFACE), Parallel(TUNIFACE), 
+    Parallel(NODE), NETPIPE, 
+    Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER), 
+    Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
 
 # cleanup order
-shutdown_order = [ Parallel(APPLICATION), Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), NODEIFACE, Parallel(NODE) ]
+shutdown_order = [ 
+    Parallel(APPLICATION), 
+    Parallel(MULTICASTROUTER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTANNOUNCER), 
+    Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), 
+    Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), 
+    NODEIFACE, Parallel(NODE) ]
 
 factories_info = dict({
     NODE: dict({
@@ -1428,6 +1551,51 @@ factories_info = dict({
             "connector_types": ["node"],
             "traces": ["buildlog"],
         }),
+    MULTICASTFORWARDER: dict({
+            "help": "This application installs a userspace packet forwarder "
+                    "that, when connected to a node, filters all packets "
+                    "flowing through multicast-capable virtual interfaces "
+                    "and applies custom-specified routing policies.",
+            "category": FC.CATEGORY_APPLICATIONS,
+            "create_function": create_multicast_forwarder,
+            "preconfigure_function": configure_forwarder,
+            "start_function": start_application,
+            "status_function": status_application,
+            "stop_function": stop_application,
+            "box_attributes": [ ],
+            "connector_types": ["node","router"],
+            "traces": ["buildlog","stderr"],
+        }),
+    MULTICASTANNOUNCER: dict({
+            "help": "This application installs a userspace daemon that "
+                    "monitors multicast membership and announces it on all "
+                    "multicast-capable interfaces.\n"
+                    "This does not usually happen automatically on PlanetLab slivers.",
+            "category": FC.CATEGORY_APPLICATIONS,
+            "create_function": create_multicast_announcer,
+            "preconfigure_function": configure_announcer,
+            "start_function": start_application,
+            "status_function": status_application,
+            "stop_function": stop_application,
+            "box_attributes": [ ],
+            "connector_types": ["node"],
+            "traces": ["buildlog","stderr"],
+        }),
+    MULTICASTROUTER: dict({
+            "help": "This application installs a userspace daemon that "
+                    "monitors multicast membership and announces it on all "
+                    "multicast-capable interfaces.\n"
+                    "This does not usually happen automatically on PlanetLab slivers.",
+            "category": FC.CATEGORY_APPLICATIONS,
+            "create_function": create_multicast_router,
+            "preconfigure_function": configure_router,
+            "start_function": start_application,
+            "status_function": status_application,
+            "stop_function": stop_application,
+            "box_attributes": ["routing_algorithm"],
+            "connector_types": ["fwd"],
+            "traces": ["buildlog","stdout","stderr"],
+        }),
     INTERNET: dict({
             "help": "Internet routing",
             "category": FC.CATEGORY_CHANNELS,
diff --git a/src/nepi/testbeds/planetlab/multicast.py b/src/nepi/testbeds/planetlab/multicast.py
new file mode 100644 (file)
index 0000000..71df64c
--- /dev/null
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from constants import TESTBED_ID
+
+import os
+import os.path
+import sys
+import functools
+
+import nepi.util.server as server
+import nepi.util.ipaddr2 as ipaddr2
+
+import logging
+
+import application
+
+class MulticastForwarder(application.Application):
+    """
+    This application installs a userspace packet forwarder
+    that, when connected to a node, filters all packets
+    flowing through multicast-capable virtual interfaces
+    and applies custom-specified routing policies
+    """
+    def __init__(self, *p, **kw):
+        super(MulticastForwarder, self).__init__(*p, **kw)
+        
+        self.sources = ' '.join([
+            os.path.join( os.path.dirname(__file__),
+                "scripts", "mcastfwd.py" ),
+            ipaddr2.__file__.replace('.pyc','.py').replace('.pyo','.py'),
+        ])
+        
+        self.sudo = True
+        
+        self.depends = "python"
+        
+        # Initialized when connected
+        self.ifaces = []
+        self.router = None
+    
+    def _command_get(self):
+        # canonical representation of dependencies
+        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+        
+        # download rpms and pack into a tar archive
+        return (
+            "python mcastfwd.py %s"
+        ) % ( ' '.join([iface.address for iface in self.ifaces]), )
+    def _command_set(self, value):
+        # ignore
+        return
+    command = property(_command_get, _command_set)
+    
+        
+class MulticastAnnouncer(application.Application):
+    """
+    This application installs a userspace daemon that
+    monitors multicast membership and announces it on all
+    multicast-capable interfaces.
+    This does not usually happen automatically on PlanetLab slivers.
+    """
+    def __init__(self, *p, **kw):
+        super(MulticastAnnouncer, self).__init__(*p, **kw)
+        
+        self.sources = ' '.join([
+            os.path.join( os.path.dirname(__file__),
+                "scripts", "mcastfwd.py" ),
+            ipaddr2.__file__.replace('.pyc','.py').replace('.pyo','.py'),
+        ])
+        
+        self.sudo = True
+        
+        self.depends = "python"
+        
+        self.ifaces = []
+        self.router = None
+    
+    def _command_get(self):
+        # canonical representation of dependencies
+        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+        
+        # download rpms and pack into a tar archive
+        return (
+            "python mcastfwd.py -A %s"
+        ) % ( ' '.join([iface.address for iface in self.ifaces]), )
+    def _command_set(self, value):
+        # ignore
+        return
+    command = property(_command_get, _command_set)
+
+class MulticastRouter(application.Application):
+    """
+    This application installs a userspace daemon that
+    monitors multicast membership and announces it on all
+    multicast-capable interfaces.
+    This does not usually happen automatically on PlanetLab slivers.
+    """
+    ALGORITHM_MAP = {
+        'dvmrp' : {
+            'sources' :
+                ' '.join([
+                    os.path.join( os.path.dirname(__file__),
+                        "scripts", "mrouted-3.9.5-pl.patch" ),
+                ]) ,
+            'depends' : "",
+            'buildDepends' : "byacc gcc make patch",
+            'build' : 
+                "mkdir -p mrouted && "
+                "echo '3a1c1e72c4f6f7334d72df4c50b510d7  mrouted-3.9.5.tar.bz2' > archive_sums.txt && "
+                "wget -q -c -O mrouted-3.9.5.tar.bz2 ftp://ftp.vmlinux.org/pub/People/jocke/mrouted/mrouted-3.9.5.tar.bz2 && "
+                "md5sum -c archive_sums.txt && "
+                "tar xvjf mrouted-3.9.5.tar.bz2 -C mrouted --strip-components=1 && "
+                "cd mrouted && patch -p1 < ${SOURCES}/mrouted-3.9.5-pl.patch && make"
+                ,
+            'install' : "cp mrouted/mrouted ${SOURCES}",
+            'command' : 
+                "while test \\! -e /var/run/mcastrt ; do sleep 1 ; done ; "
+                "echo 'phyint eth0 disable' > ./mrouted.conf ; "
+                "for iface in %(nonifaces)s ; do echo \"phyint $iface disable\" >> ./mrouted.conf ; done ; "
+                "./mrouted -f %(debugbit)s -c ./mrouted.conf"
+                ,
+            'debugbit' : "-dpacket,igmp,routing,interface,pruning,membership,cache",
+        }
+    }
+    
+    def __init__(self, *p, **kw):
+        super(MulticastRouter, self).__init__(*p, **kw)
+        
+        self.algorithm = 'dvmrp'
+        self.sudo = True
+        self.nonifaces = []
+    
+    def _non_set(self, value):
+        # ignore
+        return
+    
+    def _gen_get(attribute, self):
+        return self.ALGORITHM_MAP[self.algorithm][attribute]
+    
+    def _command_get(self):
+        command = self.ALGORITHM_MAP[self.algorithm]['command']
+        debugbit = self.ALGORITHM_MAP[self.algorithm]['debugbit']
+        
+        # download rpms and pack into a tar archive
+        return command % {
+            'nonifaces' : ' '.join([iface.if_name for iface in self.nonifaces]),
+            'debugbit' : (debugbit if self.stderr else ""),
+        }
+    command = property(_command_get, _non_set)
+
+    build = property(functools.partial(_gen_get, "build"), _non_set)
+    install = property(functools.partial(_gen_get, "install"), _non_set)
+    sources = property(functools.partial(_gen_get, "sources"), _non_set)
+    depends = property(functools.partial(_gen_get, "depends"), _non_set)
+    buildDepends = property(functools.partial(_gen_get, "buildDepends"), _non_set)
+
index 5d9320c..5e1445e 100644 (file)
@@ -113,6 +113,9 @@ class Node(object):
         self.rpmFusion = False
         self.env = collections.defaultdict(list)
         
+        # Some special applications - initialized when connected
+        self.multicast_forwarder = None
+        
         # Testbed-derived attributes
         self.slicename = None
         self.ident_path = None
diff --git a/src/nepi/testbeds/planetlab/scripts/mcastfwd.py b/src/nepi/testbeds/planetlab/scripts/mcastfwd.py
new file mode 100644 (file)
index 0000000..77bab0c
--- /dev/null
@@ -0,0 +1,426 @@
+import sys
+
+import signal
+import socket
+import struct
+import optparse
+import threading
+import subprocess
+import re
+import time
+import collections
+import os
+import traceback
+import logging
+
+import ipaddr2
+
+usage = "usage: %prog [options] <enabled-addresses>"
+
+parser = optparse.OptionParser(usage=usage)
+
+parser.add_option(
+    "-d", "--poll-delay", dest="poll_delay", metavar="SECONDS", type="float",
+    default = 1.0,
+    help = "Multicast subscription polling interval")
+parser.add_option(
+    "-D", "--refresh-delay", dest="refresh_delay", metavar="SECONDS", type="float",
+    default = 30.0,
+    help = "Full-refresh interval - time between full IGMP reports")
+parser.add_option(
+    "-p", "--fwd-path", dest="fwd_path", metavar="PATH", 
+    default = "/var/run/mcastfwd",
+    help = "Path of the unix socket in which the program will listen for packets")
+parser.add_option(
+    "-r", "--router-path", dest="mrt_path", metavar="PATH", 
+    default = "/var/run/mcastrt",
+    help = "Path of the unix socket in which the program will listen for routing changes")
+parser.add_option(
+    "-A", "--announce-only", dest="announce_only", action="store_true",
+    default = False,
+    help = "If given, only group membership announcements will be made. Useful for non-router multicast nodes.")
+parser.add_option(
+    "-v", "--verbose", dest="verbose", action="store_true",
+    default = False,
+    help = "Path of the unix socket in which the program will listen for routing changes")
+
+(options, remaining_args) = parser.parse_args(sys.argv[1:])
+
+logging.basicConfig(
+    stream=sys.stderr, 
+    level=logging.DEBUG if options.verbose else logging.WARNING)
+
+ETH_P_ALL = 0x00000003
+ETH_P_IP = 0x00000800
+TUNSETIFF = 0x400454ca
+IFF_NO_PI = 0x00001000
+IFF_TAP = 0x00000002
+IFF_TUN = 0x00000001
+IFF_VNET_HDR = 0x00004000
+TUN_PKT_STRIP = 0x00000001
+IFHWADDRLEN = 0x00000006
+IFNAMSIZ = 0x00000010
+IFREQ_SZ = 0x00000028
+FIONREAD = 0x0000541b
+
+class IGMPThread(threading.Thread):
+    def __init__(self, vif_addr, *p, **kw):
+        super(IGMPThread, self).__init__(*p, **kw)
+        
+        vif_addr = vif_addr.strip()
+        self.vif_addr = vif_addr
+        self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
+        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
+            socket.inet_aton(self.vif_addr) )
+        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
+        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+        self._stop = False
+        self.setDaemon(True)
+        
+        # Find tun name
+        proc = subprocess.Popen(['ip','addr','show'],
+            stdout = subprocess.PIPE,
+            stderr = subprocess.STDOUT,
+            stdin = open('/dev/null','r+b') )
+        tun_name = None
+        heading = re.compile(r"\d+:\s*([-a-zA-Z0-9_]+):.*")
+        addr = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}).*")
+        for line in proc.stdout:
+            match = heading.match(line)
+            if match:
+                tun_name = match.group(1)
+            else:
+                match = addr.match(line)
+                if match and match.group(1) == vif_addr:
+                    self.tun_name = tun_name
+                    break
+        else:
+            raise RuntimeError, "Could not find iterface for", vif_addr
+    
+    def run(self):
+        devnull = open('/dev/null','r+b')
+        maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
+        cur_maddr = set()
+        lastfullrefresh = time.time()
+        while not self._stop:
+            # Get current subscriptions
+            proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
+                stdout = subprocess.PIPE,
+                stderr = subprocess.STDOUT,
+                stdin = devnull)
+            new_maddr = set()
+            for line in proc.stdout:
+                match = maddr_re.match(line)
+                if match:
+                    new_maddr.add(match.group(1))
+            proc.wait()
+            
+            # Every now and then, send a full report
+            now = time.time()
+            report_new = new_maddr
+            if (now - lastfullrefresh) <= options.refresh_delay:
+                report_new = report_new - cur_maddr
+            else:
+                lastfullrefresh = now
+            
+            # Report subscriptions
+            for grp in report_new:
+                print >>sys.stderr, "JOINING", grp
+                igmpp = ipaddr2.ipigmp(
+                    self.vif_addr, grp, 1, 0x16, 0, grp, 
+                    noipcksum=True)
+                try:
+                    self.igmp_socket.sendto(igmpp, 0, (grp,0))
+                except:
+                    traceback.print_exc(file=sys.stderr)
+
+            # Notify group leave
+            for grp in cur_maddr - new_maddr:
+                print >>sys.stderr, "LEAVING", grp
+                igmpp = ipaddr2.ipigmp(
+                    self.vif_addr, '224.0.0.2', 1, 0x17, 0, grp, 
+                    noipcksum=True)
+                try:
+                    self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
+                except:
+                    traceback.print_exc(file=sys.stderr)
+
+            cur_maddr = new_maddr
+            
+            time.sleep(options.poll_delay)
+    
+    def stop(self):
+        self._stop = True
+        self.join(1+5*options.poll_delay)
+
+
+class FWDThread(threading.Thread):
+    def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
+        super(FWDThread, self).__init__(*p, **kw)
+        
+        self.in_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        self.in_socket.bind(options.fwd_path)
+        
+        self.pending = collections.deque()
+        self.maxpending = 1000
+        self.rt_cache = rt_cache
+        self.router_socket = router_socket
+        self.vifs = vifs
+        self.fwd_sockets = {}
+        for fwd_target in remaining_args:
+            fwd_target = socket.inet_aton(fwd_target)
+            fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
+            fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
+            fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
+            fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+            self.fwd_sockets[fwd_target] = fwd_socket
+        
+        self._stop = False
+        self.setDaemon(True)
+    
+    def run(self):
+        in_socket = self.in_socket
+        rt_cache = self.rt_cache
+        vifs = self.vifs
+        router_socket = self.router_socket
+        len_ = len
+        ord_ = ord
+        str_ = str
+        pending = self.pending
+        in_socket.settimeout(options.poll_delay)
+        buffer_ = buffer
+        enumerate_ = enumerate
+        fwd_sockets = self.fwd_sockets
+        npending = 0
+        noent = (None,None)
+        
+        while not self._stop:
+            # Get packet
+            try:
+                if pending and npending:
+                    packet = pending.pop()
+                    npending -= 1
+                else:
+                    packet = in_socket.recv(2000)
+            except socket.timeout, e:
+                if pending and not npending:
+                    npending = len_(pending)
+                continue
+            if not packet or len_(packet) < 24:
+                continue
+            
+            fullpacket = packet
+            parent = packet[:4]
+            packet = buffer_(packet,4)
+            
+            if packet[9] == '\x02':
+                # IGMP packet? It's for mrouted
+                self.router_socket.send(packet)
+            elif packet[9] == '\x00':
+                # LOOPING packet, discard
+                continue
+            
+            # To-Do: PIM asserts
+            
+            # Get route
+            addrinfo = packet[12:20]
+            fwd_targets, rparent = rt_cache.get(addrinfo, noent)
+            
+            if fwd_targets is not None and (rparent == '\x00\x00\x00\x00' or rparent == parent):
+                # Forward
+                ttl = ord_(packet[8])
+                tgt_group = (socket.inet_ntoa(addrinfo[4:]),0)
+                print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl,
+                nfwd_targets = len_(fwd_targets)
+                for vifi, vif in vifs.iteritems():
+                    if vifi < nfwd_targets:
+                        ttl_thresh = ord_(fwd_targets[vifi])
+                        if ttl_thresh > 0 and ttl > ttl_thresh:
+                            if vif[4] in fwd_sockets:
+                                print >>sys.stderr, socket.inet_ntoa(vif[4]),
+                                fwd_socket = fwd_sockets[vif[4]]
+                                fwd_socket.sendto(packet, 0, tgt_group)
+                print >>sys.stderr, "."
+            else:
+                # Mark pending
+                if len_(pending) < self.maxpending:
+                    tgt_group = addrinfo[4:]
+                    print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ?"
+                    
+                    pending.append(fullpacket)
+                    
+                    # Notify mrouted by forwarding it with protocol 0
+                    router_socket.send(''.join(
+                        (packet[:9],'\x00',packet[10:]) ))
+    
+    def stop(self):
+        self._stop = True
+        self.join(1+5*options.poll_delay)
+
+
+class RouterThread(threading.Thread):
+    def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
+        super(RouterThread, self).__init__(*p, **kw)
+        
+        self.rt_cache = rt_cache
+        self.vifs = vifs
+        self.router_socket = router_socket
+
+        self._stop = False
+        self.setDaemon(True)
+    
+    def run(self):
+        rt_cache = self.rt_cache
+        vifs = self.vifs
+        addr_vifs = {}
+        router_socket = self.router_socket
+        router_socket.settimeout(options.poll_delay)
+        len_ = len
+        buffer_ = buffer
+        
+        buf = ""
+        
+        MRT_BASE       = 200
+        MRT_ADD_VIF    = MRT_BASE+2    # Add a virtual interface               
+        MRT_DEL_VIF    = MRT_BASE+3    # Delete a virtual interface            
+        MRT_ADD_MFC    = MRT_BASE+4    # Add a multicast forwarding entry      
+        MRT_DEL_MFC = MRT_BASE+5       # Delete a multicast forwarding entry   
+        
+        def cmdhdr(cmd, unpack=struct.unpack, buffer=buffer):
+            op,dlen = unpack('II', buffer(cmd,0,8))
+            cmd = buffer(cmd,8)
+            return op,dlen,cmd
+        def vifctl(data, unpack=struct.unpack):
+            #vifi, flags,threshold,rate_limit,lcl_addr,rmt_addr = unpack('HBBI4s4s', data)
+            return unpack('HBBI4s4s', data)
+        def mfcctl(data, unpack=struct.unpack):
+            #origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = unpack('4s4sH10sIIIi', data)
+            return unpack('4s4sH32sIIIi', data)
+        
+        
+        def add_vif(cmd):
+            vifi = vifctl(cmd)
+            vifs[vifi[0]] = vifi
+            addr_vifs[vifi[4]] = vifi[0]
+            print >>sys.stderr, "Added VIF", vifi
+        def del_vif(cmd):
+            vifi = vifctl(cmd)
+            vifi = vifs[vifi[0]]
+            del addr_vifs[vifi[4]]
+            del vifs[vifi[0]]
+            print >>sys.stderr, "Removed VIF", vifi
+        def add_mfc(cmd):
+            origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
+            if parent in vifs:
+                parent_addr = vifs[parent][4]
+            else:
+                parent_addr = '\x00\x00\x00\x00'
+            addrinfo = origin + mcastgrp
+            rt_cache[addrinfo] = (ttls, parent_addr)
+            print >>sys.stderr, "Added RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp))), map(ord,ttls)
+        def del_mfc(cmd):
+            origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
+            if parent in vifs:
+                parent_addr = vifs[parent][4]
+            else:
+                parent_addr = '\x00\x00\x00\x00'
+            addrinfo = origin + mcastgrp
+            del rt_cache[addrinfo]
+            print >>sys.stderr, "Removed RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp)))
+        
+        commands = {
+            MRT_ADD_VIF : add_vif,
+            MRT_DEL_VIF : del_vif,
+            MRT_ADD_MFC : add_mfc,
+            MRT_DEL_MFC : del_mfc,
+        }
+
+        while not self._stop:
+            if len_(buf) < 8 or len_(buf) < (cmdhdr(buf)[1]+8):
+                # Get cmd
+                try:
+                    cmd = router_socket.recv(2000)
+                except socket.timeout, e:
+                    continue
+                if not cmd:
+                    print >>sys.stderr, "PLRT CONNECTION BROKEN"
+                    TERMINATE.append(None)
+                    break
+            
+            if buf:
+                buf += cmd
+                cmd = buf
+            
+            if len_(cmd) < 8:
+                continue
+            
+            op,dlen,data = cmdhdr(cmd)
+            if len_(data) < dlen:
+                continue
+            
+            buf = buffer_(data, dlen)
+            data = buffer_(data, 0, dlen)
+            
+            print >>sys.stderr, "COMMAND", op, "DATA", dlen
+            
+            if op in commands:
+                try:
+                    commands[op](data)
+                except:
+                    traceback.print_exc(file=sys.stderr)
+            else:
+                print >>sys.stderr, "IGNORING UNKNOWN COMMAND", op
+    
+    def stop(self):
+        self._stop = True
+        self.join(1+5*options.poll_delay)
+
+
+
+igmp_threads = []
+for vif_addr in remaining_args:
+    igmp_threads.append(IGMPThread(vif_addr))
+
+rt_cache = {}
+vifs = {}
+
+TERMINATE = []
+TERMINATE = []
+def _finalize(sig,frame):
+    global TERMINATE
+    TERMINATE.append(None)
+signal.signal(signal.SIGTERM, _finalize)
+
+
+try:
+    if not options.announce_only:
+        router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        router_socket.bind(options.mrt_path)
+        router_socket.listen(0)
+        router_remote_socket, router_remote_addr = router_socket.accept()
+
+        fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs)
+        router_thread = RouterThread(rt_cache, router_remote_socket, vifs)
+
+    for thread in igmp_threads:
+        thread.start()
+    
+    if not options.announce_only:
+        fwd_thread.start()
+        router_thread.start()
+
+    while not TERMINATE:
+        time.sleep(30)
+finally:
+    if os.path.exists(options.mrt_path):
+        try:
+            os.remove(options.mrt_path)
+        except:
+            pass
+    if os.path.exists(options.fwd_path):
+        try:
+            os.remove(options.fwd_path)    
+        except:
+            pass
+
+
diff --git a/src/nepi/testbeds/planetlab/scripts/mrouted-3.9.5-pl.patch b/src/nepi/testbeds/planetlab/scripts/mrouted-3.9.5-pl.patch
new file mode 100644 (file)
index 0000000..f22175f
--- /dev/null
@@ -0,0 +1,555 @@
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/config.c mrouted-3.9.5-pl/config.c
+--- mrouted-3.9.5/config.c     2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/config.c  2011-08-29 16:32:43.710565000 +0200
+@@ -91,11 +91,41 @@
+       v->uv_subnetbcast = subnet | ~mask;
+       strncpy(v->uv_name, ifa->ifa_name, sizeof(v->uv_name));
+-      if (flags & IFF_POINTOPOINT)
++      if (flags & IFF_POINTOPOINT) {
+           v->uv_flags |= VIFF_REXMIT_PRUNES;
++          v->uv_ptp_addr = ((struct sockaddr_in *)ifa->ifa_dstaddr)->sin_addr.s_addr;
++      } else if (flags & IFF_BROADCAST) {
++          /* getifaddr doesn't give us the p2p link in this case */
++          /* So use ip, which uses netlink, to query it */
++          char buf[1024];
++        size_t rode;
++        FILE *peer;
++        
++        logit(LOG_INFO,0,"Getting ptp for %s", ifa->ifa_name);
++        
++        snprintf(buf,sizeof(buf),"ip addr show %s | grep -o 'peer [0-9.]*' | grep -o '[0-9.]*'", ifa->ifa_name);
++        peer = popen(buf, "r");
++        rode = fread(buf, 1, sizeof(buf), peer);
++        pclose(peer);
++        
++        if (rode > 0) {
++            /* It has a pointopoint address */
++            struct in_addr ptp_in;
++            
++            for (--rode; rode && buf[rode] <= 13;)
++                --rode;
++            buf[++rode] = 0;
++
++            logit(LOG_INFO,0,"Got %s", buf);
++
++              if (inet_aton(buf, &ptp_in))
++                  v->uv_ptp_addr = ptp_in.s_addr;
++        } 
++      }
+-      logit(LOG_INFO,0,"installing %s (%s on subnet %s) as vif #%u - rate=%d",
++      logit(LOG_INFO,0,"installing %s (%s on subnet %s%s%s) as vif #%u - rate=%d",
+           v->uv_name, inet_fmt(addr, s1, sizeof(s1)), inet_fmts(subnet, mask, s2, sizeof(s2)),
++          (v->uv_ptp_addr) ? "peer " : "", (v->uv_ptp_addr) ? inet_fmt(v->uv_ptp_addr, s3, sizeof(s3)) : "",
+           numvifs, v->uv_rate_limit);
+       ++numvifs;
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/defs.h mrouted-3.9.5-pl/defs.h
+--- mrouted-3.9.5/defs.h       2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/defs.h    2011-08-30 15:30:23.691662000 +0200
+@@ -9,6 +9,8 @@
+ #ifndef __MROUTED_DEFS_H__
+ #define __MROUTED_DEFS_H__
++#define PLANETLAB 1
++
+ #include <stdio.h>
+ #include <stdlib.h>
+ #include <unistd.h>
+@@ -28,6 +30,7 @@
+ #include <sys/time.h>
+ #include <sys/uio.h>
+ #include <net/if.h>
++#include <arpa/inet.h>
+ #include <netinet/in.h>
+ #include <netinet/in_systm.h>
+ #include <netinet/ip.h>
+@@ -61,7 +64,7 @@
+ #include <libutil.h>
+ #endif
+ #endif
+-#ifdef RSRR
++#if defined(RSRR) || defined(PLANETLAB)
+ #include <sys/un.h>
+ #endif /* RSRR */
+@@ -137,6 +140,11 @@
+ extern u_int32                dvmrp_group;
+ extern u_int32                dvmrp_genid;
++#ifdef PLANETLAB
++extern int      plrt_socket;
++extern char     *plrt_socket_path;
++#endif
++
+ #define       IF_DEBUG(l)     if (debug && debug & (l))
+ #define       DEBUG_PKT       0x0001
+@@ -353,6 +361,8 @@
+ extern void           k_leave(u_int32, u_int32);
+ extern void           k_init_dvmrp(void);
+ extern void           k_stop_dvmrp(void);
++extern void     k_init_plrt(void);
++extern void     k_stop_plrt(void);
+ extern void           k_add_vif(vifi_t, struct uvif *);
+ extern void           k_del_vif(vifi_t, struct uvif *);
+ extern void           k_add_rg(u_int32, struct gtable *);
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/igmp.c mrouted-3.9.5-pl/igmp.c
+--- mrouted-3.9.5/igmp.c       2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/igmp.c    2011-08-29 17:03:24.187961000 +0200
+@@ -190,8 +190,10 @@
+     if (ip->ip_p == 0) {
+       if (src == 0 || dst == 0)
+           logit(LOG_WARNING, 0, "kernel request not accurate");
+-      else
++      else {
++          logit(LOG_DEBUG, 0, "received kernel miss");
+           add_table_entry(src, dst);
++    }
+       return;
+     }
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/kern.c mrouted-3.9.5-pl/kern.c
+--- mrouted-3.9.5/kern.c       2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/kern.c    2011-08-31 15:09:21.457071000 +0200
+@@ -7,16 +7,26 @@
+  * Leland Stanford Junior University.
+  */
++#include <paths.h>
+ #include "defs.h"
+ int curttl = 0;
++#ifdef PLANETLAB
++
++int plrt_socket;
++char *plrt_socket_path = _PATH_MROUTED_PLRT;
++
++#endif
++
+ /*
+  * Open/init the multicast routing in the kernel and sets the
+  * MRT_PIM (aka MRT_ASSERT) flag in the kernel.
+  */
+ void k_init_dvmrp(void)
+ {
++#ifndef PLANETLAB
++
+ #ifdef OLD_KERNEL
+     if (setsockopt(igmp_socket, IPPROTO_IP, MRT_INIT, (char *)NULL, 0) < 0) {
+ #else
+@@ -24,11 +34,34 @@
+     if (setsockopt(igmp_socket, IPPROTO_IP, MRT_INIT, (char *)&v, sizeof(int)) < 0) {
+ #endif
++
+       if (errno == EADDRINUSE)
+           logit(LOG_ERR, 0, "Another multicast routing application is already running.");
+       else
+           logit(LOG_ERR, errno, "Cannot enable multicast routing in kernel");
+     }
++
++#endif
++}
++
++void k_init_plrt(void)
++{
++#ifdef PLANETLAB
++    /* 
++     * Just open a connection to the user-space forwarder
++     */
++
++    if ((plrt_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0)) < 0) 
++      logit(LOG_ERR, errno, "PLRT socket");
++      
++      struct sockaddr_un sun;
++      memset(&sun, 0, sizeof(sun));
++      sun.sun_family = AF_UNIX;
++      strncpy(sun.sun_path, plrt_socket_path, sizeof(sun.sun_path));
++      
++    if ((connect(plrt_socket, &sun, sizeof(sun))) < 0) 
++      logit(LOG_ERR, errno, "PLRT socket connect");
++#endif
+ }
+@@ -38,8 +71,17 @@
+  */
+ void k_stop_dvmrp(void)
+ {
++#ifndef PLANETLAB
+     if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DONE, (char *)NULL, 0) < 0)
+       logit(LOG_WARNING, errno, "Cannot disable multicast routing in kernel");
++#endif
++}
++
++void k_stop_plrt(void)
++{
++#ifdef PLANETLAB
++    close(plrt_socket);
++#endif
+ }
+@@ -194,11 +236,25 @@
+  */
+ void k_add_vif(vifi_t vifi, struct uvif *v)
+ {
+-    struct vifctl vc;
+-
+-    vc.vifc_vifi = vifi;
+-    uvif_to_vifctl(&vc, v);
+-    if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_VIF, (char *)&vc, sizeof(vc)) < 0)
++    /* 
++     * PlanetLab does application-level forwarding
++     */
++    struct {
++        u_int32 op;
++        u_int32 len;
++        struct vifctl vc;
++    } op;
++
++    op.vc.vifc_vifi = vifi;
++    uvif_to_vifctl(&op.vc, v);
++
++#ifdef PLANETLAB
++    op.op = MRT_ADD_VIF;
++    op.len = sizeof(op.vc);
++    if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++#else
++    if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_VIF, (char *)&op.vc, sizeof(op.vc)) < 0)
++#endif
+       logit(LOG_ERR, errno, "setsockopt MRT_ADD_VIF on vif %d", vifi);
+ }
+@@ -214,6 +270,20 @@
+      * we're going to delete.  *BSD systems on the other hand exepect only the index
+      * of that VIF.
+      */
++#ifdef PLANETLAB
++    struct {
++        u_int32 op;
++        u_int32 len;
++        struct vifctl vc;
++    } op;
++
++    op.vc.vifc_vifi = vifi;
++    uvif_to_vifctl(&op.vc, v);
++
++    op.op = MRT_DEL_VIF;
++    op.len = sizeof(op.vc);
++    if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++#else
+ #ifdef __linux__
+     struct vifctl vc;
+@@ -224,6 +294,7 @@
+ #else /* *BSD et al. */
+     if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_VIF, (char *)&vifi, sizeof(vifi)) < 0)
+ #endif /* !__linux__ */
++#endif /* PLANETLAB */
+     {
+         if (errno == EADDRNOTAVAIL || errno == EINVAL)
+             return;
+@@ -238,31 +309,57 @@
+  */
+ void k_add_rg(u_int32 origin, struct gtable *g)
+ {
+-    struct mfcctl mc;
++    struct {
++        u_int32 op;
++        u_int32 len;
++        struct mfcctl mc;
++    } op;
++    
+     vifi_t i;
+ #ifdef DEBUG_MFC
+     md_log(MD_ADD, origin, g->gt_mcastgrp);
+ #endif
+     /* copy table values so that setsockopt can process it */
+-    mc.mfcc_origin.s_addr = origin;
++    op.mc.mfcc_origin.s_addr = origin;
+ #ifdef OLD_KERNEL
+-    mc.mfcc_originmask.s_addr = 0xffffffff;
++    op.mc.mfcc_originmask.s_addr = 0xffffffff;
+ #endif
+-    mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
+-    mc.mfcc_parent = g->gt_route ? g->gt_route->rt_parent : NO_VIF;
++    op.mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
++    op.mc.mfcc_parent = g->gt_route ? g->gt_route->rt_parent : NO_VIF;
+     for (i = 0; i < numvifs; i++)
+-      mc.mfcc_ttls[i] = g->gt_ttls[i];
++      op.mc.mfcc_ttls[i] = g->gt_ttls[i];
++
++#ifdef PLANETLAB
++
++      logit(LOG_DEBUG, 0, "setsockopt MRT_ADD_MFC %s-%s %d[%d %d %d %d %d %d]", 
++          inet_fmt(origin, s1, sizeof(s1)),
++          inet_fmt(g->gt_mcastgrp, s2, sizeof(s2)),
++          numvifs,
++          op.mc.mfcc_ttls[0], op.mc.mfcc_ttls[1], op.mc.mfcc_ttls[2],
++          op.mc.mfcc_ttls[3], op.mc.mfcc_ttls[4], op.mc.mfcc_ttls[5] );
++
++    /* Send to PlanetLab's user-space MRT daemon */
++    op.op = MRT_ADD_MFC;
++    op.len = sizeof(op.mc);
++    if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++
++#else /* here if not PLANETLAB */
+     /* write to kernel space */
+     if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_MFC,
+-                 (char *)&mc, sizeof(mc)) < 0) {
++                 (char *)&op.mc, sizeof(op.mc)) < 0) 
++   
++#endif /* PLANETLAB */
++   
++    {
+ #ifdef DEBUG_MFC
+       md_log(MD_ADD_FAIL, origin, g->gt_mcastgrp);
+ #endif
+       logit(LOG_WARNING, errno, "setsockopt MRT_ADD_MFC",
+               inet_fmt(origin, s1, sizeof(s1)), inet_fmt(g->gt_mcastgrp, s2, sizeof(s2)));
+     }
++
+ }
+@@ -271,20 +368,37 @@
+  */
+ int k_del_rg(u_int32 origin, struct gtable *g)
+ {
+-    struct mfcctl mc;
++    struct {
++        u_int32 op;
++        u_int32 len;
++        struct mfcctl mc;
++    } op;
+ #ifdef DEBUG_MFC
+     md_log(MD_DEL, origin, g->gt_mcastgrp);
+ #endif
+     /* copy table values so that setsockopt can process it */
+-    mc.mfcc_origin.s_addr = origin;
++    op.mc.mfcc_origin.s_addr = origin;
+ #ifdef OLD_KERNEL
+-    mc.mfcc_originmask.s_addr = 0xffffffff;
++    op.mc.mfcc_originmask.s_addr = 0xffffffff;
+ #endif
+-    mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
++    op.mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
++
++#ifdef PLANETLAB
++
++    /* Send to PlanetLab's user-space MRT daemon */
++    op.op = MRT_DEL_MFC;
++    op.len = sizeof(op.mc);
++    if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++
++#else /* here if not PLANETLAB */
+     /* write to kernel space */
+-    if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_MFC, (char *)&mc, sizeof(mc)) < 0) {
++    if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_MFC, (char *)&op.mc, sizeof(op.mc)) < 0) 
++
++#endif /* PLANETLAB */
++    
++    {
+ #ifdef DEBUG_MFC
+       md_log(MD_DEL_FAIL, origin, g->gt_mcastgrp);
+ #endif
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/main.c mrouted-3.9.5-pl/main.c
+--- mrouted-3.9.5/main.c       2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/main.c    2011-08-30 15:31:01.281814000 +0200
+@@ -52,9 +52,9 @@
+ time_t mrouted_init_time;
+ #ifdef SNMP
+-#define NHANDLERS     34
++#define NHANDLERS     35
+ #else
+-#define NHANDLERS     2
++#define NHANDLERS     3
+ #endif
+ static struct ihandler {
+@@ -206,6 +206,9 @@
+     fputs("  -h, --help           Show this help text\n", stderr);
+     fputs("  -p                   Disable pruning.  Deprecated, compatibility option\n", stderr);
+     fputs("  -r, --show-routes    Show state of VIFs and multicast routing tables\n", stderr);
++#ifdef PLANETLAB
++    fputs("  -F                   Path to the PlanetLab userland forwarder, default /var/run/mcastfwd\n", stderr);
++#endif
+     fprintf(stderr, "  -v, --version        Show %s version\n", __progname);
+     fputs("\n", stderr);
+@@ -258,11 +261,23 @@
+     snprintf(versionstring, sizeof(versionstring), "mrouted version %s", todaysversion);
+-    while ((ch = getopt_long(argc, argv, "c:d::fhpP::rv", long_options, NULL)) != EOF) {
++#ifdef PLANETLAB
++#define PLOPTIONS ":F"
++#else
++#define PLOPTIONS ""
++#endif
++
++    while ((ch = getopt_long(argc, argv, "c:d::fhpP::rv" PLOPTIONS, long_options, NULL)) != EOF) {
+       switch (ch) {
+           case 'c':
+               configfilename = optarg;
+               break;
++              
++#ifdef PLANETLAB
++              case 'F':
++              plrt_socket_path = optarg;
++              break;
++#endif
+           case 'd':
+               if (!optarg)
+@@ -410,6 +425,8 @@
+     init_ipip();
+     init_routes();
+     init_ktable();
++    k_init_plrt();
++
+ #ifndef OLD_KERNEL
+     /*
+      * Unfortunately, you can't k_get_version() unless you've
+@@ -422,6 +439,7 @@
+     k_init_dvmrp();
+     vers = k_get_version();
+     k_stop_dvmrp();
++    
+     /*XXX
+      * This function must change whenever the kernel version changes
+      */
+@@ -466,6 +484,15 @@
+        logit(LOG_ERR, 0, "Descriptor too big");
+     FD_SET(igmp_socket, &readers);
+     nfds = igmp_socket + 1;
++    
++#ifdef PLANETLAB
++    if (plrt_socket >= (int)FD_SETSIZE)
++       logit(LOG_ERR, 0, "Descriptor too big");
++    FD_SET(plrt_socket, &readers);
++    if (plrt_socket >= nfds)
++        nfds = plrt_socket + 1;
++#endif
++
+     for (i = 0; i < nhandlers; i++) {
+       if (ihandlers[i].fd >= (int)FD_SETSIZE)
+           logit(LOG_ERR, 0, "Descriptor too big");
+@@ -602,6 +629,17 @@
+               accept_igmp(recvlen);
+           }
++#ifdef PLANETLAB
++          if (FD_ISSET(plrt_socket, &rfds)) {
++              recvlen = recvfrom(plrt_socket, recv_buf, RECV_BUF_SIZE, 0, NULL, &dummy);
++              if (recvlen < 0) {
++                  if (errno != EINTR) logit(LOG_ERR, errno, "recvfrom");
++                  continue;
++              }
++              accept_igmp(recvlen);
++          }
++#endif
++
+           for (i = 0; i < nhandlers; i++) {
+               if (FD_ISSET(ihandlers[i].fd, &rfds)) {
+                   (*ihandlers[i].func)(ihandlers[i].fd, &rfds);
+@@ -808,6 +846,7 @@
+       if (did_final_init)
+           k_stop_dvmrp();
+     }
++    k_stop_plrt();
+ }
+ /*
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/Makefile mrouted-3.9.5-pl/Makefile
+--- mrouted-3.9.5/Makefile     2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/Makefile  2011-08-10 17:18:22.233596000 +0200
+@@ -43,7 +43,7 @@
+ ## Common
+ CFLAGS        = $(MCAST_INCLUDE) $(SNMPDEF) $(RSRRDEF) $(INCLUDES) $(DEFS) $(USERCOMPILE)
+-CFLAGS       += -O2 -W -Wall -Werror
++CFLAGS       += -O2 -W -Wall
+ #CFLAGS       += -O -g
+ LDLIBS        = $(SNMPLIBDIR) $(SNMPLIBS) $(EXTRA_LIBS)
+ LDFLAGS      += -Wl,-Map,$@.map
+Only in mrouted-3.9.5-pl: Makefile.bk
+Only in mrouted-3.9.5-pl: map-mbone
+Only in mrouted-3.9.5-pl: map-mbone.map
+Only in mrouted-3.9.5-pl: mrinfo
+Only in mrouted-3.9.5-pl: mrinfo.map
+Only in mrouted-3.9.5-pl: mrouted
+Only in mrouted-3.9.5-pl: mrouted.map
+Only in mrouted-3.9.5-pl: mtrace
+Only in mrouted-3.9.5-pl: mtrace.map
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/pathnames.h mrouted-3.9.5-pl/pathnames.h
+--- mrouted-3.9.5/pathnames.h  2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/pathnames.h       2011-08-29 13:10:27.043684000 +0200
+@@ -14,5 +14,6 @@
+ #define _PATH_MROUTED_GENID   _PATH_MROUTED_RUNDIR "/mrouted.genid"
+ #define _PATH_MROUTED_DUMP    _PATH_MROUTED_RUNDIR "/mrouted.dump"
+ #define _PATH_MROUTED_CACHE   _PATH_MROUTED_RUNDIR "/mrouted.cache"
++#define _PATH_MROUTED_PLRT    _PATH_VARRUN "mcastrt"
+ #endif /* __MROUTED_PATHNAMES_H__ */
+Only in mrouted-3.9.5-pl: vers.c
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/vif.c mrouted-3.9.5-pl/vif.c
+--- mrouted-3.9.5/vif.c        2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/vif.c     2011-08-31 19:26:52.955453000 +0200
+@@ -139,6 +139,7 @@
+     v->uv_lcl_addr    = 0;
+     v->uv_rmt_addr    = 0;
+     v->uv_dst_addr    = t ? 0 : dvmrp_group;
++    v->uv_ptp_addr  = 0;
+     v->uv_subnet      = 0;
+     v->uv_subnetmask  = 0;
+     v->uv_subnetbcast = 0;
+@@ -379,6 +380,8 @@
+           start_route_updates();
+           update_route(p->pa_subnet, p->pa_subnetmask, 0, 0, vifi, NULL);
+       }
++      if (v->uv_ptp_addr)
++          update_route(v->uv_ptp_addr, 0xffffffff, 0, 0, vifi, NULL);
+       /*
+        * Until neighbors are discovered, assume responsibility for sending
+@@ -526,6 +529,8 @@
+                   return(vifi);
+           }
+           else {
++              if (src == v->uv_ptp_addr)
++                  return(vifi);
+               if ((src & v->uv_subnetmask) == v->uv_subnet &&
+                   ((v->uv_subnetmask == 0xffffffff) ||
+                    (src != v->uv_subnetbcast)))
+@@ -1666,6 +1671,10 @@
+                       scaletime(now - v->uv_querier->al_ctime),
+                       scaletime(v->uv_querier->al_timer));
+       }
++      if (0 != v->uv_ptp_addr) {
++          fprintf(fp, "                     PtP remote: %-18s\n",
++                                  inet_fmt(v->uv_ptp_addr, s1, sizeof(s1)));
++      }
+       if (v->uv_flags & VIFF_BLASTER)
+           fprintf(fp, "                  blasterbuf size: %dk\n",
+                       v->uv_blasterlen / 1024);
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/vif.h mrouted-3.9.5-pl/vif.h
+--- mrouted-3.9.5/vif.h        2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/vif.h     2011-08-29 14:35:35.695829000 +0200
+@@ -109,6 +109,7 @@
+     u_int32        uv_lcl_addr;   /* local address of this vif            */
+     u_int32        uv_rmt_addr;   /* remote end-point addr (tunnels only) */
+     u_int32        uv_dst_addr;   /* destination for DVMRP/PIM messages   */
++    u_int32        uv_ptp_addr;   /* remote peer address   (pointopoint only) */
+     u_int32        uv_subnet;     /* subnet number         (phyints only) */
+     u_int32        uv_subnetmask; /* subnet mask           (phyints only) */
+     u_int32        uv_subnetbcast;/* subnet broadcast addr (phyints only) */
index cf55e0d..f015bad 100644 (file)
@@ -19,7 +19,12 @@ import base64
 import traceback
 
 import tunchannel
-import ipaddr2
+
+try:
+    import iovec
+    HAS_IOVEC = True
+except:
+    HAS_IOVEC = False
 
 tun_name = 'tun0'
 tun_path = '/dev/net/tun'
@@ -113,8 +118,8 @@ parser.add_option(
         "Specify a symmetric encryption key with which to protect packets across "
         "the tunnel. python-crypto must be installed on the system." )
 parser.add_option(
-    "-K", "--gre-key", dest="gre_key", metavar="KEY", type="int",
-    default = None,
+    "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string",
+    default = "true",
     help = 
         "Specify a demultiplexing 32-bit numeric key for GRE." )
 parser.add_option(
@@ -133,14 +138,12 @@ parser.add_option(
     help = "If specified, packets won't be logged to standard output, "
            "but dumped to a pcap-formatted trace in the specified file. " )
 parser.add_option(
-    "--multicast", dest="multicast", 
-    action = "store_true",
-    default = False,
-    help = "If specified, multicast packets will be forwarded and IGMP "
-           "join/leave packets will be generated. Routing information "
-           "must be sent to the mroute unix socket, in a format identical "
-           "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL "
-           "code and 32-bit data length." )
+    "--multicast-forwarder", dest="multicast_fwd", 
+    default = None,
+    help = "If specified, multicast packets will be forwarded to "
+           "the specified unix-domain socket. If the device uses ethernet "
+           "frames, ethernet headers will be stripped and IP packets "
+           "will be forwarded, prefixed with the interface's address." )
 parser.add_option(
     "--filter", dest="filter_module", metavar="PATH",
     default = None,
@@ -222,65 +225,6 @@ IFNAMSIZ = 0x00000010
 IFREQ_SZ = 0x00000028
 FIONREAD = 0x0000541b
 
-class MulticastThread(threading.Thread):
-    def __init__(self, *p, **kw):
-        super(MulticastThread, self).__init__(*p, **kw)
-        self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
-        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
-            socket.inet_aton(options.vif_addr) )
-        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
-        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
-        self._stop = False
-        self.setDaemon(True)
-    
-    def run(self):
-        devnull = open('/dev/null','r+b')
-        maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
-        cur_maddr = set()
-        lastfullrefresh = time.time()
-        while not self._stop:
-            # Get current subscriptions
-            proc = subprocess.Popen(['ip','maddr','show',tun_name],
-                stdout = subprocess.PIPE,
-                stderr = subprocess.STDOUT,
-                stdin = devnull)
-            new_maddr = set()
-            for line in proc.stdout:
-                match = maddr_re.match(line)
-                if match:
-                    new_maddr.add(match.group(1))
-            proc.wait()
-            
-            # Every now and then, send a full report
-            now = time.time()
-            report_new = new_maddr
-            if (now - lastfullrefresh) <= 30.0:
-                report_new = report_new - cur_maddr
-            else:
-                lastfullrefresh = now
-            
-            # Report subscriptions
-            for grp in report_new:
-                igmpp = ipaddr2.ipigmp(
-                    options.vif_addr, '224.0.0.2', 1, 0x16, 0, grp, 
-                    noipcksum=True)
-                self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
-
-            # Notify group leave
-            for grp in cur_maddr - new_maddr:
-                igmpp = ipaddr2.ipigmp(
-                    options.vif_addr, '224.0.0.2', 1, 0x17, 0, grp, 
-                    noipcksum=True)
-                self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
-
-            cur_maddr = new_maddr
-            
-            time.sleep(1)
-    
-    def stop(self):
-        self._stop = True
-        self.join(5)
-
 class HostLock(object):
     # This class is used as a lock to prevent concurrency issues with more
     # than one instance of netns running in the same machine. Both in 
@@ -517,7 +461,7 @@ def pl_vif_start(tun_path, tun_name):
     if options.vif_txqueuelen is not None:
         stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
     if options.mode.startswith('pl-gre'):
-        stdin.write("gre=%d\n" % (options.gre_key,))
+        stdin.write("gre=%s\n" % (options.gre_key,))
         stdin.write("remote=%s\n" % (options.peer_addr,))
     stdin.close()
     
@@ -694,6 +638,12 @@ else:
     filter_close = None
     queueclass = None
 
+# install multicast forwarding hook
+if options.multicast_fwd:
+    print >>sys.stderr, "Connecting to mcast filter"
+    mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+    tunchannel.nonblock(mcfwd_sock.fileno())
+
 # be careful to roll back stuff on exceptions
 tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
 try:
@@ -719,6 +669,61 @@ try:
     tcpdump = None
     reconnect = None
     mcastthread = None
+
+    # install multicast forwarding hook
+    if options.multicast_fwd:
+        print >>sys.stderr, "Installing mcast filter"
+        
+        if HAS_IOVEC:
+            writev = iovec.writev
+        else:
+            os_write = os.write
+            map_ = map
+            str_ = str
+            def writev(fileno, *stuff):
+                os_write(''.join(map_(str_,stuff)))
+        
+        def accept_packet(packet, direction, 
+                _up_accept=accept_packet, 
+                sock=mcfwd_sock, 
+                sockno=mcfwd_sock.fileno(),
+                etherProto=tunchannel.etherProto,
+                etherStrip=tunchannel.etherStrip,
+                etherMode=tun_name.startswith('tap'),
+                multicast_fwd = options.multicast_fwd,
+                vif_addr = socket.inet_aton(options.vif_addr),
+                connected = [], writev=writev,
+                len=len, ord=ord):
+            if _up_accept:
+                rv = _up_accept(packet, direction)
+                if not rv:
+                    return rv
+
+            if direction == 1:
+                # Incoming... what?
+                if etherMode:
+                    if etherProto(packet)=='\x08\x00':
+                        fwd = etherStrip(packet)
+                    else:
+                        fwd = None
+                else:
+                    fwd = packet
+                if fwd is not None and len(fwd) >= 20:
+                    if (ord(fwd[16]) & 0xf0) == 0xe0:
+                        # Forward it
+                        if not connected:
+                            try:
+                                sock.connect(multicast_fwd)
+                                connected.append(None)
+                            except:
+                                traceback.print_exc(file=sys.stderr)
+                        if connected:
+                            try:
+                                writev(sockno, vif_addr,fwd)
+                            except:
+                                traceback.print_exc(file=sys.stderr)
+            return 1
+
     
     if options.protocol == 'fd':
         if accept_packet or filter_init:
@@ -825,11 +830,6 @@ try:
         # or perhaps there is no os.nice support in the system
         pass
     
-    if options.multicast:
-        # Start multicast forwarding daemon
-        mcastthread = MulticastThread()
-        mcastthread.start()
-
     if not filter_init:
         tun_fwd(tun, remote,
             reconnect = reconnect,
index 3f10b85..aeb4e67 100644 (file)
@@ -26,6 +26,7 @@ class TunProtoBase(object):
         self.port = 15000
         self.mode = 'pl-tun'
         self.key = key
+        self.cross_slice = False
         
         self.home_path = home_path
        
@@ -198,9 +199,10 @@ class TunProtoBase(object):
         local_snat = local.snat
         local_txq  = local.txqueuelen
         local_p2p  = local.pointopoint
-        local_cipher = local.tun_cipher
-        local_mcast = local.multicast
-        local_bwlim = local.bwlimit
+        local_cipher=local.tun_cipher
+        local_mcast= local.multicast
+        local_bwlim= local.bwlimit
+        local_mcastfwd = local.multicast_forwarder
         
         if not local_p2p and hasattr(peer, 'address'):
             local_p2p = peer.address
@@ -231,7 +233,8 @@ class TunProtoBase(object):
             "-t", str(check_proto),
             "-A", str(local_addr),
             "-M", str(local_mask),
-            "-C", str(local_cipher)]
+            "-C", str(local_cipher),
+            ]
         
         if check_proto == 'fd':
             passfd_arg = str(peer_addr)
@@ -244,8 +247,12 @@ class TunProtoBase(object):
                 "--pass-fd", passfd_arg
             ])
         elif check_proto == 'gre':
+            if self.cross_slice:
+                args.extend([
+                    "-K", str(self.key.strip('='))
+                ])
+
             args.extend([
-                "-K", str(min(local_port, peer_port)),
                 "-a", str(peer_addr),
             ])
         # both udp and tcp
@@ -267,14 +274,14 @@ class TunProtoBase(object):
             args.append("-N")
         elif local_cap == 'pcap':
             args.extend(('-c','pcap'))
-        if local_mcast:
-            args.append("--multicast")
         if local_bwlim:
             args.extend(("-b",str(local_bwlim*1024)))
         if filter_module:
             args.extend(("--filter", filter_module))
         if filter_args:
             args.extend(("--filter-args", filter_args))
+        if local_mcast and local_mcastfwd:
+            args.extend(("--multicast-forwarder", local_mcastfwd))
 
         self._logger.info("Starting %s", self)
         
@@ -324,7 +331,7 @@ class TunProtoBase(object):
             
             # Connected?
             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
-                "cd %(home)s ; grep -c Connected capture" % dict(
+                "cd %(home)s ; grep -a -c Connected capture" % dict(
                     home = server.shell_escape(self.home_path)),
                 host = local.node.hostname,
                 port = None,
@@ -342,7 +349,7 @@ class TunProtoBase(object):
 
             # At least listening?
             (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
-                "cd %(home)s ; grep -c Listening capture" % dict(
+                "cd %(home)s ; grep -a -c Listening capture" % dict(
                     home = server.shell_escape(self.home_path)),
                 host = local.node.hostname,
                 port = None,
@@ -381,7 +388,7 @@ class TunProtoBase(object):
             # Inspect the trace to check the assigned iface
             local = self.local()
             if local:
-                cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
+                cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict(
                             home = server.shell_escape(self.home_path))
                 for spin in xrange(30):
                     (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
index ce78050..970f83c 100644 (file)
@@ -120,15 +120,15 @@ def _pullPacket(buf, ether_mode=False, len=len):
             rv = buf.popleft()
         return rv
 
-def etherStrip(buf):
+def etherStrip(buf, buffer=buffer, len=len):
     if len(buf) < 14:
         return ""
     if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
         # tagged ethernet frame
-        return buf[18:]
+        return buffer(buf, 18)
     elif buf[12:14] == '\x08\x00':
         # untagged ethernet frame
-        return buf[14:]
+        return buffer(buf, 14)
     else:
         return ""