From c99f6a5b00fea783b1e3a5c8841e6880f877eceb Mon Sep 17 00:00:00 2001 From: Claudio-Daniel Freire Date: Fri, 7 Oct 2011 20:04:24 -0300 Subject: [PATCH] Avoid flapping subscriptions --- .../testbeds/planetlab/scripts/mcastfwd.py | 80 +++++++++++-------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/src/nepi/testbeds/planetlab/scripts/mcastfwd.py b/src/nepi/testbeds/planetlab/scripts/mcastfwd.py index f7ef3604..77262edd 100644 --- a/src/nepi/testbeds/planetlab/scripts/mcastfwd.py +++ b/src/nepi/testbeds/planetlab/scripts/mcastfwd.py @@ -112,37 +112,46 @@ class IGMPThread(threading.Thread): while not self._stop: mirror_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # Get current subscriptions @ vif - proc = subprocess.Popen(['ip','maddr','show',self.tun_name], - stdout = subprocess.PIPE, - stderr = subprocess.STDOUT, - stdin = devnull) - new_maddr = set() - for line in proc.stdout: - match = maddr_re.match(line) - if match: - new_maddr.add(match.group(1)) - proc.wait() + for i in xrange(5): + # Get current subscriptions @ vif + proc = subprocess.Popen(['ip','maddr','show',self.tun_name], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = devnull) + new_maddr = set() + for line in proc.stdout: + match = maddr_re.match(line) + if match: + new_maddr.add(match.group(1)) + proc.wait() + if new_maddr: + break - # Get current subscriptions @ eth0 (default on PL), - # they should be considered "universal" suscriptions. - proc = subprocess.Popen(['ip','maddr','show', 'eth0'], - stdout = subprocess.PIPE, - stderr = subprocess.STDOUT, - stdin = devnull) - new_maddr = set() - for line in proc.stdout: - match = maddr_re.match(line) - if match: - new_maddr.add(match.group(1)) - try: - mirror_socket.setsockopt( - socket.IPPROTO_IP, - socket.IP_ADD_MEMBERSHIP, - socket.inet_aton(match.group(1))+vif_addr_i ) - except: - traceback.print_exc(file=sys.stderr) - proc.wait() + for i in xrange(5): + # Get current subscriptions @ eth0 (default on PL), + # they should be considered "universal" suscriptions. + proc = subprocess.Popen(['ip','maddr','show', 'eth0'], + stdout = subprocess.PIPE, + stderr = subprocess.STDOUT, + stdin = devnull) + eth_maddr = set() + for line in proc.stdout: + match = maddr_re.match(line) + if match: + eth_maddr.add(match.group(1)) + proc.wait() + + if eth_maddr: + for maddr in eth_maddr: + try: + mirror_socket.setsockopt( + socket.IPPROTO_IP, + socket.IP_ADD_MEMBERSHIP, + socket.inet_aton(maddr)+vif_addr_i ) + except: + traceback.print_exc(file=sys.stderr) + new_maddr.update(eth_maddr) + break # Every now and then, send a full report now = time.time() @@ -223,6 +232,7 @@ class FWDThread(threading.Thread): enumerate_ = enumerate fwd_sockets = self.fwd_sockets npending = 0 + npendingpop = 0 noent = (None,None) verbose = options.verbose @@ -232,7 +242,13 @@ class FWDThread(threading.Thread): if pending and npending: packet = pending.pop() npending -= 1 + npendingpop += 1 + if npendingpop > 10: + # Don't hurry too much, + # we'll saturate the kernel's queue + time.sleep(0) else: + npendingpop = 0 packet = in_socket.recv(2000) except socket.timeout, e: if pending and not npending: @@ -284,8 +300,8 @@ class FWDThread(threading.Thread): print >>sys.stderr, socket.inet_ntoa(vif[4]), fwd_socket = fwd_sockets[vif[4]] fwd_socket.sendto(packet, 0, tgt_group) - except: - pass + except Exception,e: + print >>sys.stderr, "ERROR: forwarding packet:", str(e) if verbose: print >>sys.stderr, "." -- 2.45.2