From: Claudio-Daniel Freire Date: Fri, 7 Oct 2011 23:01:59 +0000 (-0300) Subject: Queued forwarding to the multicast forwarder - less packet loss like that X-Git-Tag: nepi-3.0.0~167 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=8d33ebfa0022bc10018745001c2e0dde20b9d157;p=nepi.git Queued forwarding to the multicast forwarder - less packet loss like that --- diff --git a/src/nepi/testbeds/planetlab/scripts/tun_connect.py b/src/nepi/testbeds/planetlab/scripts/tun_connect.py index d995aff6..747b715f 100644 --- a/src/nepi/testbeds/planetlab/scripts/tun_connect.py +++ b/src/nepi/testbeds/planetlab/scripts/tun_connect.py @@ -17,6 +17,7 @@ import functools import time import base64 import traceback +from Queue import Queue import tunchannel @@ -644,7 +645,7 @@ if options.multicast_fwd: mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) #disable nonblocking, cannot handle EWOULDBLOCK #tunchannel.nonblock(mcfwd_sock.fileno()) - mcfwd_sock.settimeout(0.1) # 100ms tops - packet lost if it blocks more than that + mcfwd_sock.settimeout(0.5) # 500ms tops - packet lost if it blocks more than that # be careful to roll back stuff on exceptions tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name) @@ -670,7 +671,7 @@ signal.signal(signal.SIGTERM, _finalize) try: tcpdump = None reconnect = None - mcastthread = None + mcfwd_thread = None # install multicast forwarding hook if options.multicast_fwd: @@ -685,18 +686,59 @@ try: def writev(fileno, *stuff): os_write(''.join(map_(str_,stuff))) - def accept_packet(packet, direction, - _up_accept=accept_packet, + + mcfwd_queue = Queue(options.vif_txqueuelen or 500) + def mcfwd_thread_fn( sock=mcfwd_sock, sockno=mcfwd_sock.fileno(), - etherProto=tunchannel.etherProto, - etherStrip=tunchannel.etherStrip, - etherMode=tun_name.startswith('tap'), multicast_fwd = options.multicast_fwd, vif_addr = socket.inet_aton(options.vif_addr), - connected = [], writev=writev, + writev=writev, retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR), len=len, ord=ord): + TERMINATE_ = TERMINATE + connected = False + + while not TERMINATE_: + try: + fwd = mcfwd_queue.get(True, 1) + except: + continue + + # Forward it + if not connected: + try: + sock.connect(multicast_fwd) + connected = True + except: + traceback.print_exc(file=sys.stderr) + if connected: + try: + writev(sockno, vif_addr,fwd) + except OSError,e: + if e.errno not in retrycodes: + traceback.print_exc(file=sys.stderr) + else: + try: + writev(sockno, vif_addr,fwd) + except: + traceback.print_exc(file=sys.stderr) + except socket.timeout: + # packet lost + continue + except: + traceback.print_exc(file=sys.stderr) + + mcfwd_queue.task_done() + mcfwd_thread = threading.Thread(target=mcfwd_thread) + mcfwd_thread.start() + + def accept_packet(packet, direction, + _up_accept=accept_packet, + etherProto=tunchannel.etherProto, + etherStrip=tunchannel.etherStrip, + etherMode=tun_name.startswith('tap'), + len=len, ord=ord): if _up_accept: rv = _up_accept(packet, direction) if not rv: @@ -713,21 +755,11 @@ try: fwd = packet if fwd is not None and len(fwd) >= 20: if (ord(fwd[16]) & 0xf0) == 0xe0: - # Forward it - if not connected: - try: - sock.connect(multicast_fwd) - connected.append(None) - except: - traceback.print_exc(file=sys.stderr) - if connected: - try: - writev(sockno, vif_addr,fwd) - except OSError,e: - if e.errno not in retrycodes: - traceback.print_exc(file=sys.stderr) - except: - traceback.print_exc(file=sys.stderr) + # Queue for forwarding + try: + mcfwd_queue.put_nowait(fwd) + except: + print >>sys.stderr, "Multicast packet dropped, forwarder queue full" return 1 @@ -892,12 +924,12 @@ finally: # tidy shutdown in every case - swallow exceptions TERMINATE.append(None) - if mcastthread: + if mcfwd_thread: try: - mcastthread.stop() + mcfwd_thread.join() except: pass - + if filter_thread: try: filter_thread.join()