"nepi.util.graphtools",
"nepi.util" ],
package_dir = {"": "src"},
- package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"],
+ package_data = {"nepi.testbeds.planetlab" : [
+ "scripts/*.py", "scripts/*.c", "scripts/*.patch",
+ ],
"nepi.util" : ["*.tpl"] },
)
self.slicename = None
self._traces = dict()
- import node, interfaces, application
+ import node, interfaces, application, multicast
self._node = node
self._interfaces = interfaces
self._app = application
+ self._multicast = multicast
self._blacklist = set()
self._just_provisioned = set()
def _make_tos_queue_filter(self, parameters):
return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
+ def _make_multicast_forwarder(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastForwarder)
+
+ def _make_multicast_announcer(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
+
+ def _make_multicast_router(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastRouter)
+
# These get initialized when the iface is connected to any filter
self.filter_module = None
+ self.multicast_forwarder = None
# These get initialized when the iface is configured
self.external_iface = None
impl = self._PROTO_MAP[self.peer_proto](
self, self.peer_iface, home_path, self.tun_key)
impl.port = self.tun_port
+ impl.cross_slice = not self.peer_iface or isinstance(self.peer_iface, _CrossIface)
return impl
def recover(self):
TUNFILTER = "TunFilter"
CLASSQUEUEFILTER = "ClassQueueFilter"
TOSQUEUEFILTER = "TosQueueFilter"
+MULTICASTFORWARDER = "MulticastForwarder"
+MULTICASTANNOUNCER = "MulticastAnnouncer"
+MULTICASTROUTER = "MulticastRouter"
TUNFILTERS = (TUNFILTER, CLASSQUEUEFILTER, TOSQUEUEFILTER)
TAPFILTERS = (TUNFILTER, )
crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data)
crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data)
-def connect_dep(testbed_instance, node_guid, app_guid):
- node = testbed_instance._elements[node_guid]
- app = testbed_instance._elements[app_guid]
+def connect_dep(testbed_instance, node_guid, app_guid, node=None, app=None):
+ node = node or testbed_instance._elements[node_guid]
+ app = app or testbed_instance._elements[app_guid]
app.node = node
if app.depends:
if app.rpmFusion:
node.rpmFusion = True
+def connect_forwarder(testbed_instance, node_guid, fwd_guid):
+ node = testbed_instance._elements[node_guid]
+ fwd = testbed_instance._elements[fwd_guid]
+ node.multicast_forwarder = fwd
+
+ if fwd.router:
+ connect_dep(testbed_instance, node_guid, None, app=fwd.router)
+
+ connect_dep(testbed_instance, node_guid, fwd_guid)
+
+def connect_router(testbed_instance, fwd_guid, router_guid):
+ fwd = testbed_instance._elements[fwd_guid]
+ router = testbed_instance._elements[router_guid]
+ fwd.router = router
+
+ if fwd.node:
+ connect_dep(testbed_instance, None, router_guid, node=fwd.node)
+
def connect_node_netpipe(testbed_instance, node_guid, netpipe_guid):
node = testbed_instance._elements[node_guid]
netpipe = testbed_instance._elements[netpipe_guid]
testbed_instance.elements[guid] = element
+def create_multicast_forwarder(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_multicast_forwarder(parameters)
+
+ # Just inject configuration stuff
+ element.home_path = "nepi-mcfwd-%s" % (guid,)
+
+ testbed_instance.elements[guid] = element
+
+def create_multicast_announcer(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_multicast_announcer(parameters)
+
+ # Just inject configuration stuff
+ element.home_path = "nepi-mcann-%s" % (guid,)
+
+ testbed_instance.elements[guid] = element
+
+def create_multicast_router(testbed_instance, guid):
+ parameters = testbed_instance._get_parameters(guid)
+ element = testbed_instance._make_multicast_router(parameters)
+
+ # Just inject configuration stuff
+ element.home_path = "nepi-mcrt-%s" % (guid,)
+
+ testbed_instance.elements[guid] = element
+
def create_internet(testbed_instance, guid):
parameters = testbed_instance._get_parameters(guid)
element = testbed_instance._make_internet(parameters)
# Install stuff
dep.async_setup()
+def configure_announcer(testbed_instance, guid):
+ # Link ifaces
+ fwd = testbed_instance._elements[guid]
+ fwd.ifaces = [ dev
+ for node_guid in testbed_instance.get_connected(guid, "node", "apps")
+ for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
+ for dev in ( testbed_instance._elements.get(dev_guid) ,)
+ if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
+ and dev.multicast ]
+
+ # Install stuff
+ configure_dependency(testbed_instance, guid)
+
+def configure_forwarder(testbed_instance, guid):
+ configure_announcer(testbed_instance, guid)
+
+ # Link ifaces to forwarder
+ fwd = testbed_instance._elements[guid]
+ for iface in fwd.ifaces:
+ iface.multicast_forwarder = '/var/run/mcastfwd'
+
+def configure_router(testbed_instance, guid):
+ # Link ifaces
+ rt = testbed_instance._elements[guid]
+ rt.nonifaces = [ dev
+ for fwd_guid in testbed_instance.get_connected(guid, "fwd", "router")
+ for node_guid in testbed_instance.get_connected(fwd_guid, "node", "apps")
+ for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node")
+ for dev in ( testbed_instance._elements.get(dev_guid) ,)
+ if dev and isinstance(dev, testbed_instance._interfaces.TunIface)
+ and not dev.multicast ]
+
+ # Install stuff
+ configure_dependency(testbed_instance, guid)
+
def configure_netpipe(testbed_instance, guid):
netpipe = testbed_instance._elements[guid]
"max": 1,
"min": 1
}),
+ "router": dict({
+ "help": "Connector to a routing daemon",
+ "name": "router",
+ "max": 1,
+ "min": 1
+ }),
+ "fwd": dict({
+ "help": "Forwarder this routing daemon communicates with",
+ "name": "fwd",
+ "max": 1,
+ "min": 1
+ }),
"pipes": dict({
"help": "Connector to a NetPipe",
"name": "pipes",
}),
dict({
"from": (TESTBED_ID, NODE, "apps"),
- "to": (TESTBED_ID, APPLICATION, "node"),
+ "to": (TESTBED_ID, (APPLICATION, MULTICASTANNOUNCER), "node"),
"init_code": connect_dep,
"can_cross": False
}),
dict({
"from": (TESTBED_ID, NODE, "deps"),
- "to": (TESTBED_ID, DEPENDENCY, "node"),
+ "to": (TESTBED_ID, (DEPENDENCY, NEPIDEPENDENCY, NS3DEPENDENCY), "node"),
"init_code": connect_dep,
"can_cross": False
}),
dict({
- "from": (TESTBED_ID, NODE, "deps"),
- "to": (TESTBED_ID, NEPIDEPENDENCY, "node"),
- "init_code": connect_dep,
+ "from": (TESTBED_ID, NODE, "pipes"),
+ "to": (TESTBED_ID, NETPIPE, "node"),
+ "init_code": connect_node_netpipe,
"can_cross": False
}),
dict({
- "from": (TESTBED_ID, NODE, "deps"),
- "to": (TESTBED_ID, NS3DEPENDENCY, "node"),
- "init_code": connect_dep,
+ "from": (TESTBED_ID, NODE, "apps"),
+ "to": (TESTBED_ID, MULTICASTFORWARDER, "node"),
+ "init_code": connect_forwarder,
"can_cross": False
}),
dict({
- "from": (TESTBED_ID, NODE, "pipes"),
- "to": (TESTBED_ID, NETPIPE, "node"),
- "init_code": connect_node_netpipe,
+ "from": (TESTBED_ID, MULTICASTFORWARDER, "router"),
+ "to": (TESTBED_ID, MULTICASTROUTER, "fwd"),
+ "init_code": connect_router,
"can_cross": False
}),
dict({
"flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
"validation_function": validation.is_string
}),
+ "routing_algorithm": dict({
+ "name": "algorithm",
+ "help": "Routing algorithm.",
+ "value": "dvmrp",
+ "type": Attribute.ENUM,
+ "allowed": ["dvmrp"],
+ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable,
+ "validation_function": validation.is_enum,
+ }),
})
traces = dict({
}),
})
-create_order = [ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
+create_order = [
+ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER,
+ MULTICASTANNOUNCER, MULTICASTFORWARDER, MULTICASTROUTER,
+ TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE,
+ NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
-configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
+configure_order = [
+ INTERNET, Parallel(NODE),
+ NODEIFACE,
+ Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER),
+ Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE,
+ Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
# Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
-start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
+start_order = [ INTERNET,
+ NODEIFACE,
+ Parallel(TAPIFACE), Parallel(TUNIFACE),
+ Parallel(NODE), NETPIPE,
+ Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER),
+ Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ]
# cleanup order
-shutdown_order = [ Parallel(APPLICATION), Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), NODEIFACE, Parallel(NODE) ]
+shutdown_order = [
+ Parallel(APPLICATION),
+ Parallel(MULTICASTROUTER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTANNOUNCER),
+ Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE),
+ Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY),
+ NODEIFACE, Parallel(NODE) ]
factories_info = dict({
NODE: dict({
"connector_types": ["node"],
"traces": ["buildlog"],
}),
+ MULTICASTFORWARDER: dict({
+ "help": "This application installs a userspace packet forwarder "
+ "that, when connected to a node, filters all packets "
+ "flowing through multicast-capable virtual interfaces "
+ "and applies custom-specified routing policies.",
+ "category": FC.CATEGORY_APPLICATIONS,
+ "create_function": create_multicast_forwarder,
+ "preconfigure_function": configure_forwarder,
+ "start_function": start_application,
+ "status_function": status_application,
+ "stop_function": stop_application,
+ "box_attributes": [ ],
+ "connector_types": ["node","router"],
+ "traces": ["buildlog","stderr"],
+ }),
+ MULTICASTANNOUNCER: dict({
+ "help": "This application installs a userspace daemon that "
+ "monitors multicast membership and announces it on all "
+ "multicast-capable interfaces.\n"
+ "This does not usually happen automatically on PlanetLab slivers.",
+ "category": FC.CATEGORY_APPLICATIONS,
+ "create_function": create_multicast_announcer,
+ "preconfigure_function": configure_announcer,
+ "start_function": start_application,
+ "status_function": status_application,
+ "stop_function": stop_application,
+ "box_attributes": [ ],
+ "connector_types": ["node"],
+ "traces": ["buildlog","stderr"],
+ }),
+ MULTICASTROUTER: dict({
+ "help": "This application installs a userspace daemon that "
+ "monitors multicast membership and announces it on all "
+ "multicast-capable interfaces.\n"
+ "This does not usually happen automatically on PlanetLab slivers.",
+ "category": FC.CATEGORY_APPLICATIONS,
+ "create_function": create_multicast_router,
+ "preconfigure_function": configure_router,
+ "start_function": start_application,
+ "status_function": status_application,
+ "stop_function": stop_application,
+ "box_attributes": ["routing_algorithm"],
+ "connector_types": ["fwd"],
+ "traces": ["buildlog","stdout","stderr"],
+ }),
INTERNET: dict({
"help": "Internet routing",
"category": FC.CATEGORY_CHANNELS,
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from constants import TESTBED_ID
+
+import os
+import os.path
+import sys
+import functools
+
+import nepi.util.server as server
+import nepi.util.ipaddr2 as ipaddr2
+
+import logging
+
+import application
+
+class MulticastForwarder(application.Application):
+ """
+ This application installs a userspace packet forwarder
+ that, when connected to a node, filters all packets
+ flowing through multicast-capable virtual interfaces
+ and applies custom-specified routing policies
+ """
+ def __init__(self, *p, **kw):
+ super(MulticastForwarder, self).__init__(*p, **kw)
+
+ self.sources = ' '.join([
+ os.path.join( os.path.dirname(__file__),
+ "scripts", "mcastfwd.py" ),
+ ipaddr2.__file__.replace('.pyc','.py').replace('.pyo','.py'),
+ ])
+
+ self.sudo = True
+
+ self.depends = "python"
+
+ # Initialized when connected
+ self.ifaces = []
+ self.router = None
+
+ def _command_get(self):
+ # canonical representation of dependencies
+ depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+
+ # download rpms and pack into a tar archive
+ return (
+ "python mcastfwd.py %s"
+ ) % ( ' '.join([iface.address for iface in self.ifaces]), )
+ def _command_set(self, value):
+ # ignore
+ return
+ command = property(_command_get, _command_set)
+
+
+class MulticastAnnouncer(application.Application):
+ """
+ This application installs a userspace daemon that
+ monitors multicast membership and announces it on all
+ multicast-capable interfaces.
+ This does not usually happen automatically on PlanetLab slivers.
+ """
+ def __init__(self, *p, **kw):
+ super(MulticastAnnouncer, self).__init__(*p, **kw)
+
+ self.sources = ' '.join([
+ os.path.join( os.path.dirname(__file__),
+ "scripts", "mcastfwd.py" ),
+ ipaddr2.__file__.replace('.pyc','.py').replace('.pyo','.py'),
+ ])
+
+ self.sudo = True
+
+ self.depends = "python"
+
+ self.ifaces = []
+ self.router = None
+
+ def _command_get(self):
+ # canonical representation of dependencies
+ depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
+
+ # download rpms and pack into a tar archive
+ return (
+ "python mcastfwd.py -A %s"
+ ) % ( ' '.join([iface.address for iface in self.ifaces]), )
+ def _command_set(self, value):
+ # ignore
+ return
+ command = property(_command_get, _command_set)
+
+class MulticastRouter(application.Application):
+ """
+ This application installs a userspace daemon that
+ monitors multicast membership and announces it on all
+ multicast-capable interfaces.
+ This does not usually happen automatically on PlanetLab slivers.
+ """
+ ALGORITHM_MAP = {
+ 'dvmrp' : {
+ 'sources' :
+ ' '.join([
+ os.path.join( os.path.dirname(__file__),
+ "scripts", "mrouted-3.9.5-pl.patch" ),
+ ]) ,
+ 'depends' : "",
+ 'buildDepends' : "byacc gcc make patch",
+ 'build' :
+ "mkdir -p mrouted && "
+ "echo '3a1c1e72c4f6f7334d72df4c50b510d7 mrouted-3.9.5.tar.bz2' > archive_sums.txt && "
+ "wget -q -c -O mrouted-3.9.5.tar.bz2 ftp://ftp.vmlinux.org/pub/People/jocke/mrouted/mrouted-3.9.5.tar.bz2 && "
+ "md5sum -c archive_sums.txt && "
+ "tar xvjf mrouted-3.9.5.tar.bz2 -C mrouted --strip-components=1 && "
+ "cd mrouted && patch -p1 < ${SOURCES}/mrouted-3.9.5-pl.patch && make"
+ ,
+ 'install' : "cp mrouted/mrouted ${SOURCES}",
+ 'command' :
+ "while test \\! -e /var/run/mcastrt ; do sleep 1 ; done ; "
+ "echo 'phyint eth0 disable' > ./mrouted.conf ; "
+ "for iface in %(nonifaces)s ; do echo \"phyint $iface disable\" >> ./mrouted.conf ; done ; "
+ "./mrouted -f %(debugbit)s -c ./mrouted.conf"
+ ,
+ 'debugbit' : "-dpacket,igmp,routing,interface,pruning,membership,cache",
+ }
+ }
+
+ def __init__(self, *p, **kw):
+ super(MulticastRouter, self).__init__(*p, **kw)
+
+ self.algorithm = 'dvmrp'
+ self.sudo = True
+ self.nonifaces = []
+
+ def _non_set(self, value):
+ # ignore
+ return
+
+ def _gen_get(attribute, self):
+ return self.ALGORITHM_MAP[self.algorithm][attribute]
+
+ def _command_get(self):
+ command = self.ALGORITHM_MAP[self.algorithm]['command']
+ debugbit = self.ALGORITHM_MAP[self.algorithm]['debugbit']
+
+ # download rpms and pack into a tar archive
+ return command % {
+ 'nonifaces' : ' '.join([iface.if_name for iface in self.nonifaces]),
+ 'debugbit' : (debugbit if self.stderr else ""),
+ }
+ command = property(_command_get, _non_set)
+
+ build = property(functools.partial(_gen_get, "build"), _non_set)
+ install = property(functools.partial(_gen_get, "install"), _non_set)
+ sources = property(functools.partial(_gen_get, "sources"), _non_set)
+ depends = property(functools.partial(_gen_get, "depends"), _non_set)
+ buildDepends = property(functools.partial(_gen_get, "buildDepends"), _non_set)
+
self.rpmFusion = False
self.env = collections.defaultdict(list)
+ # Some special applications - initialized when connected
+ self.multicast_forwarder = None
+
# Testbed-derived attributes
self.slicename = None
self.ident_path = None
--- /dev/null
+import sys
+
+import signal
+import socket
+import struct
+import optparse
+import threading
+import subprocess
+import re
+import time
+import collections
+import os
+import traceback
+import logging
+
+import ipaddr2
+
+usage = "usage: %prog [options] <enabled-addresses>"
+
+parser = optparse.OptionParser(usage=usage)
+
+parser.add_option(
+ "-d", "--poll-delay", dest="poll_delay", metavar="SECONDS", type="float",
+ default = 1.0,
+ help = "Multicast subscription polling interval")
+parser.add_option(
+ "-D", "--refresh-delay", dest="refresh_delay", metavar="SECONDS", type="float",
+ default = 30.0,
+ help = "Full-refresh interval - time between full IGMP reports")
+parser.add_option(
+ "-p", "--fwd-path", dest="fwd_path", metavar="PATH",
+ default = "/var/run/mcastfwd",
+ help = "Path of the unix socket in which the program will listen for packets")
+parser.add_option(
+ "-r", "--router-path", dest="mrt_path", metavar="PATH",
+ default = "/var/run/mcastrt",
+ help = "Path of the unix socket in which the program will listen for routing changes")
+parser.add_option(
+ "-A", "--announce-only", dest="announce_only", action="store_true",
+ default = False,
+ help = "If given, only group membership announcements will be made. Useful for non-router multicast nodes.")
+parser.add_option(
+ "-v", "--verbose", dest="verbose", action="store_true",
+ default = False,
+ help = "Path of the unix socket in which the program will listen for routing changes")
+
+(options, remaining_args) = parser.parse_args(sys.argv[1:])
+
+logging.basicConfig(
+ stream=sys.stderr,
+ level=logging.DEBUG if options.verbose else logging.WARNING)
+
+ETH_P_ALL = 0x00000003
+ETH_P_IP = 0x00000800
+TUNSETIFF = 0x400454ca
+IFF_NO_PI = 0x00001000
+IFF_TAP = 0x00000002
+IFF_TUN = 0x00000001
+IFF_VNET_HDR = 0x00004000
+TUN_PKT_STRIP = 0x00000001
+IFHWADDRLEN = 0x00000006
+IFNAMSIZ = 0x00000010
+IFREQ_SZ = 0x00000028
+FIONREAD = 0x0000541b
+
+class IGMPThread(threading.Thread):
+ def __init__(self, vif_addr, *p, **kw):
+ super(IGMPThread, self).__init__(*p, **kw)
+
+ vif_addr = vif_addr.strip()
+ self.vif_addr = vif_addr
+ self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
+ self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
+ socket.inet_aton(self.vif_addr) )
+ self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
+ self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+ self._stop = False
+ self.setDaemon(True)
+
+ # Find tun name
+ proc = subprocess.Popen(['ip','addr','show'],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.STDOUT,
+ stdin = open('/dev/null','r+b') )
+ tun_name = None
+ heading = re.compile(r"\d+:\s*([-a-zA-Z0-9_]+):.*")
+ addr = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}).*")
+ for line in proc.stdout:
+ match = heading.match(line)
+ if match:
+ tun_name = match.group(1)
+ else:
+ match = addr.match(line)
+ if match and match.group(1) == vif_addr:
+ self.tun_name = tun_name
+ break
+ else:
+ raise RuntimeError, "Could not find iterface for", vif_addr
+
+ def run(self):
+ devnull = open('/dev/null','r+b')
+ maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
+ cur_maddr = set()
+ lastfullrefresh = time.time()
+ while not self._stop:
+ # Get current subscriptions
+ proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
+ stdout = subprocess.PIPE,
+ stderr = subprocess.STDOUT,
+ stdin = devnull)
+ new_maddr = set()
+ for line in proc.stdout:
+ match = maddr_re.match(line)
+ if match:
+ new_maddr.add(match.group(1))
+ proc.wait()
+
+ # Every now and then, send a full report
+ now = time.time()
+ report_new = new_maddr
+ if (now - lastfullrefresh) <= options.refresh_delay:
+ report_new = report_new - cur_maddr
+ else:
+ lastfullrefresh = now
+
+ # Report subscriptions
+ for grp in report_new:
+ print >>sys.stderr, "JOINING", grp
+ igmpp = ipaddr2.ipigmp(
+ self.vif_addr, grp, 1, 0x16, 0, grp,
+ noipcksum=True)
+ try:
+ self.igmp_socket.sendto(igmpp, 0, (grp,0))
+ except:
+ traceback.print_exc(file=sys.stderr)
+
+ # Notify group leave
+ for grp in cur_maddr - new_maddr:
+ print >>sys.stderr, "LEAVING", grp
+ igmpp = ipaddr2.ipigmp(
+ self.vif_addr, '224.0.0.2', 1, 0x17, 0, grp,
+ noipcksum=True)
+ try:
+ self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
+ except:
+ traceback.print_exc(file=sys.stderr)
+
+ cur_maddr = new_maddr
+
+ time.sleep(options.poll_delay)
+
+ def stop(self):
+ self._stop = True
+ self.join(1+5*options.poll_delay)
+
+
+class FWDThread(threading.Thread):
+ def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
+ super(FWDThread, self).__init__(*p, **kw)
+
+ self.in_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ self.in_socket.bind(options.fwd_path)
+
+ self.pending = collections.deque()
+ self.maxpending = 1000
+ self.rt_cache = rt_cache
+ self.router_socket = router_socket
+ self.vifs = vifs
+ self.fwd_sockets = {}
+ for fwd_target in remaining_args:
+ fwd_target = socket.inet_aton(fwd_target)
+ fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
+ fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
+ fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
+ fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+ self.fwd_sockets[fwd_target] = fwd_socket
+
+ self._stop = False
+ self.setDaemon(True)
+
+ def run(self):
+ in_socket = self.in_socket
+ rt_cache = self.rt_cache
+ vifs = self.vifs
+ router_socket = self.router_socket
+ len_ = len
+ ord_ = ord
+ str_ = str
+ pending = self.pending
+ in_socket.settimeout(options.poll_delay)
+ buffer_ = buffer
+ enumerate_ = enumerate
+ fwd_sockets = self.fwd_sockets
+ npending = 0
+ noent = (None,None)
+
+ while not self._stop:
+ # Get packet
+ try:
+ if pending and npending:
+ packet = pending.pop()
+ npending -= 1
+ else:
+ packet = in_socket.recv(2000)
+ except socket.timeout, e:
+ if pending and not npending:
+ npending = len_(pending)
+ continue
+ if not packet or len_(packet) < 24:
+ continue
+
+ fullpacket = packet
+ parent = packet[:4]
+ packet = buffer_(packet,4)
+
+ if packet[9] == '\x02':
+ # IGMP packet? It's for mrouted
+ self.router_socket.send(packet)
+ elif packet[9] == '\x00':
+ # LOOPING packet, discard
+ continue
+
+ # To-Do: PIM asserts
+
+ # Get route
+ addrinfo = packet[12:20]
+ fwd_targets, rparent = rt_cache.get(addrinfo, noent)
+
+ if fwd_targets is not None and (rparent == '\x00\x00\x00\x00' or rparent == parent):
+ # Forward
+ ttl = ord_(packet[8])
+ tgt_group = (socket.inet_ntoa(addrinfo[4:]),0)
+ print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl,
+ nfwd_targets = len_(fwd_targets)
+ for vifi, vif in vifs.iteritems():
+ if vifi < nfwd_targets:
+ ttl_thresh = ord_(fwd_targets[vifi])
+ if ttl_thresh > 0 and ttl > ttl_thresh:
+ if vif[4] in fwd_sockets:
+ print >>sys.stderr, socket.inet_ntoa(vif[4]),
+ fwd_socket = fwd_sockets[vif[4]]
+ fwd_socket.sendto(packet, 0, tgt_group)
+ print >>sys.stderr, "."
+ else:
+ # Mark pending
+ if len_(pending) < self.maxpending:
+ tgt_group = addrinfo[4:]
+ print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ?"
+
+ pending.append(fullpacket)
+
+ # Notify mrouted by forwarding it with protocol 0
+ router_socket.send(''.join(
+ (packet[:9],'\x00',packet[10:]) ))
+
+ def stop(self):
+ self._stop = True
+ self.join(1+5*options.poll_delay)
+
+
+class RouterThread(threading.Thread):
+ def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
+ super(RouterThread, self).__init__(*p, **kw)
+
+ self.rt_cache = rt_cache
+ self.vifs = vifs
+ self.router_socket = router_socket
+
+ self._stop = False
+ self.setDaemon(True)
+
+ def run(self):
+ rt_cache = self.rt_cache
+ vifs = self.vifs
+ addr_vifs = {}
+ router_socket = self.router_socket
+ router_socket.settimeout(options.poll_delay)
+ len_ = len
+ buffer_ = buffer
+
+ buf = ""
+
+ MRT_BASE = 200
+ MRT_ADD_VIF = MRT_BASE+2 # Add a virtual interface
+ MRT_DEL_VIF = MRT_BASE+3 # Delete a virtual interface
+ MRT_ADD_MFC = MRT_BASE+4 # Add a multicast forwarding entry
+ MRT_DEL_MFC = MRT_BASE+5 # Delete a multicast forwarding entry
+
+ def cmdhdr(cmd, unpack=struct.unpack, buffer=buffer):
+ op,dlen = unpack('II', buffer(cmd,0,8))
+ cmd = buffer(cmd,8)
+ return op,dlen,cmd
+ def vifctl(data, unpack=struct.unpack):
+ #vifi, flags,threshold,rate_limit,lcl_addr,rmt_addr = unpack('HBBI4s4s', data)
+ return unpack('HBBI4s4s', data)
+ def mfcctl(data, unpack=struct.unpack):
+ #origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = unpack('4s4sH10sIIIi', data)
+ return unpack('4s4sH32sIIIi', data)
+
+
+ def add_vif(cmd):
+ vifi = vifctl(cmd)
+ vifs[vifi[0]] = vifi
+ addr_vifs[vifi[4]] = vifi[0]
+ print >>sys.stderr, "Added VIF", vifi
+ def del_vif(cmd):
+ vifi = vifctl(cmd)
+ vifi = vifs[vifi[0]]
+ del addr_vifs[vifi[4]]
+ del vifs[vifi[0]]
+ print >>sys.stderr, "Removed VIF", vifi
+ def add_mfc(cmd):
+ origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
+ if parent in vifs:
+ parent_addr = vifs[parent][4]
+ else:
+ parent_addr = '\x00\x00\x00\x00'
+ addrinfo = origin + mcastgrp
+ rt_cache[addrinfo] = (ttls, parent_addr)
+ print >>sys.stderr, "Added RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp))), map(ord,ttls)
+ def del_mfc(cmd):
+ origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
+ if parent in vifs:
+ parent_addr = vifs[parent][4]
+ else:
+ parent_addr = '\x00\x00\x00\x00'
+ addrinfo = origin + mcastgrp
+ del rt_cache[addrinfo]
+ print >>sys.stderr, "Removed RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp)))
+
+ commands = {
+ MRT_ADD_VIF : add_vif,
+ MRT_DEL_VIF : del_vif,
+ MRT_ADD_MFC : add_mfc,
+ MRT_DEL_MFC : del_mfc,
+ }
+
+ while not self._stop:
+ if len_(buf) < 8 or len_(buf) < (cmdhdr(buf)[1]+8):
+ # Get cmd
+ try:
+ cmd = router_socket.recv(2000)
+ except socket.timeout, e:
+ continue
+ if not cmd:
+ print >>sys.stderr, "PLRT CONNECTION BROKEN"
+ TERMINATE.append(None)
+ break
+
+ if buf:
+ buf += cmd
+ cmd = buf
+
+ if len_(cmd) < 8:
+ continue
+
+ op,dlen,data = cmdhdr(cmd)
+ if len_(data) < dlen:
+ continue
+
+ buf = buffer_(data, dlen)
+ data = buffer_(data, 0, dlen)
+
+ print >>sys.stderr, "COMMAND", op, "DATA", dlen
+
+ if op in commands:
+ try:
+ commands[op](data)
+ except:
+ traceback.print_exc(file=sys.stderr)
+ else:
+ print >>sys.stderr, "IGNORING UNKNOWN COMMAND", op
+
+ def stop(self):
+ self._stop = True
+ self.join(1+5*options.poll_delay)
+
+
+
+igmp_threads = []
+for vif_addr in remaining_args:
+ igmp_threads.append(IGMPThread(vif_addr))
+
+rt_cache = {}
+vifs = {}
+
+TERMINATE = []
+TERMINATE = []
+def _finalize(sig,frame):
+ global TERMINATE
+ TERMINATE.append(None)
+signal.signal(signal.SIGTERM, _finalize)
+
+
+try:
+ if not options.announce_only:
+ router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+ router_socket.bind(options.mrt_path)
+ router_socket.listen(0)
+ router_remote_socket, router_remote_addr = router_socket.accept()
+
+ fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs)
+ router_thread = RouterThread(rt_cache, router_remote_socket, vifs)
+
+ for thread in igmp_threads:
+ thread.start()
+
+ if not options.announce_only:
+ fwd_thread.start()
+ router_thread.start()
+
+ while not TERMINATE:
+ time.sleep(30)
+finally:
+ if os.path.exists(options.mrt_path):
+ try:
+ os.remove(options.mrt_path)
+ except:
+ pass
+ if os.path.exists(options.fwd_path):
+ try:
+ os.remove(options.fwd_path)
+ except:
+ pass
+
+
--- /dev/null
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/config.c mrouted-3.9.5-pl/config.c
+--- mrouted-3.9.5/config.c 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/config.c 2011-08-29 16:32:43.710565000 +0200
+@@ -91,11 +91,41 @@
+ v->uv_subnetbcast = subnet | ~mask;
+ strncpy(v->uv_name, ifa->ifa_name, sizeof(v->uv_name));
+
+- if (flags & IFF_POINTOPOINT)
++ if (flags & IFF_POINTOPOINT) {
+ v->uv_flags |= VIFF_REXMIT_PRUNES;
++ v->uv_ptp_addr = ((struct sockaddr_in *)ifa->ifa_dstaddr)->sin_addr.s_addr;
++ } else if (flags & IFF_BROADCAST) {
++ /* getifaddr doesn't give us the p2p link in this case */
++ /* So use ip, which uses netlink, to query it */
++ char buf[1024];
++ size_t rode;
++ FILE *peer;
++
++ logit(LOG_INFO,0,"Getting ptp for %s", ifa->ifa_name);
++
++ snprintf(buf,sizeof(buf),"ip addr show %s | grep -o 'peer [0-9.]*' | grep -o '[0-9.]*'", ifa->ifa_name);
++ peer = popen(buf, "r");
++ rode = fread(buf, 1, sizeof(buf), peer);
++ pclose(peer);
++
++ if (rode > 0) {
++ /* It has a pointopoint address */
++ struct in_addr ptp_in;
++
++ for (--rode; rode && buf[rode] <= 13;)
++ --rode;
++ buf[++rode] = 0;
++
++ logit(LOG_INFO,0,"Got %s", buf);
++
++ if (inet_aton(buf, &ptp_in))
++ v->uv_ptp_addr = ptp_in.s_addr;
++ }
++ }
+
+- logit(LOG_INFO,0,"installing %s (%s on subnet %s) as vif #%u - rate=%d",
++ logit(LOG_INFO,0,"installing %s (%s on subnet %s%s%s) as vif #%u - rate=%d",
+ v->uv_name, inet_fmt(addr, s1, sizeof(s1)), inet_fmts(subnet, mask, s2, sizeof(s2)),
++ (v->uv_ptp_addr) ? "peer " : "", (v->uv_ptp_addr) ? inet_fmt(v->uv_ptp_addr, s3, sizeof(s3)) : "",
+ numvifs, v->uv_rate_limit);
+
+ ++numvifs;
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/defs.h mrouted-3.9.5-pl/defs.h
+--- mrouted-3.9.5/defs.h 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/defs.h 2011-08-30 15:30:23.691662000 +0200
+@@ -9,6 +9,8 @@
+ #ifndef __MROUTED_DEFS_H__
+ #define __MROUTED_DEFS_H__
+
++#define PLANETLAB 1
++
+ #include <stdio.h>
+ #include <stdlib.h>
+ #include <unistd.h>
+@@ -28,6 +30,7 @@
+ #include <sys/time.h>
+ #include <sys/uio.h>
+ #include <net/if.h>
++#include <arpa/inet.h>
+ #include <netinet/in.h>
+ #include <netinet/in_systm.h>
+ #include <netinet/ip.h>
+@@ -61,7 +64,7 @@
+ #include <libutil.h>
+ #endif
+ #endif
+-#ifdef RSRR
++#if defined(RSRR) || defined(PLANETLAB)
+ #include <sys/un.h>
+ #endif /* RSRR */
+
+@@ -137,6 +140,11 @@
+ extern u_int32 dvmrp_group;
+ extern u_int32 dvmrp_genid;
+
++#ifdef PLANETLAB
++extern int plrt_socket;
++extern char *plrt_socket_path;
++#endif
++
+ #define IF_DEBUG(l) if (debug && debug & (l))
+
+ #define DEBUG_PKT 0x0001
+@@ -353,6 +361,8 @@
+ extern void k_leave(u_int32, u_int32);
+ extern void k_init_dvmrp(void);
+ extern void k_stop_dvmrp(void);
++extern void k_init_plrt(void);
++extern void k_stop_plrt(void);
+ extern void k_add_vif(vifi_t, struct uvif *);
+ extern void k_del_vif(vifi_t, struct uvif *);
+ extern void k_add_rg(u_int32, struct gtable *);
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/igmp.c mrouted-3.9.5-pl/igmp.c
+--- mrouted-3.9.5/igmp.c 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/igmp.c 2011-08-29 17:03:24.187961000 +0200
+@@ -190,8 +190,10 @@
+ if (ip->ip_p == 0) {
+ if (src == 0 || dst == 0)
+ logit(LOG_WARNING, 0, "kernel request not accurate");
+- else
++ else {
++ logit(LOG_DEBUG, 0, "received kernel miss");
+ add_table_entry(src, dst);
++ }
+ return;
+ }
+
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/kern.c mrouted-3.9.5-pl/kern.c
+--- mrouted-3.9.5/kern.c 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/kern.c 2011-08-31 15:09:21.457071000 +0200
+@@ -7,16 +7,26 @@
+ * Leland Stanford Junior University.
+ */
+
++#include <paths.h>
+ #include "defs.h"
+
+ int curttl = 0;
+
++#ifdef PLANETLAB
++
++int plrt_socket;
++char *plrt_socket_path = _PATH_MROUTED_PLRT;
++
++#endif
++
+ /*
+ * Open/init the multicast routing in the kernel and sets the
+ * MRT_PIM (aka MRT_ASSERT) flag in the kernel.
+ */
+ void k_init_dvmrp(void)
+ {
++#ifndef PLANETLAB
++
+ #ifdef OLD_KERNEL
+ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_INIT, (char *)NULL, 0) < 0) {
+ #else
+@@ -24,11 +34,34 @@
+
+ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_INIT, (char *)&v, sizeof(int)) < 0) {
+ #endif
++
+ if (errno == EADDRINUSE)
+ logit(LOG_ERR, 0, "Another multicast routing application is already running.");
+ else
+ logit(LOG_ERR, errno, "Cannot enable multicast routing in kernel");
+ }
++
++#endif
++}
++
++void k_init_plrt(void)
++{
++#ifdef PLANETLAB
++ /*
++ * Just open a connection to the user-space forwarder
++ */
++
++ if ((plrt_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0)) < 0)
++ logit(LOG_ERR, errno, "PLRT socket");
++
++ struct sockaddr_un sun;
++ memset(&sun, 0, sizeof(sun));
++ sun.sun_family = AF_UNIX;
++ strncpy(sun.sun_path, plrt_socket_path, sizeof(sun.sun_path));
++
++ if ((connect(plrt_socket, &sun, sizeof(sun))) < 0)
++ logit(LOG_ERR, errno, "PLRT socket connect");
++#endif
+ }
+
+
+@@ -38,8 +71,17 @@
+ */
+ void k_stop_dvmrp(void)
+ {
++#ifndef PLANETLAB
+ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DONE, (char *)NULL, 0) < 0)
+ logit(LOG_WARNING, errno, "Cannot disable multicast routing in kernel");
++#endif
++}
++
++void k_stop_plrt(void)
++{
++#ifdef PLANETLAB
++ close(plrt_socket);
++#endif
+ }
+
+
+@@ -194,11 +236,25 @@
+ */
+ void k_add_vif(vifi_t vifi, struct uvif *v)
+ {
+- struct vifctl vc;
+-
+- vc.vifc_vifi = vifi;
+- uvif_to_vifctl(&vc, v);
+- if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_VIF, (char *)&vc, sizeof(vc)) < 0)
++ /*
++ * PlanetLab does application-level forwarding
++ */
++ struct {
++ u_int32 op;
++ u_int32 len;
++ struct vifctl vc;
++ } op;
++
++ op.vc.vifc_vifi = vifi;
++ uvif_to_vifctl(&op.vc, v);
++
++#ifdef PLANETLAB
++ op.op = MRT_ADD_VIF;
++ op.len = sizeof(op.vc);
++ if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++#else
++ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_VIF, (char *)&op.vc, sizeof(op.vc)) < 0)
++#endif
+ logit(LOG_ERR, errno, "setsockopt MRT_ADD_VIF on vif %d", vifi);
+ }
+
+@@ -214,6 +270,20 @@
+ * we're going to delete. *BSD systems on the other hand exepect only the index
+ * of that VIF.
+ */
++#ifdef PLANETLAB
++ struct {
++ u_int32 op;
++ u_int32 len;
++ struct vifctl vc;
++ } op;
++
++ op.vc.vifc_vifi = vifi;
++ uvif_to_vifctl(&op.vc, v);
++
++ op.op = MRT_DEL_VIF;
++ op.len = sizeof(op.vc);
++ if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++#else
+ #ifdef __linux__
+ struct vifctl vc;
+
+@@ -224,6 +294,7 @@
+ #else /* *BSD et al. */
+ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_VIF, (char *)&vifi, sizeof(vifi)) < 0)
+ #endif /* !__linux__ */
++#endif /* PLANETLAB */
+ {
+ if (errno == EADDRNOTAVAIL || errno == EINVAL)
+ return;
+@@ -238,31 +309,57 @@
+ */
+ void k_add_rg(u_int32 origin, struct gtable *g)
+ {
+- struct mfcctl mc;
++ struct {
++ u_int32 op;
++ u_int32 len;
++ struct mfcctl mc;
++ } op;
++
+ vifi_t i;
+
+ #ifdef DEBUG_MFC
+ md_log(MD_ADD, origin, g->gt_mcastgrp);
+ #endif
+ /* copy table values so that setsockopt can process it */
+- mc.mfcc_origin.s_addr = origin;
++ op.mc.mfcc_origin.s_addr = origin;
+ #ifdef OLD_KERNEL
+- mc.mfcc_originmask.s_addr = 0xffffffff;
++ op.mc.mfcc_originmask.s_addr = 0xffffffff;
+ #endif
+- mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
+- mc.mfcc_parent = g->gt_route ? g->gt_route->rt_parent : NO_VIF;
++ op.mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
++ op.mc.mfcc_parent = g->gt_route ? g->gt_route->rt_parent : NO_VIF;
+ for (i = 0; i < numvifs; i++)
+- mc.mfcc_ttls[i] = g->gt_ttls[i];
++ op.mc.mfcc_ttls[i] = g->gt_ttls[i];
++
++#ifdef PLANETLAB
++
++ logit(LOG_DEBUG, 0, "setsockopt MRT_ADD_MFC %s-%s %d[%d %d %d %d %d %d]",
++ inet_fmt(origin, s1, sizeof(s1)),
++ inet_fmt(g->gt_mcastgrp, s2, sizeof(s2)),
++ numvifs,
++ op.mc.mfcc_ttls[0], op.mc.mfcc_ttls[1], op.mc.mfcc_ttls[2],
++ op.mc.mfcc_ttls[3], op.mc.mfcc_ttls[4], op.mc.mfcc_ttls[5] );
++
++ /* Send to PlanetLab's user-space MRT daemon */
++ op.op = MRT_ADD_MFC;
++ op.len = sizeof(op.mc);
++ if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++
++#else /* here if not PLANETLAB */
+
+ /* write to kernel space */
+ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_MFC,
+- (char *)&mc, sizeof(mc)) < 0) {
++ (char *)&op.mc, sizeof(op.mc)) < 0)
++
++#endif /* PLANETLAB */
++
++ {
+ #ifdef DEBUG_MFC
+ md_log(MD_ADD_FAIL, origin, g->gt_mcastgrp);
+ #endif
+ logit(LOG_WARNING, errno, "setsockopt MRT_ADD_MFC",
+ inet_fmt(origin, s1, sizeof(s1)), inet_fmt(g->gt_mcastgrp, s2, sizeof(s2)));
+ }
++
+ }
+
+
+@@ -271,20 +368,37 @@
+ */
+ int k_del_rg(u_int32 origin, struct gtable *g)
+ {
+- struct mfcctl mc;
++ struct {
++ u_int32 op;
++ u_int32 len;
++ struct mfcctl mc;
++ } op;
+
+ #ifdef DEBUG_MFC
+ md_log(MD_DEL, origin, g->gt_mcastgrp);
+ #endif
+ /* copy table values so that setsockopt can process it */
+- mc.mfcc_origin.s_addr = origin;
++ op.mc.mfcc_origin.s_addr = origin;
+ #ifdef OLD_KERNEL
+- mc.mfcc_originmask.s_addr = 0xffffffff;
++ op.mc.mfcc_originmask.s_addr = 0xffffffff;
+ #endif
+- mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
++ op.mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp;
++
++#ifdef PLANETLAB
++
++ /* Send to PlanetLab's user-space MRT daemon */
++ op.op = MRT_DEL_MFC;
++ op.len = sizeof(op.mc);
++ if (send(plrt_socket, &op, sizeof(op), 0) < 0)
++
++#else /* here if not PLANETLAB */
+
+ /* write to kernel space */
+- if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_MFC, (char *)&mc, sizeof(mc)) < 0) {
++ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_MFC, (char *)&op.mc, sizeof(op.mc)) < 0)
++
++#endif /* PLANETLAB */
++
++ {
+ #ifdef DEBUG_MFC
+ md_log(MD_DEL_FAIL, origin, g->gt_mcastgrp);
+ #endif
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/main.c mrouted-3.9.5-pl/main.c
+--- mrouted-3.9.5/main.c 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/main.c 2011-08-30 15:31:01.281814000 +0200
+@@ -52,9 +52,9 @@
+ time_t mrouted_init_time;
+
+ #ifdef SNMP
+-#define NHANDLERS 34
++#define NHANDLERS 35
+ #else
+-#define NHANDLERS 2
++#define NHANDLERS 3
+ #endif
+
+ static struct ihandler {
+@@ -206,6 +206,9 @@
+ fputs(" -h, --help Show this help text\n", stderr);
+ fputs(" -p Disable pruning. Deprecated, compatibility option\n", stderr);
+ fputs(" -r, --show-routes Show state of VIFs and multicast routing tables\n", stderr);
++#ifdef PLANETLAB
++ fputs(" -F Path to the PlanetLab userland forwarder, default /var/run/mcastfwd\n", stderr);
++#endif
+ fprintf(stderr, " -v, --version Show %s version\n", __progname);
+ fputs("\n", stderr);
+
+@@ -258,11 +261,23 @@
+
+ snprintf(versionstring, sizeof(versionstring), "mrouted version %s", todaysversion);
+
+- while ((ch = getopt_long(argc, argv, "c:d::fhpP::rv", long_options, NULL)) != EOF) {
++#ifdef PLANETLAB
++#define PLOPTIONS ":F"
++#else
++#define PLOPTIONS ""
++#endif
++
++ while ((ch = getopt_long(argc, argv, "c:d::fhpP::rv" PLOPTIONS, long_options, NULL)) != EOF) {
+ switch (ch) {
+ case 'c':
+ configfilename = optarg;
+ break;
++
++#ifdef PLANETLAB
++ case 'F':
++ plrt_socket_path = optarg;
++ break;
++#endif
+
+ case 'd':
+ if (!optarg)
+@@ -410,6 +425,8 @@
+ init_ipip();
+ init_routes();
+ init_ktable();
++ k_init_plrt();
++
+ #ifndef OLD_KERNEL
+ /*
+ * Unfortunately, you can't k_get_version() unless you've
+@@ -422,6 +439,7 @@
+ k_init_dvmrp();
+ vers = k_get_version();
+ k_stop_dvmrp();
++
+ /*XXX
+ * This function must change whenever the kernel version changes
+ */
+@@ -466,6 +484,15 @@
+ logit(LOG_ERR, 0, "Descriptor too big");
+ FD_SET(igmp_socket, &readers);
+ nfds = igmp_socket + 1;
++
++#ifdef PLANETLAB
++ if (plrt_socket >= (int)FD_SETSIZE)
++ logit(LOG_ERR, 0, "Descriptor too big");
++ FD_SET(plrt_socket, &readers);
++ if (plrt_socket >= nfds)
++ nfds = plrt_socket + 1;
++#endif
++
+ for (i = 0; i < nhandlers; i++) {
+ if (ihandlers[i].fd >= (int)FD_SETSIZE)
+ logit(LOG_ERR, 0, "Descriptor too big");
+@@ -602,6 +629,17 @@
+ accept_igmp(recvlen);
+ }
+
++#ifdef PLANETLAB
++ if (FD_ISSET(plrt_socket, &rfds)) {
++ recvlen = recvfrom(plrt_socket, recv_buf, RECV_BUF_SIZE, 0, NULL, &dummy);
++ if (recvlen < 0) {
++ if (errno != EINTR) logit(LOG_ERR, errno, "recvfrom");
++ continue;
++ }
++ accept_igmp(recvlen);
++ }
++#endif
++
+ for (i = 0; i < nhandlers; i++) {
+ if (FD_ISSET(ihandlers[i].fd, &rfds)) {
+ (*ihandlers[i].func)(ihandlers[i].fd, &rfds);
+@@ -808,6 +846,7 @@
+ if (did_final_init)
+ k_stop_dvmrp();
+ }
++ k_stop_plrt();
+ }
+
+ /*
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/Makefile mrouted-3.9.5-pl/Makefile
+--- mrouted-3.9.5/Makefile 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/Makefile 2011-08-10 17:18:22.233596000 +0200
+@@ -43,7 +43,7 @@
+
+ ## Common
+ CFLAGS = $(MCAST_INCLUDE) $(SNMPDEF) $(RSRRDEF) $(INCLUDES) $(DEFS) $(USERCOMPILE)
+-CFLAGS += -O2 -W -Wall -Werror
++CFLAGS += -O2 -W -Wall
+ #CFLAGS += -O -g
+ LDLIBS = $(SNMPLIBDIR) $(SNMPLIBS) $(EXTRA_LIBS)
+ LDFLAGS += -Wl,-Map,$@.map
+Only in mrouted-3.9.5-pl: Makefile.bk
+Only in mrouted-3.9.5-pl: map-mbone
+Only in mrouted-3.9.5-pl: map-mbone.map
+Only in mrouted-3.9.5-pl: mrinfo
+Only in mrouted-3.9.5-pl: mrinfo.map
+Only in mrouted-3.9.5-pl: mrouted
+Only in mrouted-3.9.5-pl: mrouted.map
+Only in mrouted-3.9.5-pl: mtrace
+Only in mrouted-3.9.5-pl: mtrace.map
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/pathnames.h mrouted-3.9.5-pl/pathnames.h
+--- mrouted-3.9.5/pathnames.h 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/pathnames.h 2011-08-29 13:10:27.043684000 +0200
+@@ -14,5 +14,6 @@
+ #define _PATH_MROUTED_GENID _PATH_MROUTED_RUNDIR "/mrouted.genid"
+ #define _PATH_MROUTED_DUMP _PATH_MROUTED_RUNDIR "/mrouted.dump"
+ #define _PATH_MROUTED_CACHE _PATH_MROUTED_RUNDIR "/mrouted.cache"
++#define _PATH_MROUTED_PLRT _PATH_VARRUN "mcastrt"
+
+ #endif /* __MROUTED_PATHNAMES_H__ */
+Only in mrouted-3.9.5-pl: vers.c
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/vif.c mrouted-3.9.5-pl/vif.c
+--- mrouted-3.9.5/vif.c 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/vif.c 2011-08-31 19:26:52.955453000 +0200
+@@ -139,6 +139,7 @@
+ v->uv_lcl_addr = 0;
+ v->uv_rmt_addr = 0;
+ v->uv_dst_addr = t ? 0 : dvmrp_group;
++ v->uv_ptp_addr = 0;
+ v->uv_subnet = 0;
+ v->uv_subnetmask = 0;
+ v->uv_subnetbcast = 0;
+@@ -379,6 +380,8 @@
+ start_route_updates();
+ update_route(p->pa_subnet, p->pa_subnetmask, 0, 0, vifi, NULL);
+ }
++ if (v->uv_ptp_addr)
++ update_route(v->uv_ptp_addr, 0xffffffff, 0, 0, vifi, NULL);
+
+ /*
+ * Until neighbors are discovered, assume responsibility for sending
+@@ -526,6 +529,8 @@
+ return(vifi);
+ }
+ else {
++ if (src == v->uv_ptp_addr)
++ return(vifi);
+ if ((src & v->uv_subnetmask) == v->uv_subnet &&
+ ((v->uv_subnetmask == 0xffffffff) ||
+ (src != v->uv_subnetbcast)))
+@@ -1666,6 +1671,10 @@
+ scaletime(now - v->uv_querier->al_ctime),
+ scaletime(v->uv_querier->al_timer));
+ }
++ if (0 != v->uv_ptp_addr) {
++ fprintf(fp, " PtP remote: %-18s\n",
++ inet_fmt(v->uv_ptp_addr, s1, sizeof(s1)));
++ }
+ if (v->uv_flags & VIFF_BLASTER)
+ fprintf(fp, " blasterbuf size: %dk\n",
+ v->uv_blasterlen / 1024);
+diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/vif.h mrouted-3.9.5-pl/vif.h
+--- mrouted-3.9.5/vif.h 2011-03-05 21:45:25.000000000 +0100
++++ mrouted-3.9.5-pl/vif.h 2011-08-29 14:35:35.695829000 +0200
+@@ -109,6 +109,7 @@
+ u_int32 uv_lcl_addr; /* local address of this vif */
+ u_int32 uv_rmt_addr; /* remote end-point addr (tunnels only) */
+ u_int32 uv_dst_addr; /* destination for DVMRP/PIM messages */
++ u_int32 uv_ptp_addr; /* remote peer address (pointopoint only) */
+ u_int32 uv_subnet; /* subnet number (phyints only) */
+ u_int32 uv_subnetmask; /* subnet mask (phyints only) */
+ u_int32 uv_subnetbcast;/* subnet broadcast addr (phyints only) */
import traceback
import tunchannel
-import ipaddr2
+
+try:
+ import iovec
+ HAS_IOVEC = True
+except:
+ HAS_IOVEC = False
tun_name = 'tun0'
tun_path = '/dev/net/tun'
"Specify a symmetric encryption key with which to protect packets across "
"the tunnel. python-crypto must be installed on the system." )
parser.add_option(
- "-K", "--gre-key", dest="gre_key", metavar="KEY", type="int",
- default = None,
+ "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string",
+ default = "true",
help =
"Specify a demultiplexing 32-bit numeric key for GRE." )
parser.add_option(
help = "If specified, packets won't be logged to standard output, "
"but dumped to a pcap-formatted trace in the specified file. " )
parser.add_option(
- "--multicast", dest="multicast",
- action = "store_true",
- default = False,
- help = "If specified, multicast packets will be forwarded and IGMP "
- "join/leave packets will be generated. Routing information "
- "must be sent to the mroute unix socket, in a format identical "
- "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL "
- "code and 32-bit data length." )
+ "--multicast-forwarder", dest="multicast_fwd",
+ default = None,
+ help = "If specified, multicast packets will be forwarded to "
+ "the specified unix-domain socket. If the device uses ethernet "
+ "frames, ethernet headers will be stripped and IP packets "
+ "will be forwarded, prefixed with the interface's address." )
parser.add_option(
"--filter", dest="filter_module", metavar="PATH",
default = None,
IFREQ_SZ = 0x00000028
FIONREAD = 0x0000541b
-class MulticastThread(threading.Thread):
- def __init__(self, *p, **kw):
- super(MulticastThread, self).__init__(*p, **kw)
- self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
- self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
- socket.inet_aton(options.vif_addr) )
- self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
- self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
- self._stop = False
- self.setDaemon(True)
-
- def run(self):
- devnull = open('/dev/null','r+b')
- maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
- cur_maddr = set()
- lastfullrefresh = time.time()
- while not self._stop:
- # Get current subscriptions
- proc = subprocess.Popen(['ip','maddr','show',tun_name],
- stdout = subprocess.PIPE,
- stderr = subprocess.STDOUT,
- stdin = devnull)
- new_maddr = set()
- for line in proc.stdout:
- match = maddr_re.match(line)
- if match:
- new_maddr.add(match.group(1))
- proc.wait()
-
- # Every now and then, send a full report
- now = time.time()
- report_new = new_maddr
- if (now - lastfullrefresh) <= 30.0:
- report_new = report_new - cur_maddr
- else:
- lastfullrefresh = now
-
- # Report subscriptions
- for grp in report_new:
- igmpp = ipaddr2.ipigmp(
- options.vif_addr, '224.0.0.2', 1, 0x16, 0, grp,
- noipcksum=True)
- self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
-
- # Notify group leave
- for grp in cur_maddr - new_maddr:
- igmpp = ipaddr2.ipigmp(
- options.vif_addr, '224.0.0.2', 1, 0x17, 0, grp,
- noipcksum=True)
- self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
-
- cur_maddr = new_maddr
-
- time.sleep(1)
-
- def stop(self):
- self._stop = True
- self.join(5)
-
class HostLock(object):
# This class is used as a lock to prevent concurrency issues with more
# than one instance of netns running in the same machine. Both in
if options.vif_txqueuelen is not None:
stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
if options.mode.startswith('pl-gre'):
- stdin.write("gre=%d\n" % (options.gre_key,))
+ stdin.write("gre=%s\n" % (options.gre_key,))
stdin.write("remote=%s\n" % (options.peer_addr,))
stdin.close()
filter_close = None
queueclass = None
+# install multicast forwarding hook
+if options.multicast_fwd:
+ print >>sys.stderr, "Connecting to mcast filter"
+ mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+ tunchannel.nonblock(mcfwd_sock.fileno())
+
# be careful to roll back stuff on exceptions
tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
try:
tcpdump = None
reconnect = None
mcastthread = None
+
+ # install multicast forwarding hook
+ if options.multicast_fwd:
+ print >>sys.stderr, "Installing mcast filter"
+
+ if HAS_IOVEC:
+ writev = iovec.writev
+ else:
+ os_write = os.write
+ map_ = map
+ str_ = str
+ def writev(fileno, *stuff):
+ os_write(''.join(map_(str_,stuff)))
+
+ def accept_packet(packet, direction,
+ _up_accept=accept_packet,
+ sock=mcfwd_sock,
+ sockno=mcfwd_sock.fileno(),
+ etherProto=tunchannel.etherProto,
+ etherStrip=tunchannel.etherStrip,
+ etherMode=tun_name.startswith('tap'),
+ multicast_fwd = options.multicast_fwd,
+ vif_addr = socket.inet_aton(options.vif_addr),
+ connected = [], writev=writev,
+ len=len, ord=ord):
+ if _up_accept:
+ rv = _up_accept(packet, direction)
+ if not rv:
+ return rv
+
+ if direction == 1:
+ # Incoming... what?
+ if etherMode:
+ if etherProto(packet)=='\x08\x00':
+ fwd = etherStrip(packet)
+ else:
+ fwd = None
+ else:
+ fwd = packet
+ if fwd is not None and len(fwd) >= 20:
+ if (ord(fwd[16]) & 0xf0) == 0xe0:
+ # Forward it
+ if not connected:
+ try:
+ sock.connect(multicast_fwd)
+ connected.append(None)
+ except:
+ traceback.print_exc(file=sys.stderr)
+ if connected:
+ try:
+ writev(sockno, vif_addr,fwd)
+ except:
+ traceback.print_exc(file=sys.stderr)
+ return 1
+
if options.protocol == 'fd':
if accept_packet or filter_init:
# or perhaps there is no os.nice support in the system
pass
- if options.multicast:
- # Start multicast forwarding daemon
- mcastthread = MulticastThread()
- mcastthread.start()
-
if not filter_init:
tun_fwd(tun, remote,
reconnect = reconnect,
self.port = 15000
self.mode = 'pl-tun'
self.key = key
+ self.cross_slice = False
self.home_path = home_path
local_snat = local.snat
local_txq = local.txqueuelen
local_p2p = local.pointopoint
- local_cipher = local.tun_cipher
- local_mcast = local.multicast
- local_bwlim = local.bwlimit
+ local_cipher=local.tun_cipher
+ local_mcast= local.multicast
+ local_bwlim= local.bwlimit
+ local_mcastfwd = local.multicast_forwarder
if not local_p2p and hasattr(peer, 'address'):
local_p2p = peer.address
"-t", str(check_proto),
"-A", str(local_addr),
"-M", str(local_mask),
- "-C", str(local_cipher)]
+ "-C", str(local_cipher),
+ ]
if check_proto == 'fd':
passfd_arg = str(peer_addr)
"--pass-fd", passfd_arg
])
elif check_proto == 'gre':
+ if self.cross_slice:
+ args.extend([
+ "-K", str(self.key.strip('='))
+ ])
+
args.extend([
- "-K", str(min(local_port, peer_port)),
"-a", str(peer_addr),
])
# both udp and tcp
args.append("-N")
elif local_cap == 'pcap':
args.extend(('-c','pcap'))
- if local_mcast:
- args.append("--multicast")
if local_bwlim:
args.extend(("-b",str(local_bwlim*1024)))
if filter_module:
args.extend(("--filter", filter_module))
if filter_args:
args.extend(("--filter-args", filter_args))
+ if local_mcast and local_mcastfwd:
+ args.extend(("--multicast-forwarder", local_mcastfwd))
self._logger.info("Starting %s", self)
# Connected?
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
- "cd %(home)s ; grep -c Connected capture" % dict(
+ "cd %(home)s ; grep -a -c Connected capture" % dict(
home = server.shell_escape(self.home_path)),
host = local.node.hostname,
port = None,
# At least listening?
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
- "cd %(home)s ; grep -c Listening capture" % dict(
+ "cd %(home)s ; grep -a -c Listening capture" % dict(
home = server.shell_escape(self.home_path)),
host = local.node.hostname,
port = None,
# Inspect the trace to check the assigned iface
local = self.local()
if local:
- cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
+ cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict(
home = server.shell_escape(self.home_path))
for spin in xrange(30):
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
rv = buf.popleft()
return rv
-def etherStrip(buf):
+def etherStrip(buf, buffer=buffer, len=len):
if len(buf) < 14:
return ""
if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
# tagged ethernet frame
- return buf[18:]
+ return buffer(buf, 18)
elif buf[12:14] == '\x08\x00':
# untagged ethernet frame
- return buf[14:]
+ return buffer(buf, 14)
else:
return ""