18 usage = "usage: %prog [options] <enabled-addresses>"
20 parser = optparse.OptionParser(usage=usage)
23 "-d", "--poll-delay", dest="poll_delay", metavar="SECONDS", type="float",
25 help = "Multicast subscription polling interval")
27 "-D", "--refresh-delay", dest="refresh_delay", metavar="SECONDS", type="float",
29 help = "Full-refresh interval - time between full IGMP reports")
31 "-p", "--fwd-path", dest="fwd_path", metavar="PATH",
32 default = "/var/run/mcastfwd",
33 help = "Path of the unix socket in which the program will listen for packets")
35 "-r", "--router-path", dest="mrt_path", metavar="PATH",
36 default = "/var/run/mcastrt",
37 help = "Path of the unix socket in which the program will listen for routing changes")
39 "-A", "--announce-only", dest="announce_only", action="store_true",
41 help = "If given, only group membership announcements will be made. "
42 "Useful for non-router non-member multicast nodes.")
44 "-R", "--no-router", dest="no_router", action="store_true",
46 help = "If given, only group membership announcements and forwarding to the default multicast egress will be made. "
47 "Useful for non-router but member multicast nodes.")
49 "-v", "--verbose", dest="verbose", action="store_true",
51 help = "Log more verbosely")
53 (options, remaining_args) = parser.parse_args(sys.argv[1:])
57 level=logging.DEBUG if options.verbose else logging.WARNING)
59 ETH_P_ALL = 0x00000003
61 TUNSETIFF = 0x400454ca
62 IFF_NO_PI = 0x00001000
65 IFF_VNET_HDR = 0x00004000
66 TUN_PKT_STRIP = 0x00000001
67 IFHWADDRLEN = 0x00000006
72 class IGMPThread(threading.Thread):
73 def __init__(self, vif_addr, *p, **kw):
74 super(IGMPThread, self).__init__(*p, **kw)
76 vif_addr = vif_addr.strip()
77 self.vif_addr = vif_addr
78 self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
79 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
80 socket.inet_aton(self.vif_addr) )
81 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
82 self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
87 proc = subprocess.Popen(['ip','addr','show'],
88 stdout = subprocess.PIPE,
89 stderr = subprocess.STDOUT,
90 stdin = open('/dev/null','r+b') )
92 heading = re.compile(r"\d+:\s*([-a-zA-Z0-9_]+):.*")
93 addr = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}).*")
94 for line in proc.stdout:
95 match = heading.match(line)
97 tun_name = match.group(1)
99 match = addr.match(line)
100 if match and match.group(1) == vif_addr:
101 self.tun_name = tun_name
104 raise RuntimeError, "Could not find iterface for", vif_addr
107 devnull = open('/dev/null','r+b')
108 maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
110 lastfullrefresh = time.time()
111 vif_addr_i = socket.inet_aton(self.vif_addr)
112 while not self._stop:
113 mirror_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
115 # Get current subscriptions @ vif
116 proc = subprocess.Popen(['ip','maddr','show',self.tun_name],
117 stdout = subprocess.PIPE,
118 stderr = subprocess.STDOUT,
121 for line in proc.stdout:
122 match = maddr_re.match(line)
124 new_maddr.add(match.group(1))
127 # Get current subscriptions @ eth0 (default on PL),
128 # they should be considered "universal" suscriptions.
129 proc = subprocess.Popen(['ip','maddr','show', 'eth0'],
130 stdout = subprocess.PIPE,
131 stderr = subprocess.STDOUT,
134 for line in proc.stdout:
135 match = maddr_re.match(line)
137 new_maddr.add(match.group(1))
139 mirror_socket.setsockopt(
141 socket.IP_ADD_MEMBERSHIP,
142 socket.inet_aton(match.group(1))+vif_addr_i )
144 traceback.print_exc(file=sys.stderr)
147 # Every now and then, send a full report
149 report_new = new_maddr
150 if (now - lastfullrefresh) <= options.refresh_delay:
151 report_new = report_new - cur_maddr
153 lastfullrefresh = now
155 # Report subscriptions
156 for grp in report_new:
157 print >>sys.stderr, "JOINING", grp
158 igmpp = ipaddr2.ipigmp(
159 self.vif_addr, grp, 1, 0x16, 0, grp,
162 self.igmp_socket.sendto(igmpp, 0, (grp,0))
164 traceback.print_exc(file=sys.stderr)
167 for grp in cur_maddr - new_maddr:
168 print >>sys.stderr, "LEAVING", grp
169 igmpp = ipaddr2.ipigmp(
170 self.vif_addr, '224.0.0.2', 1, 0x17, 0, grp,
173 self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))
175 traceback.print_exc(file=sys.stderr)
177 cur_maddr = new_maddr
179 time.sleep(options.poll_delay)
183 self.join(1+5*options.poll_delay)
186 class FWDThread(threading.Thread):
187 def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
188 super(FWDThread, self).__init__(*p, **kw)
190 self.in_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
191 self.in_socket.bind(options.fwd_path)
193 self.pending = collections.deque()
194 self.maxpending = 1000
195 self.rt_cache = rt_cache
196 self.router_socket = router_socket
199 # prepare forwarding sockets
200 self.fwd_sockets = {}
201 for fwd_target in remaining_args:
202 fwd_target = socket.inet_aton(fwd_target)
203 fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
204 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
205 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
206 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
207 self.fwd_sockets[fwd_target] = fwd_socket
209 # we always forward to eth0
210 # In PL, we cannot join the multicast routers in eth0,
211 # that would bring a lot of trouble. But we can
212 # listen there for subscriptions and forward interesting
213 # packets, partially joining the mbone
214 # TODO: IGMP messages from eth0 should be selectively
215 # replicated in all vifs to propagate external
216 # subscriptions. It is complex though.
217 fwd_target = '\x00'*4
218 fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
219 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
220 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, fwd_target)
221 fwd_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
222 self.fwd_sockets[fwd_target] = fwd_socket
228 in_socket = self.in_socket
229 rt_cache = self.rt_cache
231 router_socket = self.router_socket
235 pending = self.pending
236 in_socket.settimeout(options.poll_delay)
238 enumerate_ = enumerate
239 fwd_sockets = self.fwd_sockets
242 def_socket = fwd_sockets['\x00\x00\x00\x00']
244 while not self._stop:
247 if pending and npending:
248 packet = pending.pop()
251 packet = in_socket.recv(2000)
252 except socket.timeout, e:
253 if pending and not npending:
254 npending = len_(pending)
256 if not packet or len_(packet) < 24:
261 packet = buffer_(packet,4)
263 if packet[9] == '\x02':
264 # IGMP packet? It's for mrouted
265 # unless it's coming from it
266 # NOTE: mrouted already picks it up when it's sent
267 # to the virtual interface. Injecting it would
269 #if router_socket and packet[12:16] not in fwd_sockets:
271 # router_socket.send(packet)
273 # traceback.print_exc(file=sys.stderr)
275 elif packet[9] == '\x00':
276 # LOOPING packet, discard
282 addrinfo = packet[12:20]
283 fwd_targets, rparent = rt_cache.get(addrinfo, noent)
285 if fwd_targets is not None and (rparent == '\x00\x00\x00\x00' or rparent == parent):
287 ttl = ord_(packet[8])
288 tgt_group = (socket.inet_ntoa(addrinfo[4:]),0)
289 print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl,
290 nfwd_targets = len_(fwd_targets)
291 for vifi, vif in vifs.iteritems():
292 if vifi < nfwd_targets:
293 ttl_thresh = ord_(fwd_targets[vifi])
294 if ttl_thresh > 0 and ttl > ttl_thresh:
295 if vif[4] in fwd_sockets:
297 print >>sys.stderr, socket.inet_ntoa(vif[4]),
298 fwd_socket = fwd_sockets[vif[4]]
299 fwd_socket.sendto(packet, 0, tgt_group)
305 print >>sys.stderr, 'default',
306 def_socket.sendto(packet, 0, tgt_group)
310 print >>sys.stderr, "."
313 if len_(pending) < self.maxpending:
314 tgt_group = addrinfo[4:]
315 print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ?"
317 pending.append(fullpacket)
319 # Notify mrouted by forwarding it with protocol 0
320 router_socket.send(''.join(
321 (packet[:9],'\x00',packet[10:1500]) ))
324 ttl = ord_(packet[8])
325 tgt_group = (socket.inet_ntoa(addrinfo[4:]),0)
328 print >>sys.stderr, map(socket.inet_ntoa, (parent, addrinfo[:4], addrinfo[4:])), "-> ttl", ttl, 'default'
329 def_socket.sendto(packet, 0, tgt_group)
335 self.join(1+5*options.poll_delay)
338 class RouterThread(threading.Thread):
339 def __init__(self, rt_cache, router_socket, vifs, *p, **kw):
340 super(RouterThread, self).__init__(*p, **kw)
342 self.rt_cache = rt_cache
344 self.router_socket = router_socket
350 rt_cache = self.rt_cache
353 router_socket = self.router_socket
354 router_socket.settimeout(options.poll_delay)
361 MRT_ADD_VIF = MRT_BASE+2 # Add a virtual interface
362 MRT_DEL_VIF = MRT_BASE+3 # Delete a virtual interface
363 MRT_ADD_MFC = MRT_BASE+4 # Add a multicast forwarding entry
364 MRT_DEL_MFC = MRT_BASE+5 # Delete a multicast forwarding entry
366 def cmdhdr(cmd, unpack=struct.unpack, buffer=buffer):
367 op,dlen = unpack('II', buffer(cmd,0,8))
370 def vifctl(data, unpack=struct.unpack):
371 #vifi, flags,threshold,rate_limit,lcl_addr,rmt_addr = unpack('HBBI4s4s', data)
372 return unpack('HBBI4s4s', data)
373 def mfcctl(data, unpack=struct.unpack):
374 #origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = unpack('4s4sH10sIIIi', data)
375 return unpack('4s4sH32sIIIi', data)
381 addr_vifs[vifi[4]] = vifi[0]
382 print >>sys.stderr, "Added VIF", vifi
386 del addr_vifs[vifi[4]]
388 print >>sys.stderr, "Removed VIF", vifi
390 origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
392 parent_addr = vifs[parent][4]
394 parent_addr = '\x00\x00\x00\x00'
395 addrinfo = origin + mcastgrp
396 rt_cache[addrinfo] = (ttls, parent_addr)
397 print >>sys.stderr, "Added RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp))), map(ord,ttls)
399 origin,mcastgrp,parent,ttls,pkt_cnt,byte_cnt,wrong_if,expire = mfcctl(data)
401 parent_addr = vifs[parent][4]
403 parent_addr = '\x00\x00\x00\x00'
404 addrinfo = origin + mcastgrp
405 del rt_cache[addrinfo]
406 print >>sys.stderr, "Removed RT", '-'.join(map(socket.inet_ntoa,(parent_addr,origin,mcastgrp)))
409 MRT_ADD_VIF : add_vif,
410 MRT_DEL_VIF : del_vif,
411 MRT_ADD_MFC : add_mfc,
412 MRT_DEL_MFC : del_mfc,
415 while not self._stop:
416 if len_(buf) < 8 or len_(buf) < (cmdhdr(buf)[1]+8):
419 cmd = router_socket.recv(2000)
420 except socket.timeout, e:
423 print >>sys.stderr, "PLRT CONNECTION BROKEN"
424 TERMINATE.append(None)
434 op,dlen,data = cmdhdr(cmd)
435 if len_(data) < dlen:
438 buf = buffer_(data, dlen)
439 data = buffer_(data, 0, dlen)
441 print >>sys.stderr, "COMMAND", op, "DATA", dlen
447 traceback.print_exc(file=sys.stderr)
449 print >>sys.stderr, "IGNORING UNKNOWN COMMAND", op
453 self.join(1+5*options.poll_delay)
459 for vif_addr in remaining_args:
461 igmp_threads.append(IGMPThread(vif_addr))
462 valid_vifs.append(vif_addr)
464 traceback.print_exc()
465 print >>sys.stderr, "WARNING: could not listen on interface", vif_addr
467 remaining_args = valid_vifs
474 def _finalize(sig,frame):
476 TERMINATE.append(None)
477 signal.signal(signal.SIGTERM, _finalize)
481 if not options.announce_only and not options.no_router:
482 router_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
483 router_socket.bind(options.mrt_path)
484 router_socket.listen(0)
485 router_remote_socket, router_remote_addr = router_socket.accept()
486 router_thread = RouterThread(rt_cache, router_remote_socket, vifs)
488 router_remote_socket = None
491 if not options.announce_only:
492 fwd_thread = FWDThread(rt_cache, router_remote_socket, vifs)
494 for thread in igmp_threads:
497 if not options.announce_only:
499 if not options.no_router and not options.announce_only:
500 router_thread.start()
505 if os.path.exists(options.mrt_path):
507 os.remove(options.mrt_path)
510 if os.path.exists(options.fwd_path):
512 os.remove(options.fwd_path)