"""Multicast announcer / forwarder script (Python 2).

For every interface address given on the command line an IGMPThread polls
``ip maddr show`` and sends IGMP join/leave messages for the groups the
interface is subscribed to.

Unless ``--announce-only`` is given, the script additionally:

* accepts one connection on the unix socket at ``--router-path`` and reads
  VIF / MFC add-delete commands from it (RouterThread), and
* reads packets from the unix datagram socket at ``--fwd-path`` and forwards
  them through raw sockets according to the routes learnt above (FWDThread).
"""

import sys
import signal
import socket
import struct
import optparse
import threading
import subprocess
import re
import time
import collections
import os
import traceback

import ipaddr2

usage = "usage: %prog [options] "

parser = optparse.OptionParser(usage=usage)

parser.add_option(
    "-d", "--poll-delay", dest="poll_delay", metavar="SECONDS", type="float",
    default = 1.0,
    help = "Multicast subscription polling interval")
parser.add_option(
    "-D", "--refresh-delay", dest="refresh_delay", metavar="SECONDS", type="float",
    default = 30.0,
    help = "Full-refresh interval - time between full IGMP reports")
parser.add_option(
    "-p", "--fwd-path", dest="fwd_path", metavar="PATH",
    default = "/var/run/mcastfwd",
    help = "Path of the unix socket in which the program will listen for packets")
parser.add_option(
    "-r", "--router-path", dest="mrt_path", metavar="PATH",
    default = "/var/run/mcastrt",
    help = "Path of the unix socket in which the program will listen for routing changes")
parser.add_option(
    "-A", "--announce-only", dest="announce_only", action="store_true",
    default = False,
    help = "If given, only group membership announcements will be made. "
           "Useful for non-router multicast nodes.")

(options, remaining_args) = parser.parse_args(sys.argv[1:])

ETH_P_ALL = 0x00000003
ETH_P_IP = 0x00000800
TUNSETIFF = 0x400454ca
IFF_NO_PI = 0x00001000
IFF_TAP = 0x00000002
IFF_TUN = 0x00000001
IFF_VNET_HDR = 0x00004000
TUN_PKT_STRIP = 0x00000001
IFHWADDRLEN = 0x00000006
IFNAMSIZ = 0x00000010
IFREQ_SZ = 0x00000028
FIONREAD = 0x0000541b


class IGMPThread(threading.Thread):
    """Announce the multicast subscriptions of one interface via IGMP.

    The interface is identified by its IPv4 address (``vif_addr``); its
    device name is looked up once, at construction time, by parsing the
    output of ``ip addr show``.
    """

    def __init__(self, vif_addr, *p, **kw):
        """Open the raw IGMP socket and resolve the interface name.

        Raises RuntimeError if no interface listed by ``ip addr show``
        carries ``vif_addr``.
        """
        super(IGMPThread, self).__init__(*p, **kw)

        vif_addr = vif_addr.strip()
        self.vif_addr = vif_addr
        self.igmp_socket = socket.socket(
            socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
        self.igmp_socket.setsockopt(
            socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
            socket.inet_aton(self.vif_addr))
        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
        self._stop = False
        self.setDaemon(True)

        # Find tun name
        heading = re.compile(r"\d+:\s*(\w+):.*")
        addr = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}).*")
        with open('/dev/null', 'r+b') as devnull:
            proc = subprocess.Popen(
                ['ip', 'addr', 'show'],
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT,
                stdin = devnull)
            try:
                tun_name = None
                for line in proc.stdout:
                    match = heading.match(line)
                    if match:
                        # Interface heading: remember the device name for the
                        # address lines that follow it.
                        tun_name = match.group(1)
                    else:
                        match = addr.match(line)
                        if match and match.group(1) == vif_addr:
                            self.tun_name = tun_name
                            break
                else:
                    raise RuntimeError(
                        "Could not find interface for %s" % (vif_addr,))
            finally:
                # We may have stopped reading early: close our end of the
                # pipe and reap the child so it does not linger as a zombie.
                proc.stdout.close()
                proc.wait()

    def run(self):
        """Poll subscriptions every ``poll_delay`` seconds and report changes.

        New groups are reported with message type 0x16 ("JOINING") and
        dropped groups with type 0x17 ("LEAVING").  Once more than
        ``refresh_delay`` seconds have passed since the last full report,
        every current group is reported again, not just the new ones.
        """
        maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
        cur_maddr = set()
        lastfullrefresh = time.time()
        devnull = open('/dev/null', 'r+b')
        try:
            while not self._stop:
                # Get current subscriptions
                proc = subprocess.Popen(
                    ['ip', 'maddr', 'show', self.tun_name],
                    stdout = subprocess.PIPE,
                    stderr = subprocess.STDOUT,
                    stdin = devnull)
                new_maddr = set()
                for line in proc.stdout:
                    match = maddr_re.match(line)
                    if match:
                        new_maddr.add(match.group(1))
                proc.wait()

                # Every now and then, send a full report
                now = time.time()
                report_new = new_maddr
                if (now - lastfullrefresh) <= options.refresh_delay:
                    report_new = report_new - cur_maddr
                else:
                    lastfullrefresh = now

                # Report subscriptions
                for grp in report_new:
                    print >>sys.stderr, "JOINING", grp
                    igmpp = ipaddr2.ipigmp(
                        self.vif_addr, '224.0.0.2', 1, 0x16, 0, grp,
                        noipcksum=True)
                    try:
                        self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2', 0))
                    except Exception:
                        # Best effort: log and keep announcing other groups.
                        traceback.print_exc(file=sys.stderr)

                # Notify group leave
                for grp in cur_maddr - new_maddr:
                    print >>sys.stderr, "LEAVING", grp
                    igmpp = ipaddr2.ipigmp(
                        self.vif_addr, '224.0.0.2', 1, 0x17, 0, grp,
                        noipcksum=True)
                    try:
                        self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2', 0))
                    except Exception:
                        traceback.print_exc(file=sys.stderr)

                cur_maddr = new_maddr

                time.sleep(options.poll_delay)
        finally:
            devnull.close()

    def stop(self):
        """Ask the thread to exit and wait a bounded time for it."""
        self._stop = True
        self.join(1+5*options.poll_delay)


class FWDThread(threading.Thread):
    """Forward packets read from the ``--fwd-path`` unix datagram socket.

    Each datagram is expected to be a 4-byte parent-interface address
    followed by an IPv4 packet.  Routes are looked up in ``rt_cache``
    (filled in by RouterThread); packets without a route are queued in
    ``self.pending`` and the router is notified.
    """

    def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
        """Bind the input socket and open one raw socket per target address.

        ``remaining_args`` (the positional command-line arguments) are the
        local addresses through which forwarding is allowed.
        """
        super(FWDThread, self).__init__(*p, **kw)

        self.in_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.in_socket.bind(options.fwd_path)

        self.pending = collections.deque()
        self.maxpending = 1000
        self.rt_cache = rt_cache
        self.router_socket = router_socket
        self.vifs = vifs

        # Raw output sockets, keyed by packed (4-byte) interface address.
        self.fwd_sockets = {}
        for fwd_target in remaining_args:
            fwd_target = socket.inet_aton(fwd_target)
            fwd_socket = socket.socket(
                socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
            fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
            fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
            fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
            self.fwd_sockets[fwd_target] = fwd_socket

        self._stop = False
        self.setDaemon(True)

    def run(self):
        """Receive, route and forward packets until stopped.

        While the input socket is idle (receive timeout), packets queued as
        pending are retried once each, oldest first.
        """
        # Local aliases: this is the per-packet hot loop.
        in_socket = self.in_socket
        rt_cache = self.rt_cache
        vifs = self.vifs
        router_socket = self.router_socket
        pending = self.pending
        fwd_sockets = self.fwd_sockets
        len_ = len
        ord_ = ord
        str_ = str
        buffer_ = buffer
        enumerate_ = enumerate

        in_socket.settimeout(options.poll_delay)

        # Number of pending packets still to be retried in the current round.
        npending = 0
        while not self._stop:
            # Get packet
            try:
                if pending and npending:
                    # Retry the oldest pending packet.  popleft() matters:
                    # failed retries are re-appended on the right, so popping
                    # from the right would retry the same packet repeatedly.
                    packet = pending.popleft()
                    npending -= 1
                else:
                    packet = in_socket.recv(2000)
            except socket.timeout:
                # Idle: schedule one retry round over the pending queue.
                if pending and not npending:
                    npending = len_(pending)
                continue
            if not packet or len_(packet) < 24:
                # Too short for the 4-byte prefix plus a 20-byte IP header.
                continue

            fullpacket = packet
            parent = buffer_(packet, 0, 4)
            packet = buffer_(packet, 4)

            # Byte 9 of the IP header is the protocol field.
            if packet[9] == '\x02':
                # IGMP packet? It's for mrouted
                router_socket.send(packet)
            elif packet[9] == '\x00':
                # LOOPING packet, discard
                continue

            # To-Do: PIM asserts

            # Get route: key is parent address + source address + group,
            # falling back to the entry with an all-zero parent.
            addrinfo = buffer_(packet, 12, 8)
            fwd_targets = rt_cache.get(parent+addrinfo)
            if fwd_targets is None:
                fwd_targets = rt_cache.get('\x00\x00\x00\x00'+str_(addrinfo))

            if fwd_targets is not None:
                # Forward
                ttl = ord_(packet[8])
                tgt_group = (addrinfo[4:], 0)
                print >>sys.stderr, socket.inet_ntoa(tgt_group[0]), "->",
                for vifi, thresh_chr in enumerate_(fwd_targets):
                    # fwd_targets holds one TTL threshold byte per VIF index;
                    # forward only when the packet TTL exceeds a non-zero
                    # threshold.
                    ttl_thresh = ord_(thresh_chr)
                    if ttl_thresh > 0 and ttl > ttl_thresh and vifi in vifs:
                        vif = vifs[vifi]
                        # vif[4] is the VIF's local address (see vifctl).
                        if vif[4] in fwd_sockets:
                            print >>sys.stderr, socket.inet_ntoa(vif[4]),
                            fwd_socket = fwd_sockets[vif[4]]
                            fwd_socket.sendto(packet, 0, tgt_group)
                print >>sys.stderr, "."
            else:
                # Mark pending
                if len_(pending) < self.maxpending:
                    tgt_group = addrinfo[4:]
                    print >>sys.stderr, socket.inet_ntoa(tgt_group), "-> ?"

                    pending.append(fullpacket)

                    # Notify mrouted by forwarding it with protocol 0
                    router_socket.send(''.join(
                        (packet[:9], '\x00', packet[10:])))

    def stop(self):
        """Ask the thread to exit and wait a bounded time for it."""
        self._stop = True
        self.join(1+5*options.poll_delay)


class RouterThread(threading.Thread):
    """Apply VIF / MFC commands received on the router socket.

    Commands are framed as two native unsigned ints (opcode, payload length)
    followed by the payload.  They update the shared ``vifs`` and
    ``rt_cache`` dictionaries that FWDThread reads.
    """

    def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
        super(RouterThread, self).__init__(*p, **kw)

        self.rt_cache = rt_cache
        self.vifs = vifs
        self.router_socket = router_socket

        self._stop = False
        self.setDaemon(True)

    def run(self):
        """Read and dispatch commands until stopped or the peer disconnects.

        On disconnect, an entry is appended to the module-level TERMINATE
        list so the main loop exits.
        """
        rt_cache = self.rt_cache
        vifs = self.vifs
        addr_vifs = {}
        router_socket = self.router_socket
        router_socket.settimeout(options.poll_delay)

        len_ = len
        str_ = str
        buffer_ = buffer

        MRT_BASE = 200
        MRT_ADD_VIF = MRT_BASE+2    # Add a virtual interface
        MRT_DEL_VIF = MRT_BASE+3    # Delete a virtual interface
        MRT_ADD_MFC = MRT_BASE+4    # Add a multicast forwarding entry
        MRT_DEL_MFC = MRT_BASE+5    # Delete a multicast forwarding entry

        def cmdhdr(cmd, unpack=struct.unpack, buffer=buffer):
            # Split a raw command into (opcode, payload length, rest).
            op, dlen = unpack('II', buffer(cmd, 0, 8))
            cmd = buffer(cmd, 8)
            return op, dlen, cmd

        def vifctl(data, unpack=struct.unpack):
            # vifi, flags, threshold, rate_limit, lcl_addr, rmt_addr
            return unpack('HBBI4s4s', data)

        def mfcctl(data, unpack=struct.unpack):
            # origin, mcastgrp, parent, ttls, pkt_cnt, byte_cnt, wrong_if, expire
            return unpack('4s4sH32sIIIi', data)

        def mfc_key(cmd):
            # Decode an MFC payload into (rt_cache key, ttls, address parts).
            origin, mcastgrp, parent, ttls, pkt_cnt, byte_cnt, wrong_if, expire = mfcctl(cmd)
            if parent in vifs:
                parent_addr = vifs[parent][4]
            else:
                parent_addr = '\x00\x00\x00\x00'
            parts = (parent_addr, origin, mcastgrp)
            return ''.join(parts), ttls, parts

        def add_vif(cmd):
            vifi = vifctl(cmd)
            vifs[vifi[0]] = vifi
            addr_vifs[vifi[4]] = vifi[0]
            print >>sys.stderr, "Added VIF", vifi

        def del_vif(cmd):
            vifi = vifctl(cmd)
            # Use the stored record: its address is what addr_vifs is keyed by.
            vifi = vifs[vifi[0]]
            del addr_vifs[vifi[4]]
            del vifs[vifi[0]]
            print >>sys.stderr, "Removed VIF", vifi

        def add_mfc(cmd):
            addrinfo, ttls, parts = mfc_key(cmd)
            rt_cache[addrinfo] = ttls
            print >>sys.stderr, "Added RT", '-'.join(map(socket.inet_ntoa, parts))

        def del_mfc(cmd):
            addrinfo, ttls, parts = mfc_key(cmd)
            del rt_cache[addrinfo]
            print >>sys.stderr, "Removed RT", '-'.join(map(socket.inet_ntoa, parts))

        commands = {
            MRT_ADD_VIF : add_vif,
            MRT_DEL_VIF : del_vif,
            MRT_ADD_MFC : add_mfc,
            MRT_DEL_MFC : del_mfc,
        }

        # Bytes received but not yet consumed by a complete command.
        buf = ""
        while not self._stop:
            if len_(buf) < 8 or len_(buf) < (cmdhdr(buf)[1]+8):
                # Get cmd: not enough buffered data for a whole command yet.
                try:
                    chunk = router_socket.recv(2000)
                except socket.timeout:
                    continue
                if not chunk:
                    print >>sys.stderr, "PLRT CONNECTION BROKEN"
                    TERMINATE.append(None)
                    break
                # buf may be a buffer object left over from the previous
                # command; normalise to str before appending.
                # NOTE(review): a payload shorter than its header announces
                # stays buffered and is completed with the next read.
                buf = str_(buf) + chunk
                continue

            op, dlen, data = cmdhdr(buf)
            buf = buffer_(data, dlen)
            data = buffer_(data, 0, dlen)

            print >>sys.stderr, "COMMAND", op, "DATA", dlen

            if op in commands:
                try:
                    commands[op](data)
                except Exception:
                    # A bad command must not kill the router thread.
                    traceback.print_exc(file=sys.stderr)
            else:
                print >>sys.stderr, "IGNORING UNKNOWN COMMAND", op

    def stop(self):
        """Ask the thread to exit and wait a bounded time for it."""
        self._stop = True
        self.join(1+5*options.poll_delay)


igmp_threads = []
for vif_addr in remaining_args:
    igmp_threads.append(IGMPThread(vif_addr))

rt_cache = {}
vifs = {}

# Non-empty once termination has been requested (SIGTERM, or the router
# connection breaking in RouterThread).
TERMINATE = []

def _finalize(sig, frame):
    TERMINATE.append(None)
signal.signal(signal.SIGTERM, _finalize)

try:
    if not options.announce_only:
        router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        router_socket.bind(options.mrt_path)
        router_socket.listen(0)
        # Blocks until the router process connects.
        router_remote_socket, router_remote_addr = router_socket.accept()

        fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs)
        router_thread = RouterThread(rt_cache, router_remote_socket, vifs)

    for thread in igmp_threads:
        thread.start()

    if not options.announce_only:
        fwd_thread.start()
        router_thread.start()

    while not TERMINATE:
        time.sleep(30)
finally:
    # Remove the unix socket files so a later run can bind again.
    for path in (options.mrt_path, options.fwd_path):
        if os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                pass