Forward all packets to the default multicast egress when a MulticastForwarder is...
authorClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Sat, 1 Oct 2011 20:23:45 +0000 (22:23 +0200)
committerClaudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Sat, 1 Oct 2011 20:23:45 +0000 (22:23 +0200)
src/nepi/testbeds/planetlab/multicast.py
src/nepi/testbeds/planetlab/scripts/mcastfwd.py

index ffb25d4..1fdffa0 100644 (file)
@@ -40,13 +40,10 @@ class MulticastForwarder(application.Application):
         self.router = None
     
     def _command_get(self):
-        # canonical representation of dependencies
-        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
-        
-        # download rpms and pack into a tar archive
-        return (
-            "python mcastfwd.py %s"
-        ) % ( ' '.join([iface.address for iface in self.ifaces]), )
+        cmd = "python mcastfwd.py "
+        if not self.router:
+            cmd += "-R "
+        cmd += ' '.join([iface.address for iface in self.ifaces])
     def _command_set(self, value):
         # ignore
         return
@@ -77,10 +74,6 @@ class MulticastAnnouncer(application.Application):
         self.router = None
     
     def _command_get(self):
-        # canonical representation of dependencies
-        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
-        
-        # download rpms and pack into a tar archive
         return (
             "python mcastfwd.py -A %s"
         ) % ( ' '.join([iface.address for iface in self.ifaces]), )
index 77bab0c..fba0d89 100644 (file)
@@ -38,7 +38,13 @@ parser.add_option(
 parser.add_option(
     "-A", "--announce-only", dest="announce_only", action="store_true",
     default = False,
-    help = "If given, only group membership announcements will be made. Useful for non-router multicast nodes.")
+    help = "If given, only group membership announcements will be made. "
+           "Useful for non-router non-member multicast nodes.")
+parser.add_option(
+    "-R", "--no-router", dest="no_router", action="store_true",
+    default = False,
+    help = "If given, only group membership announcements and forwarding to the default multicast egress will be made. "
+           "Useful for non-router but member multicast nodes.")
 parser.add_option(
     "-v", "--verbose", dest="verbose", action="store_true",
     default = False,
@@ -103,7 +109,7 @@ class IGMPThread(threading.Thread):
         cur_maddr = set()
         lastfullrefresh = time.time()
         while not self._stop:
-            # Get current subscriptions
+            # Get current subscriptions @ vif
             proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
                 stdout = subprocess.PIPE,
                 stderr = subprocess.STDOUT,
@@ -115,6 +121,19 @@ class IGMPThread(threading.Thread):
                     new_maddr.add(match.group(1))
             proc.wait()
             
+            # Get current subscriptions @ eth0 (default on PL),
+            # they should be considered "universal" suscriptions.
+            proc = subprocess.Popen(['ip','maddr','show', 'eth0'],
+                stdout = subprocess.PIPE,
+                stderr = subprocess.STDOUT,
+                stdin = devnull)
+            new_maddr = set()
+            for line in proc.stdout:
+                match = maddr_re.match(line)
+                if match:
+                    new_maddr.add(match.group(1))
+            proc.wait()
+            
             # Every now and then, send a full report
             now = time.time()
             report_new = new_maddr
@@ -166,6 +185,8 @@ class FWDThread(threading.Thread):
         self.rt_cache = rt_cache
         self.router_socket = router_socket
         self.vifs = vifs
+        
+        # prepare forwarding sockets 
         self.fwd_sockets = {}
         for fwd_target in remaining_args:
             fwd_target = socket.inet_aton(fwd_target)
@@ -175,6 +196,21 @@ class FWDThread(threading.Thread):
             fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
             self.fwd_sockets[fwd_target] = fwd_socket
         
+        # we always forward to eth0
+        # In PL, we cannot join the multicast routers in eth0,
+        # that would bring a lot of trouble. But we can
+        # listen there for subscriptions and forward interesting
+        # packets, partially joining the mbone
+        # TODO: IGMP messages from eth0 should be selectively
+        #       replicated in all vifs to propagate external
+        #       subscriptions. It is complex though.
+        fwd_target = '\x00'*4
+        fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
+        fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
+        fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
+        fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+        self.fwd_sockets[fwd_target] = fwd_socket
+        
         self._stop = False
         self.setDaemon(True)
     
@@ -227,7 +263,7 @@ class FWDThread(threading.Thread):
             fwd_targets, rparent = rt_cache.get(addrinfo, noent)
             
             if fwd_targets is not None and (rparent == '\x00\x00\x00\x00' or rparent == parent):
-                # Forward
+                # Forward to vifs
                 ttl = ord_(packet[8])
                 tgt_group = (socket.inet_ntoa(addrinfo[4:]),0)
                 print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl,
@@ -240,6 +276,11 @@ class FWDThread(threading.Thread):
                                 print >>sys.stderr, socket.inet_ntoa(vif[4]),
                                 fwd_socket = fwd_sockets[vif[4]]
                                 fwd_socket.sendto(packet, 0, tgt_group)
+                
+                # Forward to eth0
+                fwd_socket = fwd_sockets[vif[4]]
+                fwd_socket.sendto(packet, 0, tgt_group)
+                
                 print >>sys.stderr, "."
             else:
                 # Mark pending
@@ -393,20 +434,25 @@ signal.signal(signal.SIGTERM, _finalize)
 
 
 try:
-    if not options.announce_only:
+    if not options.announce_only and not options.no_router:
         router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
         router_socket.bind(options.mrt_path)
         router_socket.listen(0)
         router_remote_socket, router_remote_addr = router_socket.accept()
+        router_thread = RouterThread(rt_cache, router_remote_socket, vifs)
+    else:
+        router_remote_socket = None
+        router_thread = None
 
+    if not options.announce_only:
         fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs)
-        router_thread = RouterThread(rt_cache, router_remote_socket, vifs)
 
     for thread in igmp_threads:
         thread.start()
     
     if not options.announce_only:
         fwd_thread.start()
+    if not options.no_router and not options.announce_only:
         router_thread.start()
 
     while not TERMINATE: