17 usage = "usage: %prog [options] <enabled-addresses>"
19 parser = optparse.OptionParser(usage=usage)
22 "-d", "--poll-delay", dest="poll_delay", metavar="SECONDS", type="float",
24 help = "Multicast subscription polling interval")
26 "-D", "--refresh-delay", dest="refresh_delay", metavar="SECONDS", type="float",
28 help = "Full-refresh interval - time between full IGMP reports")
30 "-p", "--fwd-path", dest="fwd_path", metavar="PATH",
31 default = "/var/run/mcastfwd",
32 help = "Path of the unix socket in which the program will listen for packets")
34 "-r", "--router-path", dest="mrt_path", metavar="PATH",
35 default = "/var/run/mcastrt",
36 help = "Path of the unix socket in which the program will listen for routing changes")
38 "-A", "--announce-only", dest="announce_only", action="store_true",
40 help = "If given, only group membership announcements will be made. Useful for non-router multicast nodes.")
42 (options, remaining_args) = parser.parse_args(sys.argv[1:])
44 ETH_P_ALL = 0x00000003
46 TUNSETIFF = 0x400454ca
47 IFF_NO_PI = 0x00001000
50 IFF_VNET_HDR = 0x00004000
51 TUN_PKT_STRIP = 0x00000001
52 IFHWADDRLEN = 0x00000006
57 class IGMPThread(threading.Thread):
58 def __init__(self, vif_addr, *p, **kw):
59 super(IGMPThread, self).__init__(*p, **kw)
61 vif_addr = vif_addr.strip()
62 self.vif_addr = vif_addr
63 self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
64 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
65 socket.inet_aton(self.vif_addr) )
66 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
67 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
72 proc = subprocess.Popen(['ip','addr','show'],
73 stdout = subprocess.PIPE,
74 stderr = subprocess.STDOUT,
75 stdin = open('/dev/null','r+b') )
77 heading = re.compile(r"\d+:\s*(\w+):.*")
78 addr = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}).*")
79 for line in proc.stdout:
80 match = heading.match(line)
82 tun_name = match.group(1)
84 match = addr.match(line)
85 if match and match.group(1) == vif_addr:
86 self.tun_name = tun_name
89 raise RuntimeError, "Could not find iterface for", vif_addr
92 devnull = open('/dev/null','r+b')
93 maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
95 lastfullrefresh = time.time()
97 # Get current subscriptions
98 proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
99 stdout = subprocess.PIPE,
100 stderr = subprocess.STDOUT,
103 for line in proc.stdout:
104 match = maddr_re.match(line)
106 new_maddr.add(match.group(1))
109 # Every now and then, send a full report
111 report_new = new_maddr
112 if (now - lastfullrefresh) <= options.refresh_delay:
113 report_new = report_new - cur_maddr
115 lastfullrefresh = now
117 # Report subscriptions
118 for grp in report_new:
119 print >>sys.stderr, "JOINING", grp
120 igmpp = ipaddr2.ipigmp(
121 self.vif_addr, '224.0.0.2', 1, 0x16, 0, grp,
124 self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
126 traceback.print_exc(file=sys.stderr)
129 for grp in cur_maddr - new_maddr:
130 print >>sys.stderr, "LEAVING", grp
131 igmpp = ipaddr2.ipigmp(
132 self.vif_addr, '224.0.0.2', 1, 0x17, 0, grp,
135 self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
137 traceback.print_exc(file=sys.stderr)
139 cur_maddr = new_maddr
141 time.sleep(options.poll_delay)
145 self.join(1+5*options.poll_delay)
148 class FWDThread(threading.Thread):
149 def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
150 super(FWDThread, self).__init__(*p, **kw)
152 self.in_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
153 self.in_socket.bind(options.fwd_path)
155 self.pending = collections.deque()
156 self.maxpending = 1000
157 self.rt_cache = rt_cache
158 self.router_socket = router_socket
160 self.fwd_sockets = {}
161 for fwd_target in remaining_args:
162 fwd_target = socket.inet_aton(fwd_target)
163 fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
164 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
165 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
166 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
167 self.fwd_sockets[fwd_target] = fwd_socket
173 in_socket = self.in_socket
174 rt_cache = self.rt_cache
176 router_socket = self.router_socket
180 pending = self.pending
181 in_socket.settimeout(options.poll_delay)
183 enumerate_ = enumerate
184 fwd_sockets = self.fwd_sockets
187 while not self._stop:
190 if pending and npending:
191 packet = pending.pop()
194 packet = in_socket.recv(2000)
195 except socket.timeout, e:
196 if pending and not npending:
197 npending = len_(pending)
199 if not packet or len_(packet) < 24:
203 parent = buffer_(packet,0,4)
204 packet = buffer_(packet,4)
206 if packet[9] == '\x02':
207 # IGMP packet? It's for mrouted
208 self.router_socket.send(packet)
209 elif packet[9] == '\x00':
210 # LOOPING packet, discard
216 addrinfo = buffer_(packet,12,8)
217 fwd_targets = rt_cache.get(parent+addrinfo)
218 if fwd_targets is None:
219 fwd_targets = rt_cache.get('\x00\x00\x00\x00'+str_(addrinfo))
221 if fwd_targets is not None:
223 ttl = ord_(packet[8])
224 tgt_group = (addrinfo[4:],0)
225 print >>sys.stderr, socket.inet_ntoa(tgt_group[0]), "->",
226 for vifi, ttl in enumerate_(fwd_targets):
227 ttl_thresh = ord_(ttl)
228 if ttl_thresh > 0 and ttl > ttl_thresh and vifi in vifs:
230 if vifi[4] in fwd_sockets:
231 print >>sys.stderr, socket.inet_ntoa(vifi[4]),
232 fwd_socket = fwd_sockets[vifi[4]]
233 fwd_socket.sendto(packet, 0, tgt_group)
234 print >>sys.stderr, "."
237 if len_(pending) < self.maxpending:
238 tgt_group = addrinfo[4:]
239 print >>sys.stderr, socket.inet_ntoa(tgt_group), "-> ?"
241 pending.append(fullpacket)
243 # Notify mrouted by forwarding it with protocol 0
244 router_socket.send(''.join(
245 (packet[:9],'\x00',packet[10:]) ))
249 self.join(1+5*options.poll_delay)
252 class RouterThread(threading.Thread):
253 def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
254 super(RouterThread, self).__init__(*p, **kw)
256 self.rt_cache = rt_cache
258 self.router_socket = router_socket
264 rt_cache = self.rt_cache
267 router_socket = self.router_socket
268 router_socket.settimeout(options.poll_delay)
275 MRT_ADD_VIF = MRT_BASE+2 # Add a virtual interface
276 MRT_DEL_VIF = MRT_BASE+3 # Delete a virtual interface
277 MRT_ADD_MFC = MRT_BASE+4 # Add a multicast forwarding entry
278 MRT_DEL_MFC = MRT_BASE+5 # Delete a multicast forwarding entry
280 def cmdhdr(cmd, unpack=struct.unpack, buffer=buffer):
281 op,dlen = unpack('II', buffer(cmd,0,8))
284 def vifctl(data, unpack=struct.unpack):
285 #vifi, flags,threshold,rate_limit,lcl_addr,rmt_addr = unpack('HBBI4s4s', data)
286 return unpack('HBBI4s4s', data)
287 def mfcctl(data, unpack=struct.unpack):
288 #origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = unpack('4s4sH10sIIIi', data)
289 return unpack('4s4sH32sIIIi', data)
295 addr_vifs[vifi[4]] = vifi[0]
296 print >>sys.stderr, "Added VIF", vifi
300 del addr_vifs[vifi[4]]
302 print >>sys.stderr, "Removed VIF", vifi
304 origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
306 parent_addr = vifs[parent][4]
308 parent_addr = '\x00\x00\x00\x00'
309 addrinfo = ''.join((parent_addr,origin,mcastgrp))
310 rt_cache[addrinfo] = ttls
311 print >>sys.stderr, "Added RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp)))
313 origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
315 parent_addr = vifs[parent][4]
317 parent_addr = '\x00\x00\x00\x00'
318 addrinfo = ''.join((parent_addr,origin,mcastgrp))
319 del rt_cache[addrinfo]
320 print >>sys.stderr, "Removed RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp)))
323 MRT_ADD_VIF : add_vif,
324 MRT_DEL_VIF : del_vif,
325 MRT_ADD_MFC : add_mfc,
326 MRT_DEL_MFC : del_mfc,
329 while not self._stop:
330 if len_(buf) < 8 or len_(buf) < (cmdhdr(buf)[1]+8):
333 cmd = router_socket.recv(2000)
334 except socket.timeout, e:
337 print >>sys.stderr, "PLRT CONNECTION BROKEN"
338 TERMINATE.append(None)
348 op,dlen,data = cmdhdr(cmd)
349 if len_(data) < dlen:
352 buf = buffer_(data, dlen)
353 data = buffer_(data, 0, dlen)
355 print >>sys.stderr, "COMMAND", op, "DATA", dlen
361 traceback.print_exc(file=sys.stderr)
363 print >>sys.stderr, "IGNORING UNKNOWN COMMAND", op
367 self.join(1+5*options.poll_delay)
372 for vif_addr in remaining_args:
373 igmp_threads.append(IGMPThread(vif_addr))
380 def _finalize(sig,frame):
382 TERMINATE.append(None)
383 signal.signal(signal.SIGTERM, _finalize)
387 if not options.announce_only:
388 router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
389 router_socket.bind(options.mrt_path)
390 router_socket.listen(0)
391 router_remote_socket, router_remote_addr = router_socket.accept()
393 fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs)
394 router_thread = RouterThread(rt_cache, router_remote_socket, vifs)
396 for thread in igmp_threads:
399 if not options.announce_only:
401 router_thread.start()
406 if os.path.exists(options.mrt_path):
408 os.remove(options.mrt_path)
411 if os.path.exists(options.fwd_path):
413 os.remove(options.fwd_path)