Avoid flapping subscriptions
[nepi.git] / src / nepi / testbeds / planetlab / scripts / mcastfwd.py
index f7ef360..77262ed 100644 (file)
@@ -112,37 +112,46 @@ class IGMPThread(threading.Thread):
         while not self._stop:
             mirror_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
             
-            # Get current subscriptions @ vif
-            proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
-                stdout = subprocess.PIPE,
-                stderr = subprocess.STDOUT,
-                stdin = devnull)
-            new_maddr = set()
-            for line in proc.stdout:
-                match = maddr_re.match(line)
-                if match:
-                    new_maddr.add(match.group(1))
-            proc.wait()
+            for i in xrange(5):
+                # Get current subscriptions @ vif
+                proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.STDOUT,
+                    stdin = devnull)
+                new_maddr = set()
+                for line in proc.stdout:
+                    match = maddr_re.match(line)
+                    if match:
+                        new_maddr.add(match.group(1))
+                proc.wait()
+                if new_maddr:
+                    break
             
-            # Get current subscriptions @ eth0 (default on PL),
-            # they should be considered "universal" suscriptions.
-            proc = subprocess.Popen(['ip','maddr','show', 'eth0'],
-                stdout = subprocess.PIPE,
-                stderr = subprocess.STDOUT,
-                stdin = devnull)
-            new_maddr = set()
-            for line in proc.stdout:
-                match = maddr_re.match(line)
-                if match:
-                    new_maddr.add(match.group(1))
-                    try:
-                        mirror_socket.setsockopt(
-                            socket.IPPROTO_IP,
-                            socket.IP_ADD_MEMBERSHIP,
-                            socket.inet_aton(match.group(1))+vif_addr_i )
-                    except:
-                        traceback.print_exc(file=sys.stderr)
-            proc.wait()
+            for i in xrange(5):
+                # Get current subscriptions @ eth0 (default on PL),
+                # they should be considered "universal" suscriptions.
+                proc = subprocess.Popen(['ip','maddr','show', 'eth0'],
+                    stdout = subprocess.PIPE,
+                    stderr = subprocess.STDOUT,
+                    stdin = devnull)
+                eth_maddr = set()
+                for line in proc.stdout:
+                    match = maddr_re.match(line)
+                    if match:
+                        eth_maddr.add(match.group(1))
+                proc.wait()
+                
+                if eth_maddr:
+                    for maddr in eth_maddr:
+                        try:
+                            mirror_socket.setsockopt(
+                                socket.IPPROTO_IP,
+                                socket.IP_ADD_MEMBERSHIP,
+                                socket.inet_aton(maddr)+vif_addr_i )
+                        except:
+                            traceback.print_exc(file=sys.stderr)
+                    new_maddr.update(eth_maddr)
+                    break
             
             # Every now and then, send a full report
             now = time.time()
@@ -223,6 +232,7 @@ class FWDThread(threading.Thread):
         enumerate_ = enumerate
         fwd_sockets = self.fwd_sockets
         npending = 0
+        npendingpop = 0
         noent = (None,None)
         verbose = options.verbose
         
@@ -232,7 +242,13 @@ class FWDThread(threading.Thread):
                 if pending and npending:
                     packet = pending.pop()
                     npending -= 1
+                    npendingpop += 1
+                    if npendingpop > 10:
+                        # Don't hurry too much, 
+                        # we'll saturate the kernel's queue
+                        time.sleep(0)
                 else:
+                    npendingpop = 0
                     packet = in_socket.recv(2000)
             except socket.timeout, e:
                 if pending and not npending:
@@ -284,8 +300,8 @@ class FWDThread(threading.Thread):
                                         print >>sys.stderr, socket.inet_ntoa(vif[4]),
                                     fwd_socket = fwd_sockets[vif[4]]
                                     fwd_socket.sendto(packet, 0, tgt_group)
-                                except:
-                                    pass
+                                except Exception,e:
+                                    print >>sys.stderr, "ERROR: forwarding packet:", str(e)
                 
                 if verbose:
                     print >>sys.stderr, "."