From: Claudio-Daniel Freire Date: Sun, 4 Sep 2011 17:31:46 +0000 (+0200) Subject: Merge with HEAD, close aly's branch. X-Git-Tag: nepi-3.0.0~251 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=221dc7a823e3310b5ef1cc8f23d49ec1e65b687e;hp=933c66b106c614a91bd0578ca76f3fd8f6d2e2f6;p=nepi.git Merge with HEAD, close aly's branch. --- diff --git a/setup.py b/setup.py index 80b9299b..9e4719a6 100755 --- a/setup.py +++ b/setup.py @@ -22,6 +22,8 @@ setup( "nepi.util.graphtools", "nepi.util" ], package_dir = {"": "src"}, - package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"], + package_data = {"nepi.testbeds.planetlab" : [ + "scripts/*.py", "scripts/*.c", "scripts/*.patch", + ], "nepi.util" : ["*.tpl"] }, ) diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py index 90028f6b..27ed23d7 100644 --- a/src/nepi/testbeds/planetlab/execute.py +++ b/src/nepi/testbeds/planetlab/execute.py @@ -37,10 +37,11 @@ class TestbedController(testbed_impl.TestbedController): self.slicename = None self._traces = dict() - import node, interfaces, application + import node, interfaces, application, multicast self._node = node self._interfaces = interfaces self._app = application + self._multicast = multicast self._blacklist = set() self._just_provisioned = set() @@ -718,4 +719,13 @@ class TestbedController(testbed_impl.TestbedController): def _make_tos_queue_filter(self, parameters): return self._make_generic(parameters, self._interfaces.ToSQueueFilter) + def _make_multicast_forwarder(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastForwarder) + + def _make_multicast_announcer(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastAnnouncer) + + def _make_multicast_router(self, parameters): + return self._make_generic(parameters, self._multicast.MulticastRouter) + diff --git a/src/nepi/testbeds/planetlab/interfaces.py b/src/nepi/testbeds/planetlab/interfaces.py index 1f313c14..56720a37 100644 --- a/src/nepi/testbeds/planetlab/interfaces.py +++ b/src/nepi/testbeds/planetlab/interfaces.py @@ -139,6 +139,7 @@ class TunIface(object): # These get initialized when the iface is connected to any filter self.filter_module = None + self.multicast_forwarder = None # These get initialized when the iface is configured self.external_iface = None @@ -242,6 +243,7 @@ class TunIface(object): impl = self._PROTO_MAP[self.peer_proto]( self, self.peer_iface, home_path, self.tun_key) impl.port = self.tun_port + impl.cross_slice = not self.peer_iface or isinstance(self.peer_iface, _CrossIface) return impl def recover(self): diff --git a/src/nepi/testbeds/planetlab/metadata.py b/src/nepi/testbeds/planetlab/metadata.py index 7754c0a3..f5d72f19 100644 --- a/src/nepi/testbeds/planetlab/metadata.py +++ b/src/nepi/testbeds/planetlab/metadata.py @@ -31,6 +31,9 @@ NETPIPE = "NetPipe" TUNFILTER = "TunFilter" CLASSQUEUEFILTER = "ClassQueueFilter" TOSQUEUEFILTER = "TosQueueFilter" +MULTICASTFORWARDER = "MulticastForwarder" +MULTICASTANNOUNCER = "MulticastAnnouncer" +MULTICASTROUTER = "MulticastRouter" TUNFILTERS = (TUNFILTER, CLASSQUEUEFILTER, TOSQUEUEFILTER) TAPFILTERS = (TUNFILTER, ) @@ -192,9 +195,9 @@ def crossconnect_filter_peer_both(proto, testbed_instance, filter_guid, peer_dat crossconnect_filter_peer_init(proto, testbed_instance, iface_guid, peer_iface_data) crossconnect_filter_peer_compl(proto, testbed_instance, iface_guid, peer_iface_data) -def connect_dep(testbed_instance, node_guid, app_guid): - node = testbed_instance._elements[node_guid] - app = testbed_instance._elements[app_guid] +def connect_dep(testbed_instance, node_guid, app_guid, node=None, app=None): + node = node or testbed_instance._elements[node_guid] + app = app or testbed_instance._elements[app_guid] app.node = node if app.depends: @@ -213,6 +216,24 @@ def connect_dep(testbed_instance, node_guid, app_guid): if app.rpmFusion: node.rpmFusion = True +def connect_forwarder(testbed_instance, node_guid, fwd_guid): + node = testbed_instance._elements[node_guid] + fwd = testbed_instance._elements[fwd_guid] + node.multicast_forwarder = fwd + + if fwd.router: + connect_dep(testbed_instance, node_guid, None, app=fwd.router) + + connect_dep(testbed_instance, node_guid, fwd_guid) + +def connect_router(testbed_instance, fwd_guid, router_guid): + fwd = testbed_instance._elements[fwd_guid] + router = testbed_instance._elements[router_guid] + fwd.router = router + + if fwd.node: + connect_dep(testbed_instance, None, router_guid, node=fwd.node) + def connect_node_netpipe(testbed_instance, node_guid, netpipe_guid): node = testbed_instance._elements[node_guid] netpipe = testbed_instance._elements[netpipe_guid] @@ -330,6 +351,33 @@ def create_ns3_dependency(testbed_instance, guid): testbed_instance.elements[guid] = element +def create_multicast_forwarder(testbed_instance, guid): + parameters = testbed_instance._get_parameters(guid) + element = testbed_instance._make_multicast_forwarder(parameters) + + # Just inject configuration stuff + element.home_path = "nepi-mcfwd-%s" % (guid,) + + testbed_instance.elements[guid] = element + +def create_multicast_announcer(testbed_instance, guid): + parameters = testbed_instance._get_parameters(guid) + element = testbed_instance._make_multicast_announcer(parameters) + + # Just inject configuration stuff + element.home_path = "nepi-mcann-%s" % (guid,) + + testbed_instance.elements[guid] = element + +def create_multicast_router(testbed_instance, guid): + parameters = testbed_instance._get_parameters(guid) + element = testbed_instance._make_multicast_router(parameters) + + # Just inject configuration stuff + element.home_path = "nepi-mcrt-%s" % (guid,) + + testbed_instance.elements[guid] = element + def create_internet(testbed_instance, guid): parameters = testbed_instance._get_parameters(guid) element = testbed_instance._make_internet(parameters) @@ -490,6 +538,41 @@ def configure_dependency(testbed_instance, guid): # Install stuff dep.async_setup() +def configure_announcer(testbed_instance, guid): + # Link ifaces + fwd = testbed_instance._elements[guid] + fwd.ifaces = [ dev + for node_guid in testbed_instance.get_connected(guid, "node", "apps") + for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node") + for dev in ( testbed_instance._elements.get(dev_guid) ,) + if dev and isinstance(dev, testbed_instance._interfaces.TunIface) + and dev.multicast ] + + # Install stuff + configure_dependency(testbed_instance, guid) + +def configure_forwarder(testbed_instance, guid): + configure_announcer(testbed_instance, guid) + + # Link ifaces to forwarder + fwd = testbed_instance._elements[guid] + for iface in fwd.ifaces: + iface.multicast_forwarder = '/var/run/mcastfwd' + +def configure_router(testbed_instance, guid): + # Link ifaces + rt = testbed_instance._elements[guid] + rt.nonifaces = [ dev + for fwd_guid in testbed_instance.get_connected(guid, "fwd", "router") + for node_guid in testbed_instance.get_connected(fwd_guid, "node", "apps") + for dev_guid in testbed_instance.get_connected(node_guid, "devs", "node") + for dev in ( testbed_instance._elements.get(dev_guid) ,) + if dev and isinstance(dev, testbed_instance._interfaces.TunIface) + and not dev.multicast ] + + # Install stuff + configure_dependency(testbed_instance, guid) + def configure_netpipe(testbed_instance, guid): netpipe = testbed_instance._elements[guid] @@ -536,6 +619,18 @@ connector_types = dict({ "max": 1, "min": 1 }), + "router": dict({ + "help": "Connector to a routing daemon", + "name": "router", + "max": 1, + "min": 1 + }), + "fwd": dict({ + "help": "Forwarder this routing daemon communicates with", + "name": "fwd", + "max": 1, + "min": 1 + }), "pipes": dict({ "help": "Connector to a NetPipe", "name": "pipes", @@ -602,32 +697,32 @@ connections = [ }), dict({ "from": (TESTBED_ID, NODE, "apps"), - "to": (TESTBED_ID, APPLICATION, "node"), + "to": (TESTBED_ID, (APPLICATION, MULTICASTANNOUNCER), "node"), "init_code": connect_dep, "can_cross": False }), dict({ "from": (TESTBED_ID, NODE, "deps"), - "to": (TESTBED_ID, DEPENDENCY, "node"), + "to": (TESTBED_ID, (DEPENDENCY, NEPIDEPENDENCY, NS3DEPENDENCY), "node"), "init_code": connect_dep, "can_cross": False }), dict({ - "from": (TESTBED_ID, NODE, "deps"), - "to": (TESTBED_ID, NEPIDEPENDENCY, "node"), - "init_code": connect_dep, + "from": (TESTBED_ID, NODE, "pipes"), + "to": (TESTBED_ID, NETPIPE, "node"), + "init_code": connect_node_netpipe, "can_cross": False }), dict({ - "from": (TESTBED_ID, NODE, "deps"), - "to": (TESTBED_ID, NS3DEPENDENCY, "node"), - "init_code": connect_dep, + "from": (TESTBED_ID, NODE, "apps"), + "to": (TESTBED_ID, MULTICASTFORWARDER, "node"), + "init_code": connect_forwarder, "can_cross": False }), dict({ - "from": (TESTBED_ID, NODE, "pipes"), - "to": (TESTBED_ID, NETPIPE, "node"), - "init_code": connect_node_netpipe, + "from": (TESTBED_ID, MULTICASTFORWARDER, "router"), + "to": (TESTBED_ID, MULTICASTROUTER, "fwd"), + "init_code": connect_router, "can_cross": False }), dict({ @@ -1167,6 +1262,15 @@ attributes = dict({ "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, "validation_function": validation.is_string }), + "routing_algorithm": dict({ + "name": "algorithm", + "help": "Routing algorithm.", + "value": "dvmrp", + "type": Attribute.ENUM, + "allowed": ["dvmrp"], + "flags": Attribute.ExecReadOnly | Attribute.ExecImmutable, + "validation_function": validation.is_enum, + }), }) traces = dict({ @@ -1206,15 +1310,34 @@ traces = dict({ }), }) -create_order = [ INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] +create_order = [ + INTERNET, NODE, NODEIFACE, CLASSQUEUEFILTER, TOSQUEUEFILTER, + MULTICASTANNOUNCER, MULTICASTFORWARDER, MULTICASTROUTER, + TUNFILTER, TAPIFACE, TUNIFACE, NETPIPE, + NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ] -configure_order = [ INTERNET, Parallel(NODE), NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] +configure_order = [ + INTERNET, Parallel(NODE), + NODEIFACE, + Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER), + Parallel(TAPIFACE), Parallel(TUNIFACE), NETPIPE, + Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] # Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes -start_order = [ INTERNET, NODEIFACE, Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NODE), NETPIPE, Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] +start_order = [ INTERNET, + NODEIFACE, + Parallel(TAPIFACE), Parallel(TUNIFACE), + Parallel(NODE), NETPIPE, + Parallel(MULTICASTANNOUNCER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTROUTER), + Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), Parallel(APPLICATION) ] # cleanup order -shutdown_order = [ Parallel(APPLICATION), Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), NODEIFACE, Parallel(NODE) ] +shutdown_order = [ + Parallel(APPLICATION), + Parallel(MULTICASTROUTER), Parallel(MULTICASTFORWARDER), Parallel(MULTICASTANNOUNCER), + Parallel(TAPIFACE), Parallel(TUNIFACE), Parallel(NETPIPE), + Parallel(NEPIDEPENDENCY), Parallel(NS3DEPENDENCY), Parallel(DEPENDENCY), + NODEIFACE, Parallel(NODE) ] factories_info = dict({ NODE: dict({ @@ -1428,6 +1551,51 @@ factories_info = dict({ "connector_types": ["node"], "traces": ["buildlog"], }), + MULTICASTFORWARDER: dict({ + "help": "This application installs a userspace packet forwarder " + "that, when connected to a node, filters all packets " + "flowing through multicast-capable virtual interfaces " + "and applies custom-specified routing policies.", + "category": FC.CATEGORY_APPLICATIONS, + "create_function": create_multicast_forwarder, + "preconfigure_function": configure_forwarder, + "start_function": start_application, + "status_function": status_application, + "stop_function": stop_application, + "box_attributes": [ ], + "connector_types": ["node","router"], + "traces": ["buildlog","stderr"], + }), + MULTICASTANNOUNCER: dict({ + "help": "This application installs a userspace daemon that " + "monitors multicast membership and announces it on all " + "multicast-capable interfaces.\n" + "This does not usually happen automatically on PlanetLab slivers.", + "category": FC.CATEGORY_APPLICATIONS, + "create_function": create_multicast_announcer, + "preconfigure_function": configure_announcer, + "start_function": start_application, + "status_function": status_application, + "stop_function": stop_application, + "box_attributes": [ ], + "connector_types": ["node"], + "traces": ["buildlog","stderr"], + }), + MULTICASTROUTER: dict({ + "help": "This application installs a userspace daemon that " + "monitors multicast membership and announces it on all " + "multicast-capable interfaces.\n" + "This does not usually happen automatically on PlanetLab slivers.", + "category": FC.CATEGORY_APPLICATIONS, + "create_function": create_multicast_router, + "preconfigure_function": configure_router, + "start_function": start_application, + "status_function": status_application, + "stop_function": stop_application, + "box_attributes": ["routing_algorithm"], + "connector_types": ["fwd"], + "traces": ["buildlog","stdout","stderr"], + }), INTERNET: dict({ "help": "Internet routing", "category": FC.CATEGORY_CHANNELS, diff --git a/src/nepi/testbeds/planetlab/multicast.py b/src/nepi/testbeds/planetlab/multicast.py new file mode 100644 index 00000000..71df64cf --- /dev/null +++ b/src/nepi/testbeds/planetlab/multicast.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from constants import TESTBED_ID + +import os +import os.path +import sys +import functools + +import nepi.util.server as server +import nepi.util.ipaddr2 as ipaddr2 + +import logging + +import application + +class MulticastForwarder(application.Application): + """ + This application installs a userspace packet forwarder + that, when connected to a node, filters all packets + flowing through multicast-capable virtual interfaces + and applies custom-specified routing policies + """ + def __init__(self, *p, **kw): + super(MulticastForwarder, self).__init__(*p, **kw) + + self.sources = ' '.join([ + os.path.join( os.path.dirname(__file__), + "scripts", "mcastfwd.py" ), + ipaddr2.__file__.replace('.pyc','.py').replace('.pyo','.py'), + ]) + + self.sudo = True + + self.depends = "python" + + # Initialized when connected + self.ifaces = [] + self.router = None + + def _command_get(self): + # canonical representation of dependencies + depends = ' '.join( sorted( (self.depends or "").split(' ') ) ) + + # download rpms and pack into a tar archive + return ( + "python mcastfwd.py %s" + ) % ( ' '.join([iface.address for iface in self.ifaces]), ) + def _command_set(self, value): + # ignore + return + command = property(_command_get, _command_set) + + +class MulticastAnnouncer(application.Application): + """ + This application installs a userspace daemon that + monitors multicast membership and announces it on all + multicast-capable interfaces. + This does not usually happen automatically on PlanetLab slivers. + """ + def __init__(self, *p, **kw): + super(MulticastAnnouncer, self).__init__(*p, **kw) + + self.sources = ' '.join([ + os.path.join( os.path.dirname(__file__), + "scripts", "mcastfwd.py" ), + ipaddr2.__file__.replace('.pyc','.py').replace('.pyo','.py'), + ]) + + self.sudo = True + + self.depends = "python" + + self.ifaces = [] + self.router = None + + def _command_get(self): + # canonical representation of dependencies + depends = ' '.join( sorted( (self.depends or "").split(' ') ) ) + + # download rpms and pack into a tar archive + return ( + "python mcastfwd.py -A %s" + ) % ( ' '.join([iface.address for iface in self.ifaces]), ) + def _command_set(self, value): + # ignore + return + command = property(_command_get, _command_set) + +class MulticastRouter(application.Application): + """ + This application installs a userspace daemon that + monitors multicast membership and announces it on all + multicast-capable interfaces. + This does not usually happen automatically on PlanetLab slivers. + """ + ALGORITHM_MAP = { + 'dvmrp' : { + 'sources' : + ' '.join([ + os.path.join( os.path.dirname(__file__), + "scripts", "mrouted-3.9.5-pl.patch" ), + ]) , + 'depends' : "", + 'buildDepends' : "byacc gcc make patch", + 'build' : + "mkdir -p mrouted && " + "echo '3a1c1e72c4f6f7334d72df4c50b510d7 mrouted-3.9.5.tar.bz2' > archive_sums.txt && " + "wget -q -c -O mrouted-3.9.5.tar.bz2 ftp://ftp.vmlinux.org/pub/People/jocke/mrouted/mrouted-3.9.5.tar.bz2 && " + "md5sum -c archive_sums.txt && " + "tar xvjf mrouted-3.9.5.tar.bz2 -C mrouted --strip-components=1 && " + "cd mrouted && patch -p1 < ${SOURCES}/mrouted-3.9.5-pl.patch && make" + , + 'install' : "cp mrouted/mrouted ${SOURCES}", + 'command' : + "while test \\! -e /var/run/mcastrt ; do sleep 1 ; done ; " + "echo 'phyint eth0 disable' > ./mrouted.conf ; " + "for iface in %(nonifaces)s ; do echo \"phyint $iface disable\" >> ./mrouted.conf ; done ; " + "./mrouted -f %(debugbit)s -c ./mrouted.conf" + , + 'debugbit' : "-dpacket,igmp,routing,interface,pruning,membership,cache", + } + } + + def __init__(self, *p, **kw): + super(MulticastRouter, self).__init__(*p, **kw) + + self.algorithm = 'dvmrp' + self.sudo = True + self.nonifaces = [] + + def _non_set(self, value): + # ignore + return + + def _gen_get(attribute, self): + return self.ALGORITHM_MAP[self.algorithm][attribute] + + def _command_get(self): + command = self.ALGORITHM_MAP[self.algorithm]['command'] + debugbit = self.ALGORITHM_MAP[self.algorithm]['debugbit'] + + # download rpms and pack into a tar archive + return command % { + 'nonifaces' : ' '.join([iface.if_name for iface in self.nonifaces]), + 'debugbit' : (debugbit if self.stderr else ""), + } + command = property(_command_get, _non_set) + + build = property(functools.partial(_gen_get, "build"), _non_set) + install = property(functools.partial(_gen_get, "install"), _non_set) + sources = property(functools.partial(_gen_get, "sources"), _non_set) + depends = property(functools.partial(_gen_get, "depends"), _non_set) + buildDepends = property(functools.partial(_gen_get, "buildDepends"), _non_set) + diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index 5d9320c2..5e1445ec 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -113,6 +113,9 @@ class Node(object): self.rpmFusion = False self.env = collections.defaultdict(list) + # Some special applications - initialized when connected + self.multicast_forwarder = None + # Testbed-derived attributes self.slicename = None self.ident_path = None diff --git a/src/nepi/testbeds/planetlab/scripts/mcastfwd.py b/src/nepi/testbeds/planetlab/scripts/mcastfwd.py new file mode 100644 index 00000000..77bab0c0 --- /dev/null +++ b/src/nepi/testbeds/planetlab/scripts/mcastfwd.py @@ -0,0 +1,426 @@ +import sys + +import signal +import socket +import struct +import optparse +import threading +import subprocess +import re +import time +import collections +import os +import traceback +import logging + +import ipaddr2 + +usage = "usage: %prog [options] " + +parser = optparse.OptionParser(usage=usage) + +parser.add_option( + "-d", "--poll-delay", dest="poll_delay", metavar="SECONDS", type="float", + default = 1.0, + help = "Multicast subscription polling interval") +parser.add_option( + "-D", "--refresh-delay", dest="refresh_delay", metavar="SECONDS", type="float", + default = 30.0, + help = "Full-refresh interval - time between full IGMP reports") +parser.add_option( + "-p", "--fwd-path", dest="fwd_path", metavar="PATH", + default = "/var/run/mcastfwd", + help = "Path of the unix socket in which the program will listen for packets") +parser.add_option( + "-r", "--router-path", dest="mrt_path", metavar="PATH", + default = "/var/run/mcastrt", + help = "Path of the unix socket in which the program will listen for routing changes") +parser.add_option( + "-A", "--announce-only", dest="announce_only", action="store_true", + default = False, + help = "If given, only group membership announcements will be made. Useful for non-router multicast nodes.") +parser.add_option( + "-v", "--verbose", dest="verbose", action="store_true", + default = False, + help = "Path of the unix socket in which the program will listen for routing changes") + +(options, remaining_args) = parser.parse_args(sys.argv[1:]) + +logging.basicConfig( + stream=sys.stderr, + level=logging.DEBUG if options.verbose else logging.WARNING) + +ETH_P_ALL = 0x00000003 +ETH_P_IP = 0x00000800 +TUNSETIFF = 0x400454ca +IFF_NO_PI = 0x00001000 +IFF_TAP = 0x00000002 +IFF_TUN = 0x00000001 +IFF_VNET_HDR = 0x00004000 +TUN_PKT_STRIP = 0x00000001 +IFHWADDRLEN = 0x00000006 +IFNAMSIZ = 0x00000010 +IFREQ_SZ = 0x00000028 +FIONREAD = 0x0000541b + +class IGMPThread(threading.Thread): + def __init__(self, vif_addr, *p, **kw): + super(IGMPThread, self).__init__(*p, **kw) + + vif_addr = vif_addr.strip() + self.vif_addr = vif_addr + self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP) + self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, + socket.inet_aton(self.vif_addr) ) + self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) + self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) + self._stop = False + self.setDaemon(True) + + # Find tun name + proc = subprocess.Popen(['ip','addr','show'], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = open('/dev/null','r+b') ) + tun_name = None + heading = re.compile(r"\d+:\s*([-a-zA-Z0-9_]+):.*") + addr = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}).*") + for line in proc.stdout: + match = heading.match(line) + if match: + tun_name = match.group(1) + else: + match = addr.match(line) + if match and match.group(1) == vif_addr: + self.tun_name = tun_name + break + else: + raise RuntimeError, "Could not find iterface for", vif_addr + + def run(self): + devnull = open('/dev/null','r+b') + maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*") + cur_maddr = set() + lastfullrefresh = time.time() + while not self._stop: + # Get current subscriptions + proc = subprocess.Popen(['ip','maddr','show',self.tun_name], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = devnull) + new_maddr = set() + for line in proc.stdout: + match = maddr_re.match(line) + if match: + new_maddr.add(match.group(1)) + proc.wait() + + # Every now and then, send a full report + now = time.time() + report_new = new_maddr + if (now - lastfullrefresh) <= options.refresh_delay: + report_new = report_new - cur_maddr + else: + lastfullrefresh = now + + # Report subscriptions + for grp in report_new: + print >>sys.stderr, "JOINING", grp + igmpp = ipaddr2.ipigmp( + self.vif_addr, grp, 1, 0x16, 0, grp, + noipcksum=True) + try: + self.igmp_socket.sendto(igmpp, 0, (grp,0)) + except: + traceback.print_exc(file=sys.stderr) + + # Notify group leave + for grp in cur_maddr - new_maddr: + print >>sys.stderr, "LEAVING", grp + igmpp = ipaddr2.ipigmp( + self.vif_addr, '224.0.0.2', 1, 0x17, 0, grp, + noipcksum=True) + try: + self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0)) + except: + traceback.print_exc(file=sys.stderr) + + cur_maddr = new_maddr + + time.sleep(options.poll_delay) + + def stop(self): + self._stop = True + self.join(1+5*options.poll_delay) + + +class FWDThread(threading.Thread): + def __init__(self, rt_cache, router_socket, vifs, *p, **kw): + super(FWDThread, self).__init__(*p, **kw) + + self.in_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.in_socket.bind(options.fwd_path) + + self.pending = collections.deque() + self.maxpending = 1000 + self.rt_cache = rt_cache + self.router_socket = router_socket + self.vifs = vifs + self.fwd_sockets = {} + for fwd_target in remaining_args: + fwd_target = socket.inet_aton(fwd_target) + fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) + fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) + fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target) + fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) + self.fwd_sockets[fwd_target] = fwd_socket + + self._stop = False + self.setDaemon(True) + + def run(self): + in_socket = self.in_socket + rt_cache = self.rt_cache + vifs = self.vifs + router_socket = self.router_socket + len_ = len + ord_ = ord + str_ = str + pending = self.pending + in_socket.settimeout(options.poll_delay) + buffer_ = buffer + enumerate_ = enumerate + fwd_sockets = self.fwd_sockets + npending = 0 + noent = (None,None) + + while not self._stop: + # Get packet + try: + if pending and npending: + packet = pending.pop() + npending -= 1 + else: + packet = in_socket.recv(2000) + except socket.timeout, e: + if pending and not npending: + npending = len_(pending) + continue + if not packet or len_(packet) < 24: + continue + + fullpacket = packet + parent = packet[:4] + packet = buffer_(packet,4) + + if packet[9] == '\x02': + # IGMP packet? It's for mrouted + self.router_socket.send(packet) + elif packet[9] == '\x00': + # LOOPING packet, discard + continue + + # To-Do: PIM asserts + + # Get route + addrinfo = packet[12:20] + fwd_targets, rparent = rt_cache.get(addrinfo, noent) + + if fwd_targets is not None and (rparent == '\x00\x00\x00\x00' or rparent == parent): + # Forward + ttl = ord_(packet[8]) + tgt_group = (socket.inet_ntoa(addrinfo[4:]),0) + print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl, + nfwd_targets = len_(fwd_targets) + for vifi, vif in vifs.iteritems(): + if vifi < nfwd_targets: + ttl_thresh = ord_(fwd_targets[vifi]) + if ttl_thresh > 0 and ttl > ttl_thresh: + if vif[4] in fwd_sockets: + print >>sys.stderr, socket.inet_ntoa(vif[4]), + fwd_socket = fwd_sockets[vif[4]] + fwd_socket.sendto(packet, 0, tgt_group) + print >>sys.stderr, "." + else: + # Mark pending + if len_(pending) < self.maxpending: + tgt_group = addrinfo[4:] + print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ?" + + pending.append(fullpacket) + + # Notify mrouted by forwarding it with protocol 0 + router_socket.send(''.join( + (packet[:9],'\x00',packet[10:]) )) + + def stop(self): + self._stop = True + self.join(1+5*options.poll_delay) + + +class RouterThread(threading.Thread): + def __init__(self, rt_cache, router_socket, vifs, *p, **kw): + super(RouterThread, self).__init__(*p, **kw) + + self.rt_cache = rt_cache + self.vifs = vifs + self.router_socket = router_socket + + self._stop = False + self.setDaemon(True) + + def run(self): + rt_cache = self.rt_cache + vifs = self.vifs + addr_vifs = {} + router_socket = self.router_socket + router_socket.settimeout(options.poll_delay) + len_ = len + buffer_ = buffer + + buf = "" + + MRT_BASE = 200 + MRT_ADD_VIF = MRT_BASE+2 # Add a virtual interface + MRT_DEL_VIF = MRT_BASE+3 # Delete a virtual interface + MRT_ADD_MFC = MRT_BASE+4 # Add a multicast forwarding entry + MRT_DEL_MFC = MRT_BASE+5 # Delete a multicast forwarding entry + + def cmdhdr(cmd, unpack=struct.unpack, buffer=buffer): + op,dlen = unpack('II', buffer(cmd,0,8)) + cmd = buffer(cmd,8) + return op,dlen,cmd + def vifctl(data, unpack=struct.unpack): + #vifi, flags,threshold,rate_limit,lcl_addr,rmt_addr = unpack('HBBI4s4s', data) + return unpack('HBBI4s4s', data) + def mfcctl(data, unpack=struct.unpack): + #origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = unpack('4s4sH10sIIIi', data) + return unpack('4s4sH32sIIIi', data) + + + def add_vif(cmd): + vifi = vifctl(cmd) + vifs[vifi[0]] = vifi + addr_vifs[vifi[4]] = vifi[0] + print >>sys.stderr, "Added VIF", vifi + def del_vif(cmd): + vifi = vifctl(cmd) + vifi = vifs[vifi[0]] + del addr_vifs[vifi[4]] + del vifs[vifi[0]] + print >>sys.stderr, "Removed VIF", vifi + def add_mfc(cmd): + origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data) + if parent in vifs: + parent_addr = vifs[parent][4] + else: + parent_addr = '\x00\x00\x00\x00' + addrinfo = origin + mcastgrp + rt_cache[addrinfo] = (ttls, parent_addr) + print >>sys.stderr, "Added RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp))), map(ord,ttls) + def del_mfc(cmd): + origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data) + if parent in vifs: + parent_addr = vifs[parent][4] + else: + parent_addr = '\x00\x00\x00\x00' + addrinfo = origin + mcastgrp + del rt_cache[addrinfo] + print >>sys.stderr, "Removed RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp))) + + commands = { + MRT_ADD_VIF : add_vif, + MRT_DEL_VIF : del_vif, + MRT_ADD_MFC : add_mfc, + MRT_DEL_MFC : del_mfc, + } + + while not self._stop: + if len_(buf) < 8 or len_(buf) < (cmdhdr(buf)[1]+8): + # Get cmd + try: + cmd = router_socket.recv(2000) + except socket.timeout, e: + continue + if not cmd: + print >>sys.stderr, "PLRT CONNECTION BROKEN" + TERMINATE.append(None) + break + + if buf: + buf += cmd + cmd = buf + + if len_(cmd) < 8: + continue + + op,dlen,data = cmdhdr(cmd) + if len_(data) < dlen: + continue + + buf = buffer_(data, dlen) + data = buffer_(data, 0, dlen) + + print >>sys.stderr, "COMMAND", op, "DATA", dlen + + if op in commands: + try: + commands[op](data) + except: + traceback.print_exc(file=sys.stderr) + else: + print >>sys.stderr, "IGNORING UNKNOWN COMMAND", op + + def stop(self): + self._stop = True + self.join(1+5*options.poll_delay) + + + +igmp_threads = [] +for vif_addr in remaining_args: + igmp_threads.append(IGMPThread(vif_addr)) + +rt_cache = {} +vifs = {} + +TERMINATE = [] +TERMINATE = [] +def _finalize(sig,frame): + global TERMINATE + TERMINATE.append(None) +signal.signal(signal.SIGTERM, _finalize) + + +try: + if not options.announce_only: + router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) + router_socket.bind(options.mrt_path) + router_socket.listen(0) + router_remote_socket, router_remote_addr = router_socket.accept() + + fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs) + router_thread = RouterThread(rt_cache, router_remote_socket, vifs) + + for thread in igmp_threads: + thread.start() + + if not options.announce_only: + fwd_thread.start() + router_thread.start() + + while not TERMINATE: + time.sleep(30) +finally: + if os.path.exists(options.mrt_path): + try: + os.remove(options.mrt_path) + except: + pass + if os.path.exists(options.fwd_path): + try: + os.remove(options.fwd_path) + except: + pass + + diff --git a/src/nepi/testbeds/planetlab/scripts/mrouted-3.9.5-pl.patch b/src/nepi/testbeds/planetlab/scripts/mrouted-3.9.5-pl.patch new file mode 100644 index 00000000..f22175f7 --- /dev/null +++ b/src/nepi/testbeds/planetlab/scripts/mrouted-3.9.5-pl.patch @@ -0,0 +1,555 @@ +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/config.c mrouted-3.9.5-pl/config.c +--- mrouted-3.9.5/config.c 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/config.c 2011-08-29 16:32:43.710565000 +0200 +@@ -91,11 +91,41 @@ + v->uv_subnetbcast = subnet | ~mask; + strncpy(v->uv_name, ifa->ifa_name, sizeof(v->uv_name)); + +- if (flags & IFF_POINTOPOINT) ++ if (flags & IFF_POINTOPOINT) { + v->uv_flags |= VIFF_REXMIT_PRUNES; ++ v->uv_ptp_addr = ((struct sockaddr_in *)ifa->ifa_dstaddr)->sin_addr.s_addr; ++ } else if (flags & IFF_BROADCAST) { ++ /* getifaddr doesn't give us the p2p link in this case */ ++ /* So use ip, which uses netlink, to query it */ ++ char buf[1024]; ++ size_t rode; ++ FILE *peer; ++ ++ logit(LOG_INFO,0,"Getting ptp for %s", ifa->ifa_name); ++ ++ snprintf(buf,sizeof(buf),"ip addr show %s | grep -o 'peer [0-9.]*' | grep -o '[0-9.]*'", ifa->ifa_name); ++ peer = popen(buf, "r"); ++ rode = fread(buf, 1, sizeof(buf), peer); ++ pclose(peer); ++ ++ if (rode > 0) { ++ /* It has a pointopoint address */ ++ struct in_addr ptp_in; ++ ++ for (--rode; rode && buf[rode] <= 13;) ++ --rode; ++ buf[++rode] = 0; ++ ++ logit(LOG_INFO,0,"Got %s", buf); ++ ++ if (inet_aton(buf, &ptp_in)) ++ v->uv_ptp_addr = ptp_in.s_addr; ++ } ++ } + +- logit(LOG_INFO,0,"installing %s (%s on subnet %s) as vif #%u - rate=%d", ++ logit(LOG_INFO,0,"installing %s (%s on subnet %s%s%s) as vif #%u - rate=%d", + v->uv_name, inet_fmt(addr, s1, sizeof(s1)), inet_fmts(subnet, mask, s2, sizeof(s2)), ++ (v->uv_ptp_addr) ? "peer " : "", (v->uv_ptp_addr) ? inet_fmt(v->uv_ptp_addr, s3, sizeof(s3)) : "", + numvifs, v->uv_rate_limit); + + ++numvifs; +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/defs.h mrouted-3.9.5-pl/defs.h +--- mrouted-3.9.5/defs.h 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/defs.h 2011-08-30 15:30:23.691662000 +0200 +@@ -9,6 +9,8 @@ + #ifndef __MROUTED_DEFS_H__ + #define __MROUTED_DEFS_H__ + ++#define PLANETLAB 1 ++ + #include + #include + #include +@@ -28,6 +30,7 @@ + #include + #include + #include ++#include + #include + #include + #include +@@ -61,7 +64,7 @@ + #include + #endif + #endif +-#ifdef RSRR ++#if defined(RSRR) || defined(PLANETLAB) + #include + #endif /* RSRR */ + +@@ -137,6 +140,11 @@ + extern u_int32 dvmrp_group; + extern u_int32 dvmrp_genid; + ++#ifdef PLANETLAB ++extern int plrt_socket; ++extern char *plrt_socket_path; ++#endif ++ + #define IF_DEBUG(l) if (debug && debug & (l)) + + #define DEBUG_PKT 0x0001 +@@ -353,6 +361,8 @@ + extern void k_leave(u_int32, u_int32); + extern void k_init_dvmrp(void); + extern void k_stop_dvmrp(void); ++extern void k_init_plrt(void); ++extern void k_stop_plrt(void); + extern void k_add_vif(vifi_t, struct uvif *); + extern void k_del_vif(vifi_t, struct uvif *); + extern void k_add_rg(u_int32, struct gtable *); +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/igmp.c mrouted-3.9.5-pl/igmp.c +--- mrouted-3.9.5/igmp.c 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/igmp.c 2011-08-29 17:03:24.187961000 +0200 +@@ -190,8 +190,10 @@ + if (ip->ip_p == 0) { + if (src == 0 || dst == 0) + logit(LOG_WARNING, 0, "kernel request not accurate"); +- else ++ else { ++ logit(LOG_DEBUG, 0, "received kernel miss"); + add_table_entry(src, dst); ++ } + return; + } + +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/kern.c mrouted-3.9.5-pl/kern.c +--- mrouted-3.9.5/kern.c 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/kern.c 2011-08-31 15:09:21.457071000 +0200 +@@ -7,16 +7,26 @@ + * Leland Stanford Junior University. + */ + ++#include + #include "defs.h" + + int curttl = 0; + ++#ifdef PLANETLAB ++ ++int plrt_socket; ++char *plrt_socket_path = _PATH_MROUTED_PLRT; ++ ++#endif ++ + /* + * Open/init the multicast routing in the kernel and sets the + * MRT_PIM (aka MRT_ASSERT) flag in the kernel. + */ + void k_init_dvmrp(void) + { ++#ifndef PLANETLAB ++ + #ifdef OLD_KERNEL + if (setsockopt(igmp_socket, IPPROTO_IP, MRT_INIT, (char *)NULL, 0) < 0) { + #else +@@ -24,11 +34,34 @@ + + if (setsockopt(igmp_socket, IPPROTO_IP, MRT_INIT, (char *)&v, sizeof(int)) < 0) { + #endif ++ + if (errno == EADDRINUSE) + logit(LOG_ERR, 0, "Another multicast routing application is already running."); + else + logit(LOG_ERR, errno, "Cannot enable multicast routing in kernel"); + } ++ ++#endif ++} ++ ++void k_init_plrt(void) ++{ ++#ifdef PLANETLAB ++ /* ++ * Just open a connection to the user-space forwarder ++ */ ++ ++ if ((plrt_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0)) < 0) ++ logit(LOG_ERR, errno, "PLRT socket"); ++ ++ struct sockaddr_un sun; ++ memset(&sun, 0, sizeof(sun)); ++ sun.sun_family = AF_UNIX; ++ strncpy(sun.sun_path, plrt_socket_path, sizeof(sun.sun_path)); ++ ++ if ((connect(plrt_socket, &sun, sizeof(sun))) < 0) ++ logit(LOG_ERR, errno, "PLRT socket connect"); ++#endif + } + + +@@ -38,8 +71,17 @@ + */ + void k_stop_dvmrp(void) + { ++#ifndef PLANETLAB + if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DONE, (char *)NULL, 0) < 0) + logit(LOG_WARNING, errno, "Cannot disable multicast routing in kernel"); ++#endif ++} ++ ++void k_stop_plrt(void) ++{ ++#ifdef PLANETLAB ++ close(plrt_socket); ++#endif + } + + +@@ -194,11 +236,25 @@ + */ + void k_add_vif(vifi_t vifi, struct uvif *v) + { +- struct vifctl vc; +- +- vc.vifc_vifi = vifi; +- uvif_to_vifctl(&vc, v); +- if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_VIF, (char *)&vc, sizeof(vc)) < 0) ++ /* ++ * PlanetLab does application-level forwarding ++ */ ++ struct { ++ u_int32 op; ++ u_int32 len; ++ struct vifctl vc; ++ } op; ++ ++ op.vc.vifc_vifi = vifi; ++ uvif_to_vifctl(&op.vc, v); ++ ++#ifdef PLANETLAB ++ op.op = MRT_ADD_VIF; ++ op.len = sizeof(op.vc); ++ if (send(plrt_socket, &op, sizeof(op), 0) < 0) ++#else ++ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_VIF, (char *)&op.vc, sizeof(op.vc)) < 0) ++#endif + logit(LOG_ERR, errno, "setsockopt MRT_ADD_VIF on vif %d", vifi); + } + +@@ -214,6 +270,20 @@ + * we're going to delete. *BSD systems on the other hand exepect only the index + * of that VIF. + */ ++#ifdef PLANETLAB ++ struct { ++ u_int32 op; ++ u_int32 len; ++ struct vifctl vc; ++ } op; ++ ++ op.vc.vifc_vifi = vifi; ++ uvif_to_vifctl(&op.vc, v); ++ ++ op.op = MRT_DEL_VIF; ++ op.len = sizeof(op.vc); ++ if (send(plrt_socket, &op, sizeof(op), 0) < 0) ++#else + #ifdef __linux__ + struct vifctl vc; + +@@ -224,6 +294,7 @@ + #else /* *BSD et al. */ + if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_VIF, (char *)&vifi, sizeof(vifi)) < 0) + #endif /* !__linux__ */ ++#endif /* PLANETLAB */ + { + if (errno == EADDRNOTAVAIL || errno == EINVAL) + return; +@@ -238,31 +309,57 @@ + */ + void k_add_rg(u_int32 origin, struct gtable *g) + { +- struct mfcctl mc; ++ struct { ++ u_int32 op; ++ u_int32 len; ++ struct mfcctl mc; ++ } op; ++ + vifi_t i; + + #ifdef DEBUG_MFC + md_log(MD_ADD, origin, g->gt_mcastgrp); + #endif + /* copy table values so that setsockopt can process it */ +- mc.mfcc_origin.s_addr = origin; ++ op.mc.mfcc_origin.s_addr = origin; + #ifdef OLD_KERNEL +- mc.mfcc_originmask.s_addr = 0xffffffff; ++ op.mc.mfcc_originmask.s_addr = 0xffffffff; + #endif +- mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp; +- mc.mfcc_parent = g->gt_route ? g->gt_route->rt_parent : NO_VIF; ++ op.mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp; ++ op.mc.mfcc_parent = g->gt_route ? g->gt_route->rt_parent : NO_VIF; + for (i = 0; i < numvifs; i++) +- mc.mfcc_ttls[i] = g->gt_ttls[i]; ++ op.mc.mfcc_ttls[i] = g->gt_ttls[i]; ++ ++#ifdef PLANETLAB ++ ++ logit(LOG_DEBUG, 0, "setsockopt MRT_ADD_MFC %s-%s %d[%d %d %d %d %d %d]", ++ inet_fmt(origin, s1, sizeof(s1)), ++ inet_fmt(g->gt_mcastgrp, s2, sizeof(s2)), ++ numvifs, ++ op.mc.mfcc_ttls[0], op.mc.mfcc_ttls[1], op.mc.mfcc_ttls[2], ++ op.mc.mfcc_ttls[3], op.mc.mfcc_ttls[4], op.mc.mfcc_ttls[5] ); ++ ++ /* Send to PlanetLab's user-space MRT daemon */ ++ op.op = MRT_ADD_MFC; ++ op.len = sizeof(op.mc); ++ if (send(plrt_socket, &op, sizeof(op), 0) < 0) ++ ++#else /* here if not PLANETLAB */ + + /* write to kernel space */ + if (setsockopt(igmp_socket, IPPROTO_IP, MRT_ADD_MFC, +- (char *)&mc, sizeof(mc)) < 0) { ++ (char *)&op.mc, sizeof(op.mc)) < 0) ++ ++#endif /* PLANETLAB */ ++ ++ { + #ifdef DEBUG_MFC + md_log(MD_ADD_FAIL, origin, g->gt_mcastgrp); + #endif + logit(LOG_WARNING, errno, "setsockopt MRT_ADD_MFC", + inet_fmt(origin, s1, sizeof(s1)), inet_fmt(g->gt_mcastgrp, s2, sizeof(s2))); + } ++ + } + + +@@ -271,20 +368,37 @@ + */ + int k_del_rg(u_int32 origin, struct gtable *g) + { +- struct mfcctl mc; ++ struct { ++ u_int32 op; ++ u_int32 len; ++ struct mfcctl mc; ++ } op; + + #ifdef DEBUG_MFC + md_log(MD_DEL, origin, g->gt_mcastgrp); + #endif + /* copy table values so that setsockopt can process it */ +- mc.mfcc_origin.s_addr = origin; ++ op.mc.mfcc_origin.s_addr = origin; + #ifdef OLD_KERNEL +- mc.mfcc_originmask.s_addr = 0xffffffff; ++ op.mc.mfcc_originmask.s_addr = 0xffffffff; + #endif +- mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp; ++ op.mc.mfcc_mcastgrp.s_addr = g->gt_mcastgrp; ++ ++#ifdef PLANETLAB ++ ++ /* Send to PlanetLab's user-space MRT daemon */ ++ op.op = MRT_DEL_MFC; ++ op.len = sizeof(op.mc); ++ if (send(plrt_socket, &op, sizeof(op), 0) < 0) ++ ++#else /* here if not PLANETLAB */ + + /* write to kernel space */ +- if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_MFC, (char *)&mc, sizeof(mc)) < 0) { ++ if (setsockopt(igmp_socket, IPPROTO_IP, MRT_DEL_MFC, (char *)&op.mc, sizeof(op.mc)) < 0) ++ ++#endif /* PLANETLAB */ ++ ++ { + #ifdef DEBUG_MFC + md_log(MD_DEL_FAIL, origin, g->gt_mcastgrp); + #endif +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/main.c mrouted-3.9.5-pl/main.c +--- mrouted-3.9.5/main.c 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/main.c 2011-08-30 15:31:01.281814000 +0200 +@@ -52,9 +52,9 @@ + time_t mrouted_init_time; + + #ifdef SNMP +-#define NHANDLERS 34 ++#define NHANDLERS 35 + #else +-#define NHANDLERS 2 ++#define NHANDLERS 3 + #endif + + static struct ihandler { +@@ -206,6 +206,9 @@ + fputs(" -h, --help Show this help text\n", stderr); + fputs(" -p Disable pruning. Deprecated, compatibility option\n", stderr); + fputs(" -r, --show-routes Show state of VIFs and multicast routing tables\n", stderr); ++#ifdef PLANETLAB ++ fputs(" -F Path to the PlanetLab userland forwarder, default /var/run/mcastfwd\n", stderr); ++#endif + fprintf(stderr, " -v, --version Show %s version\n", __progname); + fputs("\n", stderr); + +@@ -258,11 +261,23 @@ + + snprintf(versionstring, sizeof(versionstring), "mrouted version %s", todaysversion); + +- while ((ch = getopt_long(argc, argv, "c:d::fhpP::rv", long_options, NULL)) != EOF) { ++#ifdef PLANETLAB ++#define PLOPTIONS ":F" ++#else ++#define PLOPTIONS "" ++#endif ++ ++ while ((ch = getopt_long(argc, argv, "c:d::fhpP::rv" PLOPTIONS, long_options, NULL)) != EOF) { + switch (ch) { + case 'c': + configfilename = optarg; + break; ++ ++#ifdef PLANETLAB ++ case 'F': ++ plrt_socket_path = optarg; ++ break; ++#endif + + case 'd': + if (!optarg) +@@ -410,6 +425,8 @@ + init_ipip(); + init_routes(); + init_ktable(); ++ k_init_plrt(); ++ + #ifndef OLD_KERNEL + /* + * Unfortunately, you can't k_get_version() unless you've +@@ -422,6 +439,7 @@ + k_init_dvmrp(); + vers = k_get_version(); + k_stop_dvmrp(); ++ + /*XXX + * This function must change whenever the kernel version changes + */ +@@ -466,6 +484,15 @@ + logit(LOG_ERR, 0, "Descriptor too big"); + FD_SET(igmp_socket, &readers); + nfds = igmp_socket + 1; ++ ++#ifdef PLANETLAB ++ if (plrt_socket >= (int)FD_SETSIZE) ++ logit(LOG_ERR, 0, "Descriptor too big"); ++ FD_SET(plrt_socket, &readers); ++ if (plrt_socket >= nfds) ++ nfds = plrt_socket + 1; ++#endif ++ + for (i = 0; i < nhandlers; i++) { + if (ihandlers[i].fd >= (int)FD_SETSIZE) + logit(LOG_ERR, 0, "Descriptor too big"); +@@ -602,6 +629,17 @@ + accept_igmp(recvlen); + } + ++#ifdef PLANETLAB ++ if (FD_ISSET(plrt_socket, &rfds)) { ++ recvlen = recvfrom(plrt_socket, recv_buf, RECV_BUF_SIZE, 0, NULL, &dummy); ++ if (recvlen < 0) { ++ if (errno != EINTR) logit(LOG_ERR, errno, "recvfrom"); ++ continue; ++ } ++ accept_igmp(recvlen); ++ } ++#endif ++ + for (i = 0; i < nhandlers; i++) { + if (FD_ISSET(ihandlers[i].fd, &rfds)) { + (*ihandlers[i].func)(ihandlers[i].fd, &rfds); +@@ -808,6 +846,7 @@ + if (did_final_init) + k_stop_dvmrp(); + } ++ k_stop_plrt(); + } + + /* +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/Makefile mrouted-3.9.5-pl/Makefile +--- mrouted-3.9.5/Makefile 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/Makefile 2011-08-10 17:18:22.233596000 +0200 +@@ -43,7 +43,7 @@ + + ## Common + CFLAGS = $(MCAST_INCLUDE) $(SNMPDEF) $(RSRRDEF) $(INCLUDES) $(DEFS) $(USERCOMPILE) +-CFLAGS += -O2 -W -Wall -Werror ++CFLAGS += -O2 -W -Wall + #CFLAGS += -O -g + LDLIBS = $(SNMPLIBDIR) $(SNMPLIBS) $(EXTRA_LIBS) + LDFLAGS += -Wl,-Map,$@.map +Only in mrouted-3.9.5-pl: Makefile.bk +Only in mrouted-3.9.5-pl: map-mbone +Only in mrouted-3.9.5-pl: map-mbone.map +Only in mrouted-3.9.5-pl: mrinfo +Only in mrouted-3.9.5-pl: mrinfo.map +Only in mrouted-3.9.5-pl: mrouted +Only in mrouted-3.9.5-pl: mrouted.map +Only in mrouted-3.9.5-pl: mtrace +Only in mrouted-3.9.5-pl: mtrace.map +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/pathnames.h mrouted-3.9.5-pl/pathnames.h +--- mrouted-3.9.5/pathnames.h 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/pathnames.h 2011-08-29 13:10:27.043684000 +0200 +@@ -14,5 +14,6 @@ + #define _PATH_MROUTED_GENID _PATH_MROUTED_RUNDIR "/mrouted.genid" + #define _PATH_MROUTED_DUMP _PATH_MROUTED_RUNDIR "/mrouted.dump" + #define _PATH_MROUTED_CACHE _PATH_MROUTED_RUNDIR "/mrouted.cache" ++#define _PATH_MROUTED_PLRT _PATH_VARRUN "mcastrt" + + #endif /* __MROUTED_PATHNAMES_H__ */ +Only in mrouted-3.9.5-pl: vers.c +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/vif.c mrouted-3.9.5-pl/vif.c +--- mrouted-3.9.5/vif.c 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/vif.c 2011-08-31 19:26:52.955453000 +0200 +@@ -139,6 +139,7 @@ + v->uv_lcl_addr = 0; + v->uv_rmt_addr = 0; + v->uv_dst_addr = t ? 0 : dvmrp_group; ++ v->uv_ptp_addr = 0; + v->uv_subnet = 0; + v->uv_subnetmask = 0; + v->uv_subnetbcast = 0; +@@ -379,6 +380,8 @@ + start_route_updates(); + update_route(p->pa_subnet, p->pa_subnetmask, 0, 0, vifi, NULL); + } ++ if (v->uv_ptp_addr) ++ update_route(v->uv_ptp_addr, 0xffffffff, 0, 0, vifi, NULL); + + /* + * Until neighbors are discovered, assume responsibility for sending +@@ -526,6 +529,8 @@ + return(vifi); + } + else { ++ if (src == v->uv_ptp_addr) ++ return(vifi); + if ((src & v->uv_subnetmask) == v->uv_subnet && + ((v->uv_subnetmask == 0xffffffff) || + (src != v->uv_subnetbcast))) +@@ -1666,6 +1671,10 @@ + scaletime(now - v->uv_querier->al_ctime), + scaletime(v->uv_querier->al_timer)); + } ++ if (0 != v->uv_ptp_addr) { ++ fprintf(fp, " PtP remote: %-18s\n", ++ inet_fmt(v->uv_ptp_addr, s1, sizeof(s1))); ++ } + if (v->uv_flags & VIFF_BLASTER) + fprintf(fp, " blasterbuf size: %dk\n", + v->uv_blasterlen / 1024); +diff -ur --exclude='*.o' --exclude=cfparse.c --exclude='*~' mrouted-3.9.5/vif.h mrouted-3.9.5-pl/vif.h +--- mrouted-3.9.5/vif.h 2011-03-05 21:45:25.000000000 +0100 ++++ mrouted-3.9.5-pl/vif.h 2011-08-29 14:35:35.695829000 +0200 +@@ -109,6 +109,7 @@ + u_int32 uv_lcl_addr; /* local address of this vif */ + u_int32 uv_rmt_addr; /* remote end-point addr (tunnels only) */ + u_int32 uv_dst_addr; /* destination for DVMRP/PIM messages */ ++ u_int32 uv_ptp_addr; /* remote peer address (pointopoint only) */ + u_int32 uv_subnet; /* subnet number (phyints only) */ + u_int32 uv_subnetmask; /* subnet mask (phyints only) */ + u_int32 uv_subnetbcast;/* subnet broadcast addr (phyints only) */ diff --git a/src/nepi/testbeds/planetlab/scripts/tun_connect.py b/src/nepi/testbeds/planetlab/scripts/tun_connect.py index cf55e0da..f015bade 100644 --- a/src/nepi/testbeds/planetlab/scripts/tun_connect.py +++ b/src/nepi/testbeds/planetlab/scripts/tun_connect.py @@ -19,7 +19,12 @@ import base64 import traceback import tunchannel -import ipaddr2 + +try: + import iovec + HAS_IOVEC = True +except: + HAS_IOVEC = False tun_name = 'tun0' tun_path = '/dev/net/tun' @@ -113,8 +118,8 @@ parser.add_option( "Specify a symmetric encryption key with which to protect packets across " "the tunnel. python-crypto must be installed on the system." ) parser.add_option( - "-K", "--gre-key", dest="gre_key", metavar="KEY", type="int", - default = None, + "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string", + default = "true", help = "Specify a demultiplexing 32-bit numeric key for GRE." ) parser.add_option( @@ -133,14 +138,12 @@ parser.add_option( help = "If specified, packets won't be logged to standard output, " "but dumped to a pcap-formatted trace in the specified file. " ) parser.add_option( - "--multicast", dest="multicast", - action = "store_true", - default = False, - help = "If specified, multicast packets will be forwarded and IGMP " - "join/leave packets will be generated. Routing information " - "must be sent to the mroute unix socket, in a format identical " - "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL " - "code and 32-bit data length." ) + "--multicast-forwarder", dest="multicast_fwd", + default = None, + help = "If specified, multicast packets will be forwarded to " + "the specified unix-domain socket. If the device uses ethernet " + "frames, ethernet headers will be stripped and IP packets " + "will be forwarded, prefixed with the interface's address." ) parser.add_option( "--filter", dest="filter_module", metavar="PATH", default = None, @@ -222,65 +225,6 @@ IFNAMSIZ = 0x00000010 IFREQ_SZ = 0x00000028 FIONREAD = 0x0000541b -class MulticastThread(threading.Thread): - def __init__(self, *p, **kw): - super(MulticastThread, self).__init__(*p, **kw) - self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP) - self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, - socket.inet_aton(options.vif_addr) ) - self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) - self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) - self._stop = False - self.setDaemon(True) - - def run(self): - devnull = open('/dev/null','r+b') - maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*") - cur_maddr = set() - lastfullrefresh = time.time() - while not self._stop: - # Get current subscriptions - proc = subprocess.Popen(['ip','maddr','show',tun_name], - stdout = subprocess.PIPE, - stderr = subprocess.STDOUT, - stdin = devnull) - new_maddr = set() - for line in proc.stdout: - match = maddr_re.match(line) - if match: - new_maddr.add(match.group(1)) - proc.wait() - - # Every now and then, send a full report - now = time.time() - report_new = new_maddr - if (now - lastfullrefresh) <= 30.0: - report_new = report_new - cur_maddr - else: - lastfullrefresh = now - - # Report subscriptions - for grp in report_new: - igmpp = ipaddr2.ipigmp( - options.vif_addr, '224.0.0.2', 1, 0x16, 0, grp, - noipcksum=True) - self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0)) - - # Notify group leave - for grp in cur_maddr - new_maddr: - igmpp = ipaddr2.ipigmp( - options.vif_addr, '224.0.0.2', 1, 0x17, 0, grp, - noipcksum=True) - self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0)) - - cur_maddr = new_maddr - - time.sleep(1) - - def stop(self): - self._stop = True - self.join(5) - class HostLock(object): # This class is used as a lock to prevent concurrency issues with more # than one instance of netns running in the same machine. Both in @@ -517,7 +461,7 @@ def pl_vif_start(tun_path, tun_name): if options.vif_txqueuelen is not None: stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,)) if options.mode.startswith('pl-gre'): - stdin.write("gre=%d\n" % (options.gre_key,)) + stdin.write("gre=%s\n" % (options.gre_key,)) stdin.write("remote=%s\n" % (options.peer_addr,)) stdin.close() @@ -694,6 +638,12 @@ else: filter_close = None queueclass = None +# install multicast forwarding hook +if options.multicast_fwd: + print >>sys.stderr, "Connecting to mcast filter" + mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + tunchannel.nonblock(mcfwd_sock.fileno()) + # be careful to roll back stuff on exceptions tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name) try: @@ -719,6 +669,61 @@ try: tcpdump = None reconnect = None mcastthread = None + + # install multicast forwarding hook + if options.multicast_fwd: + print >>sys.stderr, "Installing mcast filter" + + if HAS_IOVEC: + writev = iovec.writev + else: + os_write = os.write + map_ = map + str_ = str + def writev(fileno, *stuff): + os_write(''.join(map_(str_,stuff))) + + def accept_packet(packet, direction, + _up_accept=accept_packet, + sock=mcfwd_sock, + sockno=mcfwd_sock.fileno(), + etherProto=tunchannel.etherProto, + etherStrip=tunchannel.etherStrip, + etherMode=tun_name.startswith('tap'), + multicast_fwd = options.multicast_fwd, + vif_addr = socket.inet_aton(options.vif_addr), + connected = [], writev=writev, + len=len, ord=ord): + if _up_accept: + rv = _up_accept(packet, direction) + if not rv: + return rv + + if direction == 1: + # Incoming... what? + if etherMode: + if etherProto(packet)=='\x08\x00': + fwd = etherStrip(packet) + else: + fwd = None + else: + fwd = packet + if fwd is not None and len(fwd) >= 20: + if (ord(fwd[16]) & 0xf0) == 0xe0: + # Forward it + if not connected: + try: + sock.connect(multicast_fwd) + connected.append(None) + except: + traceback.print_exc(file=sys.stderr) + if connected: + try: + writev(sockno, vif_addr,fwd) + except: + traceback.print_exc(file=sys.stderr) + return 1 + if options.protocol == 'fd': if accept_packet or filter_init: @@ -825,11 +830,6 @@ try: # or perhaps there is no os.nice support in the system pass - if options.multicast: - # Start multicast forwarding daemon - mcastthread = MulticastThread() - mcastthread.start() - if not filter_init: tun_fwd(tun, remote, reconnect = reconnect, diff --git a/src/nepi/testbeds/planetlab/tunproto.py b/src/nepi/testbeds/planetlab/tunproto.py index 3f10b85b..aeb4e674 100644 --- a/src/nepi/testbeds/planetlab/tunproto.py +++ b/src/nepi/testbeds/planetlab/tunproto.py @@ -26,6 +26,7 @@ class TunProtoBase(object): self.port = 15000 self.mode = 'pl-tun' self.key = key + self.cross_slice = False self.home_path = home_path @@ -198,9 +199,10 @@ class TunProtoBase(object): local_snat = local.snat local_txq = local.txqueuelen local_p2p = local.pointopoint - local_cipher = local.tun_cipher - local_mcast = local.multicast - local_bwlim = local.bwlimit + local_cipher=local.tun_cipher + local_mcast= local.multicast + local_bwlim= local.bwlimit + local_mcastfwd = local.multicast_forwarder if not local_p2p and hasattr(peer, 'address'): local_p2p = peer.address @@ -231,7 +233,8 @@ class TunProtoBase(object): "-t", str(check_proto), "-A", str(local_addr), "-M", str(local_mask), - "-C", str(local_cipher)] + "-C", str(local_cipher), + ] if check_proto == 'fd': passfd_arg = str(peer_addr) @@ -244,8 +247,12 @@ class TunProtoBase(object): "--pass-fd", passfd_arg ]) elif check_proto == 'gre': + if self.cross_slice: + args.extend([ + "-K", str(self.key.strip('=')) + ]) + args.extend([ - "-K", str(min(local_port, peer_port)), "-a", str(peer_addr), ]) # both udp and tcp @@ -267,14 +274,14 @@ class TunProtoBase(object): args.append("-N") elif local_cap == 'pcap': args.extend(('-c','pcap')) - if local_mcast: - args.append("--multicast") if local_bwlim: args.extend(("-b",str(local_bwlim*1024))) if filter_module: args.extend(("--filter", filter_module)) if filter_args: args.extend(("--filter-args", filter_args)) + if local_mcast and local_mcastfwd: + args.extend(("--multicast-forwarder", local_mcastfwd)) self._logger.info("Starting %s", self) @@ -324,7 +331,7 @@ class TunProtoBase(object): # Connected? (out,err),proc = server.eintr_retry(server.popen_ssh_command)( - "cd %(home)s ; grep -c Connected capture" % dict( + "cd %(home)s ; grep -a -c Connected capture" % dict( home = server.shell_escape(self.home_path)), host = local.node.hostname, port = None, @@ -342,7 +349,7 @@ class TunProtoBase(object): # At least listening? (out,err),proc = server.eintr_retry(server.popen_ssh_command)( - "cd %(home)s ; grep -c Listening capture" % dict( + "cd %(home)s ; grep -a -c Listening capture" % dict( home = server.shell_escape(self.home_path)), host = local.node.hostname, port = None, @@ -381,7 +388,7 @@ class TunProtoBase(object): # Inspect the trace to check the assigned iface local = self.local() if local: - cmd = "cd %(home)s ; grep 'Using tun:' capture | head -1" % dict( + cmd = "cd %(home)s ; grep -a 'Using tun:' capture | head -1" % dict( home = server.shell_escape(self.home_path)) for spin in xrange(30): (out,err),proc = server.eintr_retry(server.popen_ssh_command)( diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index ce780502..970f83cf 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -120,15 +120,15 @@ def _pullPacket(buf, ether_mode=False, len=len): rv = buf.popleft() return rv -def etherStrip(buf): +def etherStrip(buf, buffer=buffer, len=len): if len(buf) < 14: return "" if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00': # tagged ethernet frame - return buf[18:] + return buffer(buf, 18) elif buf[12:14] == '\x08\x00': # untagged ethernet frame - return buf[14:] + return buffer(buf, 14) else: return ""