# Command-line interface: build the optparse parser and parse sys.argv.
# The positional arguments are the "<enabled-addresses>" named in the usage text.
18 usage = "usage: %prog [options] <enabled-addresses>"
20 parser = optparse.OptionParser(usage=usage)
# -d / --poll-delay: float number of seconds, stored as options.poll_delay.
23 "-d", "--poll-delay", dest="poll_delay", metavar="SECONDS", type="float",
25 help = "Multicast subscription polling interval")
# -D / --refresh-delay: float number of seconds, stored as options.refresh_delay.
27 "-D", "--refresh-delay", dest="refresh_delay", metavar="SECONDS", type="float",
29 help = "Full-refresh interval - time between full IGMP reports")
# -p / --fwd-path: unix socket path for incoming packets (options.fwd_path).
31 "-p", "--fwd-path", dest="fwd_path", metavar="PATH",
32 default = "/var/run/mcastfwd",
33 help = "Path of the unix socket in which the program will listen for packets")
# -r / --router-path: unix socket path for routing changes (options.mrt_path).
35 "-r", "--router-path", dest="mrt_path", metavar="PATH",
36 default = "/var/run/mcastrt",
37 help = "Path of the unix socket in which the program will listen for routing changes")
# -A / --announce-only: boolean flag stored as options.announce_only.
39 "-A", "--announce-only", dest="announce_only", action="store_true",
41 help = "If given, only group membership announcements will be made. Useful for non-router multicast nodes.")
# -v / --verbose: boolean flag stored as options.verbose.
# NOTE(review): the help text below is identical to the --router-path help and
# says nothing about verbosity - looks like a copy/paste slip; confirm and reword.
43 "-v", "--verbose", dest="verbose", action="store_true",
45 help = "Path of the unix socket in which the program will listen for routing changes")
# Parse everything after the program name; leftover positionals go to remaining_args.
47 (options, remaining_args) = parser.parse_args(sys.argv[1:])
# Log level is DEBUG when --verbose was given and WARNING otherwise.
51 level=logging.DEBUG if options.verbose else logging.WARNING)
# Numeric constants.
# NOTE(review): the names match Linux kernel definitions (ETH_P_ALL ethertype,
# the TUNSETIFF ioctl request, IFF_* tun/tap flags, hardware address length) -
# presumably copied from the kernel headers; verify the values against them.
53 ETH_P_ALL = 0x00000003
55 TUNSETIFF = 0x400454ca
56 IFF_NO_PI = 0x00001000
59 IFF_VNET_HDR = 0x00004000
60 TUN_PKT_STRIP = 0x00000001
61 IFHWADDRLEN = 0x00000006
# Thread that polls the multicast groups subscribed on one interface and sends
# an IGMP packet for every group that was joined or left since the last poll.
66 class IGMPThread(threading.Thread):
# vif_addr: dotted-quad address of the interface to announce on (it is passed to
# socket.inet_aton below). Remaining arguments go to threading.Thread.__init__.
67 def __init__(self, vif_addr, *p, **kw):
68 super(IGMPThread, self).__init__(*p, **kw)
# Surrounding whitespace is stripped before the address is stored and compared.
70 vif_addr = vif_addr.strip()
71 self.vif_addr = vif_addr
# Raw IGMP socket: outgoing multicast uses the interface owning vif_addr, the
# sender supplies the IP header itself (IP_HDRINCL) and the multicast TTL is 1.
72 self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
73 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
74 socket.inet_aton(self.vif_addr) )
75 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
76 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
# Find the interface that owns vif_addr by scanning the output of `ip addr show`
# (stderr is merged into stdout, stdin is /dev/null).
81 proc = subprocess.Popen(['ip','addr','show'],
82 stdout = subprocess.PIPE,
83 stderr = subprocess.STDOUT,
84 stdin = open('/dev/null','r+b') )
# `heading` captures the interface name from "<index>: <name>:" lines;
# `addr` captures the IPv4 address from "inet a.b.c.d ..." lines.
86 heading = re.compile(r"\d+:\s*([-a-zA-Z0-9_]+):.*")
87 addr = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}).*")
88 for line in proc.stdout:
89 match = heading.match(line)
# Name captured by `heading`; it is the candidate for the inet lines that follow.
91 tun_name = match.group(1)
93 match = addr.match(line)
# The interface whose inet address equals vif_addr becomes self.tun_name.
94 if match and match.group(1) == vif_addr:
95 self.tun_name = tun_name
# NOTE(review): in the three-expression form of `raise` the third expression
# must be a traceback object, so passing vif_addr there yields a TypeError
# instead of the intended RuntimeError; the message also misspells "interface".
# Presumably meant: a single message that includes vif_addr - confirm and fix.
98 raise RuntimeError, "Could not find iterface for", vif_addr
# Polling state: the pattern for "inet a.b.c.d" lines of `ip maddr show` and the
# time of the last full report.
101 devnull = open('/dev/null','r+b')
102 maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
104 lastfullrefresh = time.time()
# Keep polling until self._stop becomes true.
105 while not self._stop:
106 # Get current subscriptions
107 proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
108 stdout = subprocess.PIPE,
109 stderr = subprocess.STDOUT,
# Every address matched in the command output is added to new_maddr.
112 for line in proc.stdout:
113 match = maddr_re.match(line)
115 new_maddr.add(match.group(1))
118 # Every now and then, send a full report
120 report_new = new_maddr
# While no more than options.refresh_delay seconds have passed since the last
# full report, only groups absent from the previous poll (cur_maddr) are reported.
121 if (now - lastfullrefresh) <= options.refresh_delay:
122 report_new = report_new - cur_maddr
# presumably the other branch: a full report is being sent, so the timer
# restarts - confirm against the complete file.
124 lastfullrefresh = now
126 # Report subscriptions
# One packet per group, built by ipaddr2.ipigmp and sent to the group address.
# NOTE(review): 0x16 is the IGMPv2 Membership Report type code (RFC 2236) -
# presumably that is what this argument selects; confirm against ipaddr2.ipigmp.
127 for grp in report_new:
128 print >>sys.stderr, "JOINING", grp
129 igmpp = ipaddr2.ipigmp(
130 self.vif_addr, grp, 1, 0x16, 0, grp,
133 self.igmp_socket.sendto(igmpp, 0, (grp,0))
# presumably inside an exception handler: the traceback goes to stderr.
135 traceback.print_exc(file=sys.stderr)
# Groups seen in the previous poll but missing now are announced as left; these
# packets are addressed to 224.0.0.2 rather than to the group.
# NOTE(review): 0x17 is the IGMPv2 Leave Group type code and 224.0.0.2 is the
# all-routers group (RFC 2236) - presumably intended; confirm.
138 for grp in cur_maddr - new_maddr:
139 print >>sys.stderr, "LEAVING", grp
140 igmpp = ipaddr2.ipigmp(
141 self.vif_addr, '224.0.0.2', 1, 0x17, 0, grp,
144 self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
146 traceback.print_exc(file=sys.stderr)
# The set just read becomes the baseline for the next poll, which happens after
# options.poll_delay seconds.
148 cur_maddr = new_maddr
150 time.sleep(options.poll_delay)
# Wait for the thread to finish, for at most 1 + 5*poll_delay seconds.
154 self.join(1+5*options.poll_delay)
# Thread that receives packets on the unix datagram socket at options.fwd_path
# and either forwards them through per-interface raw sockets, hands them to the
# router socket, or queues them until a route is known.
157 class FWDThread(threading.Thread):
# rt_cache: mapping looked up with the 8 bytes packet[12:20] (see below).
# router_socket: socket used to pass packets on to the routing daemon.
# Remaining arguments go to threading.Thread.__init__.
158 def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
159 super(FWDThread, self).__init__(*p, **kw)
# Incoming packets arrive on a unix datagram socket bound to options.fwd_path.
161 self.in_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
162 self.in_socket.bind(options.fwd_path)
# Packets without a route wait in `pending`; maxpending caps that queue.
164 self.pending = collections.deque()
165 self.maxpending = 1000
166 self.rt_cache = rt_cache
167 self.router_socket = router_socket
# One raw socket per address in remaining_args, keyed by the packed 4-byte
# address. Each sends caller-built IP headers (IP_HDRINCL) out of that
# address's interface with a multicast TTL of 1.
169 self.fwd_sockets = {}
170 for fwd_target in remaining_args:
171 fwd_target = socket.inet_aton(fwd_target)
172 fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
173 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
174 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
175 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
176 self.fwd_sockets[fwd_target] = fwd_socket
# Attributes and builtins are bound to local names before entering the loop.
182 in_socket = self.in_socket
183 rt_cache = self.rt_cache
185 router_socket = self.router_socket
189 pending = self.pending
# recv() below gives up after options.poll_delay seconds (socket.timeout).
190 in_socket.settimeout(options.poll_delay)
192 enumerate_ = enumerate
193 fwd_sockets = self.fwd_sockets
# Keep processing until self._stop becomes true.
197 while not self._stop:
# Queued packets are retried while npending is non-zero; otherwise a fresh
# packet of at most 2000 bytes is read from the socket.
200 if pending and npending:
201 packet = pending.pop()
204 packet = in_socket.recv(2000)
# On a receive timeout, arm a retry of whatever is queued at this moment.
205 except socket.timeout, e:
206 if pending and not npending:
207 npending = len_(pending)
# Empty packets and packets shorter than 24 bytes are not processed further.
209 if not packet or len_(packet) < 24:
# Drop the first 4 bytes; the offsets used below are relative to the remainder.
# NOTE(review): the rest is indexed like an IPv4 header (byte 8 TTL, byte 9
# protocol, bytes 12-19 source + destination address) - presumably the 4-byte
# prefix carries the `parent` compared below; confirm against the sender.
214 packet = buffer_(packet,4)
216 if packet[9] == '\x02':
217 # IGMP packet? It's for mrouted
218 self.router_socket.send(packet)
219 elif packet[9] == '\x00':
220 # LOOPING packet, discard
# Route lookup keyed by bytes 12..19; `noent` is returned when there is no entry.
226 addrinfo = packet[12:20]
227 fwd_targets, rparent = rt_cache.get(addrinfo, noent)
# Forward only when a route exists and its parent is either all zeros or equal
# to this packet's parent.
229 if fwd_targets is not None and (rparent == '\x00\x00\x00\x00' or rparent == parent):
231 ttl = ord_(packet[8])
232 tgt_group = (socket.inet_ntoa(addrinfo[4:]),0)
233 print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl,
# fwd_targets holds one TTL threshold byte per interface index. A packet goes out
# on an interface when that threshold is non-zero, the packet TTL exceeds it and
# a raw socket exists for the interface's local address (vif[4]).
234 nfwd_targets = len_(fwd_targets)
235 for vifi, vif in vifs.iteritems():
236 if vifi < nfwd_targets:
237 ttl_thresh = ord_(fwd_targets[vifi])
238 if ttl_thresh > 0 and ttl > ttl_thresh:
239 if vif[4] in fwd_sockets:
240 print >>sys.stderr, socket.inet_ntoa(vif[4]),
241 fwd_socket = fwd_sockets[vif[4]]
242 fwd_socket.sendto(packet, 0, tgt_group)
243 print >>sys.stderr, "."
# No usable route: queue the packet (unless the queue is full) and ask the
# router for a route.
246 if len_(pending) < self.maxpending:
247 tgt_group = addrinfo[4:]
248 print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ?"
250 pending.append(fullpacket)
252 # Notify mrouted by forwarding it with protocol 0
# Byte 9 is replaced with '\x00'; every other byte is sent unchanged.
253 router_socket.send(''.join(
254 (packet[:9],'\x00',packet[10:]) ))
# Wait for the thread to finish, for at most 1 + 5*poll_delay seconds.
258 self.join(1+5*options.poll_delay)
# Thread that reads routing commands from router_socket and applies them to the
# shared interface table (vifs) and route cache (rt_cache).
261 class RouterThread(threading.Thread):
# rt_cache: mapping filled and emptied by the ADD_MFC / DEL_MFC commands.
# router_socket: socket the commands are read from.
# Remaining arguments go to threading.Thread.__init__.
262 def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
263 super(RouterThread, self).__init__(*p, **kw)
265 self.rt_cache = rt_cache
267 self.router_socket = router_socket
273 rt_cache = self.rt_cache
# recv() below gives up after options.poll_delay seconds (socket.timeout).
276 router_socket = self.router_socket
277 router_socket.settimeout(options.poll_delay)
# Command codes, defined as offsets from MRT_BASE.
284 MRT_ADD_VIF = MRT_BASE+2 # Add a virtual interface
285 MRT_DEL_VIF = MRT_BASE+3 # Delete a virtual interface
286 MRT_ADD_MFC = MRT_BASE+4 # Add a multicast forwarding entry
287 MRT_DEL_MFC = MRT_BASE+5 # Delete a multicast forwarding entry
# cmdhdr: reads the 8-byte header of a command - two native unsigned ints, the
# operation code and the payload length.
289 def cmdhdr(cmd, unpack=struct.unpack, buffer=buffer):
290 op,dlen = unpack('II', buffer(cmd,0,8))
# vifctl: unpacks an interface record; the field order is given in the comment below.
293 def vifctl(data, unpack=struct.unpack):
294 #vifi, flags,threshold,rate_limit,lcl_addr,rmt_addr = unpack('HBBI4s4s', data)
295 return unpack('HBBI4s4s', data)
# mfcctl: unpacks a forwarding-cache record.
# NOTE(review): the field-list comment below says '10s' for ttls while the
# format actually used is '32s'; presumably the comment is stale - confirm.
296 def mfcctl(data, unpack=struct.unpack):
297 #origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = unpack('4s4sH10sIIIi', data)
298 return unpack('4s4sH32sIIIi', data)
# Add-interface handler: index the interface number by its local address.
304 addr_vifs[vifi[4]] = vifi[0]
305 print >>sys.stderr, "Added VIF", vifi
# Remove-interface handler: drop the local-address index entry.
309 del addr_vifs[vifi[4]]
311 print >>sys.stderr, "Removed VIF", vifi
# Add-route handler: the cache key is origin + group (8 bytes) and the value is
# (per-interface ttl bytes, parent interface local address).
313 origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
315 parent_addr = vifs[parent][4]
# presumably the fallback when `parent` is not a known interface - confirm.
317 parent_addr = '\x00\x00\x00\x00'
318 addrinfo = origin + mcastgrp
319 rt_cache[addrinfo] = (ttls, parent_addr)
320 print >>sys.stderr, "Added RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp))), map(ord,ttls)
# Remove-route handler: same key construction; the cache entry is deleted.
322 origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
324 parent_addr = vifs[parent][4]
326 parent_addr = '\x00\x00\x00\x00'
327 addrinfo = origin + mcastgrp
328 del rt_cache[addrinfo]
329 print >>sys.stderr, "Removed RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp)))
# Dispatch table from command code to handler.
332 MRT_ADD_VIF : add_vif,
333 MRT_DEL_VIF : del_vif,
334 MRT_ADD_MFC : add_mfc,
335 MRT_DEL_MFC : del_mfc,
# Keep processing until self._stop becomes true.
338 while not self._stop:
# Read from the socket only when the leftover buffer does not already hold a
# complete command (8-byte header plus the payload length it announces).
339 if len_(buf) < 8 or len_(buf) < (cmdhdr(buf)[1]+8):
342 cmd = router_socket.recv(2000)
343 except socket.timeout, e:
# The peer is gone: report it and request program termination via TERMINATE.
346 print >>sys.stderr, "PLRT CONNECTION BROKEN"
347 TERMINATE.append(None)
357 op,dlen,data = cmdhdr(cmd)
358 if len_(data) < dlen:
# Split off this command's payload; whatever follows it stays in buf for the
# next iteration.
361 buf = buffer_(data, dlen)
362 data = buffer_(data, 0, dlen)
364 print >>sys.stderr, "COMMAND", op, "DATA", dlen
# presumably inside an exception handler around the dispatch call - confirm.
370 traceback.print_exc(file=sys.stderr)
372 print >>sys.stderr, "IGNORING UNKNOWN COMMAND", op
# Wait for the thread to finish, for at most 1 + 5*poll_delay seconds.
376 self.join(1+5*options.poll_delay)
# Program start-up: one IGMPThread per address given on the command line.
381 for vif_addr in remaining_args:
382 igmp_threads.append(IGMPThread(vif_addr))
# SIGTERM handler: appends to TERMINATE to request shutdown.
389 def _finalize(sig,frame):
391 TERMINATE.append(None)
392 signal.signal(signal.SIGTERM, _finalize)
# Unless --announce-only was given, listen on the unix SEQPACKET socket at
# options.mrt_path and block in accept() until one peer connects. The accepted
# connection is shared by the forwarding thread and the router thread.
396 if not options.announce_only:
397 router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
398 router_socket.bind(options.mrt_path)
399 router_socket.listen(0)
400 router_remote_socket, router_remote_addr = router_socket.accept()
# Both threads receive the same rt_cache, connection and vifs objects.
402 fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs)
403 router_thread = RouterThread(rt_cache, router_remote_socket, vifs)
405 for thread in igmp_threads:
408 if not options.announce_only:
410 router_thread.start()
# Clean-up: remove the unix socket files if they still exist.
415 if os.path.exists(options.mrt_path):
417 os.remove(options.mrt_path)
420 if os.path.exists(options.fwd_path):
422 os.remove(options.fwd_path)