Added support for suspending and resuming traffic on PlanetLab TAP/TUN interfaces.
[nepi.git] / src / nepi / testbeds / planetlab / scripts / tun_connect.py
1 import sys
2
3 import socket
4 import fcntl
5 import os
6 import os.path
7 import select
8 import signal
9
10 import struct
11 import ctypes
12 import optparse
13 import threading
14 import subprocess
15 import re
16 import functools
17 import time
18 import base64
19 import traceback
20 from Queue import Queue
21
22 import tunchannel
23
# iovec is optional: when it is available its writev() is used by the
# multicast forwarder, otherwise a pure-python fallback is installed.
try:
    import iovec
    HAS_IOVEC = True
except ImportError:
    # Only an unavailable module means "no iovec"; do not swallow
    # unrelated conditions such as KeyboardInterrupt.
    HAS_IOVEC = False
29
# Built-in defaults; tun_name and tun_path are reassigned from the parsed
# command line options further down in this script.
tun_name = 'tun0'
tun_path = '/dev/net/tun'
# Address the local host name resolves to. Used as the SNAT source address
# and as the local address handed to the UDP/TCP channel setup.
hostaddr = socket.gethostbyname(socket.gethostname())
33
usage = "usage: %prog [options]"

parser = optparse.OptionParser(usage=usage)

# --- local device selection -------------------------------------------
parser.add_option("-i", "--iface",
    dest="tun_name", metavar="DEVICE", default="tun0",
    help="TUN/TAP interface to tap into")
parser.add_option("-d", "--tun-path",
    dest="tun_path", metavar="PATH", default="/dev/net/tun",
    help="TUN/TAP device file path or file descriptor number")

# --- connectivity -----------------------------------------------------
parser.add_option("-p", "--peer-port",
    dest="peer_port", metavar="PEER_PORT", type="int", default=15000,
    help="Remote TCP/UDP port to connect to.")
parser.add_option("--pass-fd",
    dest="pass_fd", metavar="UNIX_SOCKET", default=None,
    help="Path to a unix-domain socket to pass the TUN file descriptor to. "
         "If given, all other connectivity options are ignored, tun_connect will "
         "simply wait to be killed after passing the file descriptor, and it will be "
         "the receiver's responsability to handle the tunneling.")
parser.add_option("-m", "--mode",
    dest="mode", metavar="MODE", default="none",
    help="Set mode. One of none, tun, tap, pl-tun, pl-tap, pl-gre-ip, pl-gre-eth. In any mode except none, a TUN/TAP will be created "
         "by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), "
         "and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have "
         "to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).")
parser.add_option("-t", "--protocol",
    dest="protocol", metavar="PROTOCOL", default=None,
    help="Set protocol. One of tcp, udp, fd, gre. In any mode except none, a TUN/TAP will be created.")

# --- virtual interface configuration ----------------------------------
parser.add_option("-A", "--vif-address",
    dest="vif_addr", metavar="VIF_ADDRESS", default=None,
    help="See mode. This specifies the VIF_ADDRESS, "
         "the IP address of the virtual interface.")
parser.add_option("-M", "--vif-mask",
    dest="vif_mask", metavar="VIF_MASK", type="int", default=None,
    help="See mode. This specifies the VIF_MASK, "
         "a number indicating the network type (ie: 24 for a C-class network).")
parser.add_option("-P", "--port",
    dest="port", metavar="PORT", type="int", default=None,
    help="This specifies the LOCAL_PORT. This will be the local bind port for UDP/TCP.")
parser.add_option("-S", "--vif-snat",
    dest="vif_snat", action="store_true", default=False,
    help="See mode. This specifies whether SNAT will be enabled for the virtual interface. ")
parser.add_option("-Z", "--vif-pointopoint",
    dest="vif_pointopoint", metavar="DST_ADDR", default=None,
    help="See mode. This specifies the remote endpoint's virtual address, "
         "for point-to-point routing configuration. "
         "Not supported by PlanetLab")
parser.add_option("-Q", "--vif-txqueuelen",
    dest="vif_txqueuelen", metavar="SIZE", type="int", default=None,
    help="See mode. This specifies the interface's transmission queue length. ")
parser.add_option("-b", "--bwlimit",
    dest="bwlimit", metavar="BYTESPERSECOND", type="int", default=None,
    help="This specifies the interface's emulated bandwidth in bytes per second.")
parser.add_option("-a", "--peer-address",
    dest="peer_addr", metavar="PEER_ADDRESS", default=None,
    help="This specifies the PEER_ADDRESS, "
         "the IP address of the remote interface.")

# --- encryption / GRE -------------------------------------------------
parser.add_option("-k", "--key",
    dest="cipher_key", metavar="KEY", default=None,
    help="Specify a symmetric encryption key with which to protect packets across "
         "the tunnel. python-crypto must be installed on the system.")
parser.add_option("-K", "--gre-key",
    dest="gre_key", metavar="KEY", type="string", default="true",
    help="Specify a demultiplexing 32-bit numeric key for GRE.")
parser.add_option("-C", "--cipher",
    dest="cipher", metavar="CIPHER", default='AES',
    help="One of PLAIN, AES, Blowfish, DES, DES3. ")

# --- tracing and multicast --------------------------------------------
parser.add_option("-N", "--no-capture",
    dest="no_capture", action="store_true", default=False,
    help="If specified, packets won't be logged to standard output "
         "(default is to log them to standard output). ")
parser.add_option("-c", "--pcap-capture",
    dest="pcap_capture", metavar="FILE", default=None,
    help="If specified, packets won't be logged to standard output, "
         "but dumped to a pcap-formatted trace in the specified file. ")
parser.add_option("--multicast-forwarder",
    dest="multicast_fwd", default=None,
    help="If specified, multicast packets will be forwarded to "
         "the specified unix-domain socket. If the device uses ethernet "
         "frames, ethernet headers will be stripped and IP packets "
         "will be forwarded, prefixed with the interface's address.")
# --filter: pluggable packet filter module. The help text doubles as the
# documentation of the interface a filter module is expected to expose.
parser.add_option("--filter",
    dest="filter_module", metavar="PATH", default=None,
    help="If specified, it should be either a .py or .so module. "
         "It will be loaded, and all incoming and outgoing packets "
         "will be routed through it. The filter will not be responsible "
         "for buffering, packet queueing is performed in tun_connect "
         "already, so it should not concern itself with it. It should "
         "not, however, block in one direction if the other is congested.\n"
         "\n"
         "Modules are expected to have the following methods:\n"
         "\tinit(**args)\n"
         "\t\tIf arguments are given, this method will be called with the\n"
         "\t\tgiven arguments (as keyword args in python modules, or a single\n"
         "\t\tstring in c modules).\n"
         "\taccept_packet(packet, direction):\n"
         "\t\tDecide whether to drop the packet. Direction is 0 for packets "
         "coming from the local side to the remote, and 1 is for packets "
         "coming from the remote side to the local. Return a boolean, "
         "true if the packet is not to be dropped.\n"
         "\tfilter_init():\n"
         "\t\tInitializes a filtering pipe (filter_run). It should "
         "return two file descriptors to use as a bidirectional "
         "pipe: local and remote. 'local' is where packets from the "
         "local side will be written to. After filtering, those packets "
         "should be written to 'remote', where tun_connect will read "
         "from, and it will forward them to the remote peer. "
         "Packets from the remote peer will be written to 'remote', "
         "where the filter is expected to read from, and eventually "
         "forward them to the local side. If the file descriptors are "
         "not nonblocking, they will be set to nonblocking. So it's "
         "better to set them from the start like that.\n"
         "\tfilter_run(local, remote):\n"
         "\t\tIf filter_init is provided, it will be called repeatedly, "
         "in a separate thread until the process is killed. It should "
         "sleep at most for a second.\n"
         "\tfilter_close(local, remote):\n"
         "\t\tCalled then the process is killed, if filter_init was provided. "
         "It should, among other things, close the file descriptors.\n"
         "\n"
         "Python modules are expected to return a tuple in filter_init, "
         "either of file descriptors or file objects, while native ones "
         "will receive two int*.\n"
         "\n"
         "Python modules can additionally contain a custom queue class "
         "that will replace the FIFO used by default. The class should "
         "be named 'queueclass' and contain an interface compatible with "
         "collections.deque. That is, indexing (especiall for q[0]), "
         "bool(q), popleft, appendleft, pop (right), append (right), "
         "len(q) and clear. When using a custom queue, queue size will "
         "have no effect, pass an effective queue size to the module "
         "by using filter_args")
# --filter-args: forwarded to the init() function of the module given with
# --filter (see the module loading code further down in this script).
parser.add_option(
    "--filter-args", dest="filter_args", metavar="ARGS",
    default = None,
    help = "Arguments to pass to the filter module's init function. "
           "For python (.py) modules this is a comma-separated list of "
           "KEY=VALUE pairs, passed as keyword arguments; for native "
           "(.so) modules the string is passed as is. Only used "
           "together with --filter." )
205
(options, args) = parser.parse_args(sys.argv[1:])

# Normalize the (case-insensitive) cipher name given on the command line.
# 'plain' maps to None, i.e. no cipher name is handed to the forwarder.
try:
    options.cipher = {
        'aes' : 'AES',
        'des' : 'DES',
        'des3' : 'DES3',
        'blowfish' : 'Blowfish',
        'plain' : None,
    }[options.cipher.lower()]
except KeyError:
    # Report a usage error (parser.error exits) instead of dying with a
    # bare KeyError traceback.
    parser.error("Unknown cipher %r: expected one of PLAIN, AES, Blowfish, DES, DES3"
                 % (options.cipher,))
215
# Ethernet protocol selectors
ETH_P_ALL = 0x0003
ETH_P_IP = 0x0800

# ioctl request used to bind an open tun file to an interface (see tunopen)
TUNSETIFF = 0x400454CA

# Interface flags passed along with TUNSETIFF / to tun_alloc
IFF_TUN = 0x0001
IFF_TAP = 0x0002
IFF_NO_PI = 0x1000
IFF_VNET_HDR = 0x4000
TUN_PKT_STRIP = 0x0001

# Sizes, in bytes, used when building interface requests
IFHWADDRLEN = 6
IFNAMSIZ = 16
IFREQ_SZ = 40

FIONREAD = 0x541B
228
class HostLock(object):
    """
    Host-wide mutual exclusion, held for the lifetime of the instance.

    Two layers are combined: a class-level flag guarded by a condition
    variable keeps threads of this process from holding the lock at the
    same time, and an exclusive flock() on the given file serializes
    against other processes. The lock is released when the instance is
    finalized. It is not reentrant.
    """
    taken = False
    processcond = threading.Condition()

    def __init__(self, lockfile):
        cls = self.__class__

        # In-process part: wait until no other instance holds the flag.
        with cls.processcond:
            while cls.taken:
                cls.processcond.wait()
            cls.taken = True

        self.lockfile = lockfile

        # Cross-process part: retry the flock if a signal interrupts it.
        locked = False
        while not locked:
            try:
                fcntl.flock(self.lockfile, fcntl.LOCK_EX)
            except (OSError, IOError) as e:
                if e.args[0] != os.errno.EINTR:
                    raise
            else:
                locked = True

    def __del__(self):
        cls = self.__class__

        with cls.processcond:
            # A closed file has already dropped its flock
            if not self.lockfile.closed:
                fcntl.flock(self.lockfile, fcntl.LOCK_UN)

            # Hand the in-process flag over to one waiter, if any
            cls.taken = False
            cls.processcond.notify()
271
def ifnam(x):
    """
    Return interface name x padded with NUL bytes to IFNAMSIZ characters.

    Raises ValueError if the name does not fit: an over-long name used to
    be returned unpadded, which shifted every field following it in the
    request structure built by ifreq().
    """
    if len(x) > IFNAMSIZ:
        raise ValueError("Interface name %r is longer than %d characters" % (x, IFNAMSIZ))
    return x+'\x00'*(IFNAMSIZ-len(x))
274
def ifreq(iface, flags):
    """
    Build the binary interface request for interface iface with the given
    flags, zero-padded to IFREQ_SZ bytes.
    """
    # ifreq contains:
    #   char[IFNAMSIZ] : interface name
    #   short : flags
    #   <padding>
    req = ifnam(iface) + struct.pack("H", flags)
    # Pad up to the full structure size. The operands used to be swapped,
    # which yields a negative repeat count and hence no padding at all.
    req += b'\x00' * (IFREQ_SZ - len(req))
    return req
283
def tunopen(tun_path, tun_name):
    """
    Open the tunnelling device and return it as an unbuffered file object.

    tun_path is either a decimal file descriptor number (an already set up
    device, which is simply wrapped) or the path of the tun control device,
    in which case the new file is bound to interface tun_name.
    """
    if tun_path.isdigit():
        # open TUN fd
        print >>sys.stderr, "Using tun:", tun_name, "fd", tun_path
        tun = os.fdopen(int(tun_path), 'r+b', 0)
    else:
        # open TUN path
        print >>sys.stderr, "Using tun:", tun_name, "at", tun_path
        tun = open(tun_path, 'r+b', 0)

        # Pick the device type with the same naming rule the forwarder
        # uses to select ethernet mode; requesting IFF_TUN for a tap
        # device makes the kernel reject the request.
        if tun_name.startswith('tap'):
            devkind = IFF_TAP
        else:
            devkind = IFF_TUN

        # bind file descriptor to the interface
        try:
            fcntl.ioctl(tun.fileno(), TUNSETIFF, ifreq(tun_name, IFF_NO_PI|devkind))
        except:
            # do not leak the freshly opened file when binding fails
            tun.close()
            raise

    return tun
298
def tunclose(tun_path, tun_name, tun):
    """
    Release what tunopen() acquired.

    When a file object is available, closing it is enough: it owns the
    descriptor even when tun_path is a descriptor number, since tunopen()
    wraps that very descriptor. Closing the raw descriptor first and the
    object afterwards closed it twice and made the second close fail.
    The raw descriptor is only closed when there is no file object.
    """
    if tun:
        # close TUN object (and with it, its file descriptor)
        tun.close()
    elif tun_path and tun_path.isdigit():
        # close TUN fd
        os.close(int(tun_path))
307
def noopen(tun_path, tun_name):
    # Stand-in for tunopen() in modes where this process opens no device:
    # only reports the interface name on stderr and returns None.
    print >>sys.stderr, "Using tun:", tun_name
    return None
def noclose(tun_path, tun_name, tun):
    """Counterpart of noopen(): there is nothing to close."""
    return None
313
def tuntap_alloc(kind, tun_path, tun_name):
    """
    Allocate a device with tunctl and return (tun_path, device name).

    "-n" is passed to tunctl when kind is "tun", and "-t NAME" when a
    name is requested. The name actually allocated is parsed from
    tunctl's output. Raises RuntimeError when tunctl fails or its
    output is not understood.
    """
    cmd = ["tunctl"]
    if kind == "tun":
        cmd += ["-n"]
    if tun_name:
        cmd += ["-t", tun_name]

    tunctl = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    out, err = tunctl.communicate()
    if tunctl.wait():
        raise RuntimeError("Could not allocate %s device" % (kind,))

    found = re.search(r"Set '(?P<dev>(?:tun|tap)[0-9]*)' persistent and owned by .*", out, re.I)
    if not found:
        raise RuntimeError("Could not allocate %s device - tunctl said: %s" % (kind, out))

    tun_name = found.group("dev")
    print >>sys.stderr, "Allocated %s device: %s" % (kind, tun_name)

    return tun_path, tun_name
334
def tuntap_dealloc(tun_path, tun_name):
    """
    Delete device tun_name with "tunctl -d".

    Failure is not fatal: only a warning is written to stderr.
    """
    tunctl = subprocess.Popen(["tunctl", "-d", tun_name], stdout=subprocess.PIPE)
    tunctl.communicate()
    failed = tunctl.wait()
    if failed:
        print >> sys.stderr, "WARNING: error deallocating %s device" % (tun_name,)
341
def nmask_to_dot_notation(mask):
    """
    Convert a prefix length (0..32) to a dotted netmask: 24 -> 255.255.255.0

    Raises ValueError for lengths outside 0..32.
    """
    if not 0 <= mask <= 32:
        raise ValueError("Invalid network mask length: %r" % (mask,))
    # Build the 32-bit mask and let the socket module format it. Going
    # through hex() broke for a zero mask (odd number of hex digits) and
    # for values rendered with a long-integer suffix.
    bits = (0xFFFFFFFF << (32 - mask)) & 0xFFFFFFFF
    return socket.inet_ntoa(struct.pack("!I", bits))
348
def vif_start(tun_path, tun_name):
    """
    Bring interface tun_name up with ifconfig, using the address, mask and
    optional point-to-point peer and queue length from the command line
    options; then, if requested, add the iptables SNAT rule for its network.

    Raises RuntimeError if either command fails.
    """
    ifconfig = ["ifconfig", tun_name, options.vif_addr,
                "netmask", nmask_to_dot_notation(options.vif_mask),
                "-arp"]
    if options.vif_pointopoint:
        ifconfig += ["pointopoint", options.vif_pointopoint]
    if options.vif_txqueuelen is not None:
        ifconfig += ["txqueuelen", str(options.vif_txqueuelen)]
    ifconfig += ["up"]

    child = subprocess.Popen(ifconfig, stdout=subprocess.PIPE)
    child.communicate()
    if child.wait():
        raise RuntimeError("Error starting virtual interface")

    if not options.vif_snat:
        return

    # set up SNAT using iptables
    # TODO: stop vif on error.
    #   Not so necessary since deallocating the tun/tap device
    #   will forcibly stop it, but it would be tidier
    network = "%s/%d" % (options.vif_addr, options.vif_mask)
    iptables = ["iptables", "-t", "nat", "-A", "POSTROUTING",
                "-s", network,
                "-j", "SNAT",
                "--to-source", hostaddr, "--random"]
    child = subprocess.Popen(iptables, stdout=subprocess.PIPE)
    child.communicate()
    if child.wait():
        raise RuntimeError("Error setting up SNAT")
376
def vif_stop(tun_path, tun_name):
    """
    Undo vif_start(): delete the SNAT rule (if one was requested) and take
    interface tun_name down.

    The result of the rule removal is not checked; a failure to take the
    interface down only produces a warning on stderr.
    """
    if options.vif_snat:
        # remove the SNAT rule added by vif_start
        network = "%s/%d" % (options.vif_addr, options.vif_mask)
        iptables = ["iptables", "-t", "nat", "-D", "POSTROUTING",
                    "-s", network,
                    "-j", "SNAT",
                    "--to-source", hostaddr, "--random"]
        subprocess.Popen(iptables, stdout=subprocess.PIPE).communicate()

    child = subprocess.Popen(["ifconfig", tun_name, "down"], stdout=subprocess.PIPE)
    child.communicate()
    if child.wait():
        print >>sys.stderr, "WARNING: error stopping virtual interface"
392     
393     
def pl_tuntap_alloc(kind, tun_path, tun_name):
    """
    Allocate a device through tun_alloc() from ./tunalloc.so.

    kind is "tun" or "tap". Returns (fd as a decimal string, device name),
    i.e. a tun_path in the form tunopen() treats as a file descriptor.
    """
    tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
    # tun_alloc writes the device name into this buffer, so it has to be
    # genuinely mutable memory; a c_char_p built from a python string
    # would let C code scribble over an immutable string object.
    # (IFNAMSIZ + 1 keeps the capacity the NUL-terminated string had.)
    c_tun_name = ctypes.create_string_buffer(IFNAMSIZ + 1)
    kind = {"tun":IFF_TUN,
            "tap":IFF_TAP}[kind]
    fd = tunalloc_so.tun_alloc(kind, c_tun_name)
    name = c_tun_name.value
    return str(fd), name
402
# Host lock taken by pl_tuntap_namealloc and handed over to pl_vif_start,
# so that the chosen name stays reserved until the interface is started.
_name_reservation = None
def pl_tuntap_namealloc(kind, tun_path, tun_name):
    """
    Pick an unused interface name without keeping a device open.

    A throw-away device is allocated only to learn the name prefix (the
    part up to and including the first '-'); then the first name
    prefix + N, N in 9000..9999, not listed by "ip a" is chosen.

    Returns (None, name). Raises RuntimeError if every candidate is taken.
    """
    global _name_reservation
    # Serialize access
    lockfile = open("/tmp/nepi-tun-connect.lock", "a")
    lock = HostLock(lockfile)

    # We need to do this, fd_tuntap is the only one who can
    # tell us our slice id (this script runs as root, so no uid),
    # and the pattern of device names accepted by vsys scripts
    tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
    # tun_alloc writes the device name into this buffer, so it has to be
    # genuinely mutable memory rather than a python string.
    c_tun_name = ctypes.create_string_buffer(IFNAMSIZ + 1)
    nkind= {"tun":IFF_TUN,
            "tap":IFF_TAP}[kind]
    fd = tunalloc_so.tun_alloc(nkind, c_tun_name)
    name = c_tun_name.value
    os.close(fd)

    base = name[:name.index('-')+1]
    existing = set(map(str.strip,os.popen("ip a | grep -o '%s[0-9]*'" % (base,)).read().strip().split('\n')))

    for i in xrange(9000,10000):
        name = base + str(i)
        if name not in existing:
            break
    else:
        raise RuntimeError("Could not assign interface name")

    # Keep holding the lock; pl_vif_start picks it up from here
    _name_reservation = lock

    return None, name
434
def pl_vif_start(tun_path, tun_name):
    """
    Bring interface tun_name up through the /vsys/vif_up pipes.

    The request (name, address, mask and optional key=value settings, one
    per line) is written to vif_up.in while a helper thread collects
    whatever appears on vif_up.out; any output is echoed to stderr.
    The whole exchange happens under the host lock, reusing the one left
    behind by pl_tuntap_namealloc when there is one.
    """
    global _name_reservation

    collected = []
    def outreader():
        collected.append(stdout.read())
        stdout.close()
        time.sleep(1)

    # Serialize access to vsys
    lockfile = open("/tmp/nepi-tun-connect.lock", "a")
    lock = _name_reservation or HostLock(lockfile)
    _name_reservation = None

    stdin = open("/vsys/vif_up.in","w")
    stdout = open("/vsys/vif_up.out","r")

    reader = threading.Thread(target=outreader)
    reader.start()

    # mandatory part of the request
    for field in (tun_name, options.vif_addr, str(options.vif_mask)):
        stdin.write(field+"\n")

    # optional settings
    if options.vif_snat:
        stdin.write("snat=1\n")
    if options.vif_pointopoint:
        stdin.write("pointopoint=%s\n" % (options.vif_pointopoint,))
    if options.vif_txqueuelen is not None:
        stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
    if options.mode.startswith('pl-gre'):
        stdin.write("gre=%s\n" % (options.gre_key,))
        stdin.write("remote=%s\n" % (options.peer_addr,))
    stdin.close()

    reader.join()
    out = ''.join(collected)
    if out.strip():
        print >>sys.stderr, out

    # dropping the last references releases the host lock
    del lock, lockfile
475
def pl_vif_stop(tun_path, tun_name):
    """
    Take interface tun_name down through the /vsys/vif_down pipes.

    After collecting the output of vif_down.out, the helper thread polls
    "ip a" once a second until the interface is no longer listed, for at
    most 120 rounds in the pl-gre modes and 2 otherwise. Any output is
    echoed to stderr. The exchange happens under the host lock.
    """
    collected = []
    def outreader():
        collected.append(stdout.read())
        stdout.close()

        lim = 120 if options.mode.startswith('pl-gre') else 2

        for i in xrange(lim):
            listing = os.popen("ip a | grep -o '%s'" % (tun_name,)).read()
            ifaces = set(map(str.strip, listing.strip().split('\n')))
            if tun_name not in ifaces:
                break
            time.sleep(1)

    # Serialize access to vsys
    lockfile = open("/tmp/nepi-tun-connect.lock", "a")
    lock = HostLock(lockfile)

    stdin = open("/vsys/vif_down.in","w")
    stdout = open("/vsys/vif_down.out","r")

    reader = threading.Thread(target=outreader)
    reader.start()

    stdin.write(tun_name+"\n")
    stdin.close()

    reader.join()
    out = ''.join(collected)
    if out.strip():
        print >>sys.stderr, out

    # dropping the last references releases the host lock
    del lock, lockfile
513
514
def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True, bwlimit = None):
    """
    Run the forwarding loop between the local device and the remote end.

    Thin wrapper around tunchannel.tun_fwd that fills in everything that
    comes from the command line options and from module globals (the
    TERMINATE / SUSPEND flag lists, the queue class, the interface name).
    """
    # queue sizes: the configured tx queue length, or 1000 if unset
    queue_len = options.vif_txqueuelen or 1000
    kqueue_len = 500

    # in PL mode, we cannot strip PI structs
    # so we'll have to handle them
    pl_mode = options.mode.startswith('pl-')

    tunchannel.tun_fwd(
        tun, remote,
        with_pi=pl_mode,
        ether_mode=tun_name.startswith('tap'),
        cipher_key=options.cipher_key,
        udp=(options.protocol == 'udp'),
        TERMINATE=TERMINATE,
        SUSPEND=SUSPEND,
        stderr=None,
        reconnect=reconnect,
        tunqueue=queue_len,
        tunkqueue=kqueue_len,
        cipher=options.cipher,
        accept_local=accept_local,
        accept_remote=accept_remote,
        queueclass=queueclass,
        slowlocal=slowlocal,
        bwlimit=bwlimit)
542
543
544
def nop(tun_path, tun_name):
    """Do nothing; hand back the (tun_path, tun_name) pair unchanged."""
    return (tun_path, tun_name)

# Per-mode hooks. For every --mode value:
#   alloc / dealloc    : obtain / release the device
#   start / stop       : bring the virtual interface up / down
#   tunopen / tunclose : open / close the device file
MODEINFO = {
    'none': {
        'alloc': nop,
        'tunopen': tunopen,
        'tunclose': tunclose,
        'dealloc': nop,
        'start': nop,
        'stop': nop,
    },
    'tun': {
        'alloc': functools.partial(tuntap_alloc, "tun"),
        'tunopen': tunopen,
        'tunclose': tunclose,
        'dealloc': tuntap_dealloc,
        'start': vif_start,
        'stop': vif_stop,
    },
    'tap': {
        'alloc': functools.partial(tuntap_alloc, "tap"),
        'tunopen': tunopen,
        'tunclose': tunclose,
        'dealloc': tuntap_dealloc,
        'start': vif_start,
        'stop': vif_stop,
    },
    'pl-tun': {
        'alloc': functools.partial(pl_tuntap_alloc, "tun"),
        'tunopen': tunopen,
        'tunclose': tunclose,
        'dealloc': nop,
        'start': pl_vif_start,
        'stop': pl_vif_stop,
    },
    'pl-tap': {
        'alloc': functools.partial(pl_tuntap_alloc, "tap"),
        'tunopen': tunopen,
        'tunclose': tunclose,
        'dealloc': nop,
        'start': pl_vif_start,
        'stop': pl_vif_stop,
    },
    'pl-gre-ip': {
        'alloc': functools.partial(pl_tuntap_namealloc, "tun"),
        'tunopen': noopen,
        'tunclose': tunclose,
        'dealloc': nop,
        'start': pl_vif_start,
        'stop': pl_vif_stop,
    },
    'pl-gre-eth': {
        'alloc': functools.partial(pl_tuntap_namealloc, "tap"),
        'tunopen': noopen,
        'tunclose': noclose,
        'dealloc': nop,
        'start': pl_vif_start,
        'stop': pl_vif_stop,
    },
}
583     
# Device path and name as requested on the command line; the mode's alloc
# hook may replace both later on.
tun_path = options.tun_path
tun_name = options.tun_name

# Hooks for the selected mode. An unknown mode is reported as a usage
# error (parser.error exits) rather than as a bare KeyError traceback.
try:
    modeinfo = MODEINFO[options.mode]
except KeyError:
    parser.error("Unknown mode %r: expected one of %s"
                 % (options.mode, ", ".join(sorted(MODEINFO))))
588
# Try to load filter module
#
# Outcome of this section, whether or not a filter was requested:
#   accept_packet : per-packet accept callback, or None
#   queueclass    : custom queue class provided by the module, or None
#   filter_init / filter_run / filter_close :
#                   stream filter entry points, all three or all None
filter_thread = None
filter_module = None
if options.filter_module:
    print >>sys.stderr, "Loading module", options.filter_module, "with args", options.filter_args
    if options.filter_module.endswith('.py'):
        sys.path.append(os.path.dirname(options.filter_module))
        filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
        if options.filter_args:
            # python modules get the arguments as keyword args; a failing
            # init is reported but does not abort the tunnel
            try:
                filter_args = dict(map(lambda x:x.split('=',1),options.filter_args.split(',')))
                filter_module.init(**filter_args)
            except Exception:
                traceback.print_exc()
    elif options.filter_module.endswith('.so'):
        filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
        if options.filter_args:
            # native modules get the raw argument string
            try:
                filter_module.init(options.filter_args)
            except Exception:
                traceback.print_exc()
    else:
        # Neither extension matched, so nothing was loaded. This used to go
        # unnoticed because the resulting NameError was swallowed by the
        # attribute lookups below; the outcome (no filter) is unchanged,
        # but it is now reported.
        print >>sys.stderr, "WARNING: unsupported filter module type (expected .py or .so), no filter installed:", options.filter_module

    # Every entry point is optional: a missing attribute just leaves the
    # corresponding hook disabled.
    accept_packet = getattr(filter_module, 'accept_packet', None)
    if accept_packet is not None:
        print >>sys.stderr, "Installing packet filter (accept_packet)"

    queueclass = getattr(filter_module, 'queueclass', None)
    if queueclass is not None:
        print >>sys.stderr, "Installing custom queue"

    # The stream filter needs all three entry points
    _filter_init = getattr(filter_module, 'filter_init', None)
    filter_run = getattr(filter_module, 'filter_run', None)
    filter_close = getattr(filter_module, 'filter_close', None)
    if _filter_init is None or filter_run is None or filter_close is None:
        filter_init = None
        filter_run = None
        filter_close = None
    else:
        def filter_init():
            # NOTE(review): the --filter help says python modules return a
            # tuple and native ones receive two int*, yet the c_ints are
            # passed as plain arguments here for both kinds - confirm
            # against the filter modules in use.
            filter_local = ctypes.c_int(0)
            filter_remote = ctypes.c_int(0)
            _filter_init(filter_local, filter_remote)
            return filter_local, filter_remote

        print >>sys.stderr, "Installing packet filter (stream filter)"
else:
    accept_packet = None
    filter_init = None
    filter_run = None
    filter_close = None
    queueclass = None
643
# install multicast forwarding hook
# Only the unix-domain datagram socket is created here; it gets connected
# later, by the forwarder thread, when the first packet is forwarded.
if options.multicast_fwd:
    print >>sys.stderr, "Connecting to mcast filter"
    mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    # The socket is deliberately left in blocking mode (with a timeout):
    # the forwarder cannot handle EWOULDBLOCK.
    #tunchannel.nonblock(mcfwd_sock.fileno())
    mcfwd_sock.settimeout(0.5) # 500ms tops - packet lost if it blocks more than that
651
# be careful to roll back stuff on exceptions
# Setup runs alloc -> start -> tunopen; a failure at any step undoes the
# steps already completed, in reverse order, and re-raises.
tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
try:
    modeinfo['start'](tun_path, tun_name)
    try:
        tun = modeinfo['tunopen'](tun_path, tun_name)
    except:
        # the interface was started but could not be opened: stop it
        modeinfo['stop'](tun_path, tun_name)
        raise
except:
    # start or open failed: release the allocated device
    modeinfo['dealloc'](tun_path, tun_name)
    raise
664
665
# Track SIGTERM, and set global termination flag instead of dying.
# The flag is a list so that it can be shared by reference:
# empty means "keep running", non-empty means "terminate".
TERMINATE = []
def _finalize(sig,frame):
    TERMINATE.append(None)
signal.signal(signal.SIGTERM, _finalize)

# SIGUSR1 suspends forwarding, SIGUSR2 resumes forwarding.
# Same convention: a non-empty list means "suspended".
SUSPEND = []
def _suspend(sig,frame):
    if SUSPEND:
        # already suspended
        return
    SUSPEND.append(None)
signal.signal(signal.SIGUSR1, _suspend)

def _resume(sig,frame):
    if not SUSPEND:
        # not suspended
        return
    SUSPEND.remove(None)
signal.signal(signal.SIGUSR2, _resume)
686
687 try:
688     tcpdump = None
689     reconnect = None
690     mcfwd_thread = None
691
692     # install multicast forwarding hook
693     if options.multicast_fwd:
694         print >>sys.stderr, "Installing mcast filter"
695         
696         if HAS_IOVEC:
697             writev = iovec.writev
698         else:
699             os_write = os.write
700             map_ = map
701             str_ = str
702             def writev(fileno, *stuff):
703                 os_write(''.join(map_(str_,stuff)))
704         
705         
706         mcfwd_queue = Queue(options.vif_txqueuelen or 500)
707         def mcfwd_thread_fn(
708                 sock=mcfwd_sock, 
709                 sockno=mcfwd_sock.fileno(),
710                 multicast_fwd = options.multicast_fwd,
711                 vif_addr = socket.inet_aton(options.vif_addr),
712                 writev=writev,
713                 retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR),
714                 len=len, ord=ord):
715             TERMINATE_ = TERMINATE
716             connected = False
717             
718             while not TERMINATE_:
719                 try:
720                     fwd = mcfwd_queue.get(True, 1)
721                 except:
722                     continue
723                 
724                 # Forward it
725                 if not connected:
726                     try:
727                         sock.connect(multicast_fwd)
728                         connected = True
729                     except:
730                         traceback.print_exc(file=sys.stderr)
731                 if connected:
732                     try:
733                         writev(sockno, vif_addr,fwd)
734                     except OSError,e:
735                         if e.errno not in retrycodes:
736                             traceback.print_exc(file=sys.stderr)
737                         else:
738                             try:
739                                 writev(sockno, vif_addr,fwd)
740                             except:
741                                 traceback.print_exc(file=sys.stderr)
742                     except socket.timeout:
743                         # packet lost
744                         continue
745                     except:
746                         traceback.print_exc(file=sys.stderr)
747                 
748                 mcfwd_queue.task_done()
749         mcfwd_thread = threading.Thread(target=mcfwd_thread_fn)
750         mcfwd_thread.start()
751         
752         def accept_packet(packet, direction, 
753                 _up_accept=accept_packet, 
754                 etherProto=tunchannel.etherProto,
755                 etherStrip=tunchannel.etherStrip,
756                 etherMode=tun_name.startswith('tap'),
757                 len=len, ord=ord):
758             if _up_accept:
759                 rv = _up_accept(packet, direction)
760                 if not rv:
761                     return rv
762
763             if direction == 1:
764                 # Incoming... what?
765                 if etherMode:
766                     if etherProto(packet)=='\x08\x00':
767                         fwd = etherStrip(packet)
768                     else:
769                         fwd = None
770                 else:
771                     fwd = packet
772                 if fwd is not None and len(fwd) >= 20:
773                     if (ord(fwd[16]) & 0xf0) == 0xe0:
774                         # Queue for forwarding
775                         try:
776                             mcfwd_queue.put_nowait(fwd)
777                         except:
778                             print >>sys.stderr, "Multicast packet dropped, forwarder queue full"
779             return 1
780
781     
782     if options.protocol == 'fd':
783         if accept_packet or filter_init:
784             raise NotImplementedError, "--pass-fd and --filter are not compatible"
785         
786         if options.pass_fd.startswith("base64:"):
787             options.pass_fd = base64.b64decode(
788                 options.pass_fd[len("base64:"):])
789             options.pass_fd = os.path.expandvars(options.pass_fd)
790         
791         print >>sys.stderr, "Sending FD to: %r" % (options.pass_fd,)
792         
793         # send FD to whoever wants it
794         import passfd
795         
796         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
797         retrydelay = 1.0
798         for i in xrange(30):
799             if TERMINATE:
800                 raise OSError, "Killed"
801             try:
802                 sock.connect(options.pass_fd)
803                 break
804             except socket.error:
805                 # wait a while, retry
806                 print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
807                 time.sleep(min(30.0,retrydelay))
808                 retrydelay *= 1.1
809         else:
810             sock.connect(options.pass_fd)
811         passfd.sendfd(sock, tun.fileno(), '0')
812         
813         # just wait forever
814         def tun_fwd(tun, remote, **kw):
815             global TERMINATE
816             TERM = TERMINATE
817             while not TERM:
818                 time.sleep(1)
819         remote = None
820     elif options.protocol == "gre":
821         if accept_packet or filter_init:
822             raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
823         
824         # just wait forever
825         def tun_fwd(tun, remote, **kw):
826             global TERMINATE
827             TERM = TERMINATE
828             while not TERM:
829                 time.sleep(1)
830         remote = options.peer_addr
831     elif options.protocol == "udp":
832         # connect to remote endpoint
833         if options.peer_addr and options.peer_port:
834             rsock = tunchannel.udp_establish(TERMINATE, hostaddr, options.port, 
835                     options.peer_addr, options.peer_port)
836             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
837         else:
838             print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
839             raise AssertionError, "Error: need a remote endpoint in UDP mode"
840     elif options.protocol == "tcp":
841         # connect to remote endpoint
842         if options.peer_addr and options.peer_port:
843             rsock = tunchannel.tcp_establish(TERMINATE, hostaddr, options.port,
844                     options.peer_addr, options.peer_port)
845             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
846         else:
847             print >>sys.stderr, "Error: need a remote endpoint in TCP mode"
848             raise AssertionError, "Error: need a remote endpoint in TCP mode"
849     else:
850         msg = "Error: Invalid protocol %s" % options.protocol
851         print >>sys.stderr, msg 
852         raise AssertionError, msg
853
854     if filter_init:
855         filter_local, filter_remote = filter_init()
856         
857         def filter_loop():
858             global TERMINATE
859             TERM = TERMINATE
860             run = filter_run
861             local = filter_local
862             remote = filter_remote
863             while not TERM:
864                 run(local, remote)
865             filter_close(local, remote)
866             
867         filter_thread = threading.Thread(target=filter_loop)
868         filter_thread.start()
869     
870     print >>sys.stderr, "Connected"
871
872     if not options.no_capture:
873         # Launch a tcpdump subprocess, to capture and dump packets.
874         # Make sure to catch sigterm and kill the tcpdump as well
875         tcpdump = subprocess.Popen(
876             ["tcpdump","-l","-n","-i",tun_name, "-s", "4096"]
877             + ["-w",options.pcap_capture,"-U"] * bool(options.pcap_capture) )
878     
879     # Try to give us high priority
880     try:
881         os.nice(-20)
882     except:
883         # Ignore errors, we might not have enough privileges,
884         # or perhaps there is no os.nice support in the system
885         pass
886     
887     if not filter_init:
888         tun_fwd(tun, remote,
889             reconnect = reconnect,
890             accept_local = accept_packet,
891             accept_remote = accept_packet,
892             bwlimit = options.bwlimit,
893             slowlocal = True)
894     else:
895         # Hm...
896         # ...ok, we need to:
897         #  1. Forward packets from tun to filter
898         #  2. Forward packets from remote to filter
899         #
900         # 1. needs TUN rate-limiting, while 
901         # 2. needs reconnection
902         #
903         # 1. needs ONLY TUN-side acceptance checks, while
904         # 2. needs ONLY remote-side acceptance checks
905         if isinstance(filter_local, ctypes.c_int):
906             filter_local_fd = filter_local.value
907         else:
908             filter_local_fd = filter_local
909         if isinstance(filter_remote, ctypes.c_int):
910             filter_remote_fd = filter_remote.value
911         else:
912             filter_remote_fd = filter_remote
913
914         def localside():
915             tun_fwd(tun, filter_local_fd,
916                 accept_local = accept_packet,
917                 slowlocal = True)
918         
919         def remoteside():
920             tun_fwd(filter_remote_fd, remote,
921                 reconnect = reconnect,
922                 accept_remote = accept_packet,
923                 bwlimit = options.bwlimit,
924                 slowlocal = False)
925         
926         localthread = threading.Thread(target=localside)
927         remotethread = threading.Thread(target=remoteside)
928         localthread.start()
929         remotethread.start()
930         localthread.join()
931         remotethread.join()
932
933 finally:
934     try:
935         print >>sys.stderr, "Shutting down..."
936     except:
937         # In case sys.stderr is broken
938         pass
939     
940     # tidy shutdown in every case - swallow exceptions
941     TERMINATE.append(None)
942     
943     if mcfwd_thread:
944         try:
945             mcfwd_thread.join()
946         except:
947             pass
948
949     if filter_thread:
950         try:
951             filter_thread.join()
952         except:
953             pass
954
955     try:
956         if tcpdump:
957             os.kill(tcpdump.pid, signal.SIGTERM)
958             tcpdump.wait()
959     except:
960         pass
961
962     try:
963         modeinfo['stop'](tun_path, tun_name)
964     except:
965         traceback.print_exc()
966
967     try:
968         modeinfo['tunclose'](tun_path, tun_name, tun)
969     except:
970         traceback.print_exc()
971         
972     try:
973         modeinfo['dealloc'](tun_path, tun_name)
974     except:
975         traceback.print_exc()
976     
977     print >>sys.stderr, "TERMINATED GRACEFULLY"
978