import time
import base64
import traceback
+from Queue import Queue
import tunchannel
mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
#disable nonblocking, cannot handle EWOULDBLOCK
#tunchannel.nonblock(mcfwd_sock.fileno())
- mcfwd_sock.settimeout(0.1) # 100ms tops - packet lost if it blocks more than that
+ mcfwd_sock.settimeout(0.5) # 500ms tops - packet lost if it blocks more than that
# be careful to roll back stuff on exceptions
tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
try:
tcpdump = None
reconnect = None
- mcastthread = None
+ mcfwd_thread = None
# install multicast forwarding hook
if options.multicast_fwd:
def writev(fileno, *stuff):
os_write(''.join(map_(str_,stuff)))
- def accept_packet(packet, direction,
- _up_accept=accept_packet,
+
+ mcfwd_queue = Queue(options.vif_txqueuelen or 500)
+ def mcfwd_thread_fn(
sock=mcfwd_sock,
sockno=mcfwd_sock.fileno(),
- etherProto=tunchannel.etherProto,
- etherStrip=tunchannel.etherStrip,
- etherMode=tun_name.startswith('tap'),
multicast_fwd = options.multicast_fwd,
vif_addr = socket.inet_aton(options.vif_addr),
- connected = [], writev=writev,
+ writev=writev,
retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR),
len=len, ord=ord):
+ TERMINATE_ = TERMINATE
+ connected = False
+
+ while not TERMINATE_:
+ try:
+ fwd = mcfwd_queue.get(True, 1)
+ except:
+ continue
+
+ # Forward it
+ if not connected:
+ try:
+ sock.connect(multicast_fwd)
+ connected = True
+ except:
+ traceback.print_exc(file=sys.stderr)
+ if connected:
+ try:
+ writev(sockno, vif_addr,fwd)
+ except OSError,e:
+ if e.errno not in retrycodes:
+ traceback.print_exc(file=sys.stderr)
+ else:
+ try:
+ writev(sockno, vif_addr,fwd)
+ except:
+ traceback.print_exc(file=sys.stderr)
+ except socket.timeout:
+ # packet lost
+ continue
+ except:
+ traceback.print_exc(file=sys.stderr)
+
+ mcfwd_queue.task_done()
+ mcfwd_thread = threading.Thread(target=mcfwd_thread)
+ mcfwd_thread.start()
+
+ def accept_packet(packet, direction,
+ _up_accept=accept_packet,
+ etherProto=tunchannel.etherProto,
+ etherStrip=tunchannel.etherStrip,
+ etherMode=tun_name.startswith('tap'),
+ len=len, ord=ord):
if _up_accept:
rv = _up_accept(packet, direction)
if not rv:
fwd = packet
if fwd is not None and len(fwd) >= 20:
if (ord(fwd[16]) & 0xf0) == 0xe0:
- # Forward it
- if not connected:
- try:
- sock.connect(multicast_fwd)
- connected.append(None)
- except:
- traceback.print_exc(file=sys.stderr)
- if connected:
- try:
- writev(sockno, vif_addr,fwd)
- except OSError,e:
- if e.errno not in retrycodes:
- traceback.print_exc(file=sys.stderr)
- except:
- traceback.print_exc(file=sys.stderr)
+ # Queue for forwarding
+ try:
+ mcfwd_queue.put_nowait(fwd)
+ except:
+ print >>sys.stderr, "Multicast packet dropped, forwarder queue full"
return 1
# tidy shutdown in every case - swallow exceptions
TERMINATE.append(None)
- if mcastthread:
+ if mcfwd_thread:
try:
- mcastthread.stop()
+ mcfwd_thread.join()
except:
pass
-
+
if filter_thread:
try:
filter_thread.join()