Queued forwarding to the multicast forwarder - less packet loss like that
[nepi.git] / src / nepi / testbeds / planetlab / scripts / tun_connect.py
index d995aff..747b715 100644 (file)
@@ -17,6 +17,7 @@ import functools
 import time
 import base64
 import traceback
+from Queue import Queue
 
 import tunchannel
 
@@ -644,7 +645,7 @@ if options.multicast_fwd:
     mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
     #disable nonblocking, cannot handle EWOULDBLOCK
     #tunchannel.nonblock(mcfwd_sock.fileno())
-    mcfwd_sock.settimeout(0.1) # 100ms tops - packet lost if it blocks more than that
+    mcfwd_sock.settimeout(0.5) # 500ms tops - packet lost if it blocks more than that
 
 # be careful to roll back stuff on exceptions
 tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
@@ -670,7 +671,7 @@ signal.signal(signal.SIGTERM, _finalize)
 try:
     tcpdump = None
     reconnect = None
-    mcastthread = None
+    mcfwd_thread = None
 
     # install multicast forwarding hook
     if options.multicast_fwd:
@@ -685,18 +686,59 @@ try:
             def writev(fileno, *stuff):
                 os_write(''.join(map_(str_,stuff)))
         
-        def accept_packet(packet, direction, 
-                _up_accept=accept_packet, 
+        
+        mcfwd_queue = Queue(options.vif_txqueuelen or 500)
+        def mcfwd_thread_fn(
                 sock=mcfwd_sock, 
                 sockno=mcfwd_sock.fileno(),
-                etherProto=tunchannel.etherProto,
-                etherStrip=tunchannel.etherStrip,
-                etherMode=tun_name.startswith('tap'),
                 multicast_fwd = options.multicast_fwd,
                 vif_addr = socket.inet_aton(options.vif_addr),
-                connected = [], writev=writev,
+                writev=writev,
                 retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR),
                 len=len, ord=ord):
+            TERMINATE_ = TERMINATE
+            connected = False
+            
+            while not TERMINATE_:
+                try:
+                    fwd = mcfwd_queue.get(True, 1)
+                except:
+                    continue
+                
+                # Forward it
+                if not connected:
+                    try:
+                        sock.connect(multicast_fwd)
+                        connected = True
+                    except:
+                        traceback.print_exc(file=sys.stderr)
+                if connected:
+                    try:
+                        writev(sockno, vif_addr,fwd)
+                    except OSError,e:
+                        if e.errno not in retrycodes:
+                            traceback.print_exc(file=sys.stderr)
+                        else:
+                            try:
+                                writev(sockno, vif_addr,fwd)
+                            except:
+                                traceback.print_exc(file=sys.stderr)
+                    except socket.timeout:
+                        # packet lost
+                        continue
+                    except:
+                        traceback.print_exc(file=sys.stderr)
+                
+                mcfwd_queue.task_done()
+        mcfwd_thread = threading.Thread(target=mcfwd_thread)
+        mcfwd_thread.start()
+        
+        def accept_packet(packet, direction, 
+                _up_accept=accept_packet, 
+                etherProto=tunchannel.etherProto,
+                etherStrip=tunchannel.etherStrip,
+                etherMode=tun_name.startswith('tap'),
+                len=len, ord=ord):
             if _up_accept:
                 rv = _up_accept(packet, direction)
                 if not rv:
@@ -713,21 +755,11 @@ try:
                     fwd = packet
                 if fwd is not None and len(fwd) >= 20:
                     if (ord(fwd[16]) & 0xf0) == 0xe0:
-                        # Forward it
-                        if not connected:
-                            try:
-                                sock.connect(multicast_fwd)
-                                connected.append(None)
-                            except:
-                                traceback.print_exc(file=sys.stderr)
-                        if connected:
-                            try:
-                                writev(sockno, vif_addr,fwd)
-                            except OSError,e:
-                                if e.errno not in retrycodes:
-                                    traceback.print_exc(file=sys.stderr)
-                            except:
-                                traceback.print_exc(file=sys.stderr)
+                        # Queue for forwarding
+                        try:
+                            mcfwd_queue.put_nowait(fwd)
+                        except:
+                            print >>sys.stderr, "Multicast packet dropped, forwarder queue full"
             return 1
 
     
@@ -892,12 +924,12 @@ finally:
     # tidy shutdown in every case - swallow exceptions
     TERMINATE.append(None)
     
-    if mcastthread:
+    if mcfwd_thread:
         try:
-            mcastthread.stop()
+            mcfwd_thread.join()
         except:
             pass
-    
+
     if filter_thread:
         try:
             filter_thread.join()