From e99ccb2de1d64165cc20515769d19acdbad0cc0c Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Sat, 1 Oct 2011 22:23:45 +0200 Subject: [PATCH] Forward all packets to the default multicast egress when a MulticastForwarder is not connected to a router. --- src/nepi/testbeds/planetlab/multicast.py | 15 ++--- .../testbeds/planetlab/scripts/mcastfwd.py | 56 +++++++++++++++++-- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/src/nepi/testbeds/planetlab/multicast.py b/src/nepi/testbeds/planetlab/multicast.py index ffb25d44..1fdffa04 100644 --- a/src/nepi/testbeds/planetlab/multicast.py +++ b/src/nepi/testbeds/planetlab/multicast.py @@ -40,13 +40,10 @@ class MulticastForwarder(application.Application): self.router = None def _command_get(self): - # canonical representation of dependencies - depends = ' '.join( sorted( (self.depends or "").split(' ') ) ) - - # download rpms and pack into a tar archive - return ( - "python mcastfwd.py %s" - ) % ( ' '.join([iface.address for iface in self.ifaces]), ) + cmd = "python mcastfwd.py " + if not self.router: + cmd += "-R " + cmd += ' '.join([iface.address for iface in self.ifaces]) def _command_set(self, value): # ignore return @@ -77,10 +74,6 @@ class MulticastAnnouncer(application.Application): self.router = None def _command_get(self): - # canonical representation of dependencies - depends = ' '.join( sorted( (self.depends or "").split(' ') ) ) - - # download rpms and pack into a tar archive return ( "python mcastfwd.py -A %s" ) % ( ' '.join([iface.address for iface in self.ifaces]), ) diff --git a/src/nepi/testbeds/planetlab/scripts/mcastfwd.py b/src/nepi/testbeds/planetlab/scripts/mcastfwd.py index 77bab0c0..fba0d890 100644 --- a/src/nepi/testbeds/planetlab/scripts/mcastfwd.py +++ b/src/nepi/testbeds/planetlab/scripts/mcastfwd.py @@ -38,7 +38,13 @@ parser.add_option( parser.add_option( "-A", "--announce-only", dest="announce_only", action="store_true", default = False, - help = "If given, only group membership announcements will be made. Useful for non-router multicast nodes.") + help = "If given, only group membership announcements will be made. " + "Useful for non-router non-member multicast nodes.") +parser.add_option( + "-R", "--no-router", dest="no_router", action="store_true", + default = False, + help = "If given, only group membership announcements and forwarding to the default multicast egress will be made. " + "Useful for non-router but member multicast nodes.") parser.add_option( "-v", "--verbose", dest="verbose", action="store_true", default = False, @@ -103,7 +109,7 @@ class IGMPThread(threading.Thread): cur_maddr = set() lastfullrefresh = time.time() while not self._stop: - # Get current subscriptions + # Get current subscriptions @ vif proc = subprocess.Popen(['ip','maddr','show',self.tun_name], stdout = subprocess.PIPE, stderr = subprocess.STDOUT, @@ -115,6 +121,19 @@ class IGMPThread(threading.Thread): new_maddr.add(match.group(1)) proc.wait() + # Get current subscriptions @ eth0 (default on PL), + # they should be considered "universal" suscriptions. + proc = subprocess.Popen(['ip','maddr','show', 'eth0'], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = devnull) + new_maddr = set() + for line in proc.stdout: + match = maddr_re.match(line) + if match: + new_maddr.add(match.group(1)) + proc.wait() + # Every now and then, send a full report now = time.time() report_new = new_maddr @@ -166,6 +185,8 @@ class FWDThread(threading.Thread): self.rt_cache = rt_cache self.router_socket = router_socket self.vifs = vifs + + # prepare forwarding sockets self.fwd_sockets = {} for fwd_target in remaining_args: fwd_target = socket.inet_aton(fwd_target) @@ -175,6 +196,21 @@ class FWDThread(threading.Thread): fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) self.fwd_sockets[fwd_target] = fwd_socket + # we always forward to eth0 + # In PL, we cannot join the multicast routers in eth0, + # that would bring a lot of trouble. But we can + # listen there for subscriptions and forward interesting + # packets, partially joining the mbone + # TODO: IGMP messages from eth0 should be selectively + # replicated in all vifs to propagate external + # subscriptions. It is complex though. + fwd_target = '\x00'*4 + fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) + fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) + fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target) + fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) + self.fwd_sockets[fwd_target] = fwd_socket + self._stop = False self.setDaemon(True) @@ -227,7 +263,7 @@ class FWDThread(threading.Thread): fwd_targets, rparent = rt_cache.get(addrinfo, noent) if fwd_targets is not None and (rparent == '\x00\x00\x00\x00' or rparent == parent): - # Forward + # Forward to vifs ttl = ord_(packet[8]) tgt_group = (socket.inet_ntoa(addrinfo[4:]),0) print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl, @@ -240,6 +276,11 @@ class FWDThread(threading.Thread): print >>sys.stderr, socket.inet_ntoa(vif[4]), fwd_socket = fwd_sockets[vif[4]] fwd_socket.sendto(packet, 0, tgt_group) + + # Forward to eth0 + fwd_socket = fwd_sockets[vif[4]] + fwd_socket.sendto(packet, 0, tgt_group) + print >>sys.stderr, "." else: # Mark pending @@ -393,20 +434,25 @@ signal.signal(signal.SIGTERM, _finalize) try: - if not options.announce_only: + if not options.announce_only and not options.no_router: router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) router_socket.bind(options.mrt_path) router_socket.listen(0) router_remote_socket, router_remote_addr = router_socket.accept() + router_thread = RouterThread(rt_cache, router_remote_socket, vifs) + else: + router_remote_socket = None + router_thread = None + if not options.announce_only: fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs) - router_thread = RouterThread(rt_cache, router_remote_socket, vifs) for thread in igmp_threads: thread.start() if not options.announce_only: fwd_thread.start() + if not options.no_router and not options.announce_only: router_thread.start() while not TERMINATE: -- 2.43.0