df864af22d692c1b0481c7ee0defc3ec5dbe8a4e
[nepi.git] / src / nepi / testbeds / planetlab / scripts / tun_connect.py
1 import sys
2
3 import socket
4 import fcntl
5 import os
6 import os.path
7 import select
8 import signal
9
10 import struct
11 import ctypes
12 import optparse
13 import threading
14 import subprocess
15 import re
16 import functools
17 import time
18 import base64
19 import traceback
20 from Queue import Queue
21
22 import tunchannel
23
24 try:
25     import iovec
26     HAS_IOVEC = True
27 except:
28     HAS_IOVEC = False
29
30 tun_name = 'tun0'
31 tun_path = '/dev/net/tun'
32 hostaddr = socket.gethostbyname(socket.gethostname())
33
34 usage = "usage: %prog [options]"
35
36 parser = optparse.OptionParser(usage=usage)
37
38 parser.add_option(
39     "-i", "--iface", dest="tun_name", metavar="DEVICE",
40     default = "tun0",
41     help = "TUN/TAP interface to tap into")
42 parser.add_option(
43     "-d", "--tun-path", dest="tun_path", metavar="PATH",
44     default = "/dev/net/tun",
45     help = "TUN/TAP device file path or file descriptor number")
46 parser.add_option(
47     "-p", "--peer-port", dest="peer_port", metavar="PEER_PORT", type="int",
48     default = 15000,
49     help = "Remote TCP/UDP port to connect to.")
50 parser.add_option(
51     "--pass-fd", dest="pass_fd", metavar="UNIX_SOCKET",
52     default = None,
53     help = "Path to a unix-domain socket to pass the TUN file descriptor to. "
54            "If given, all other connectivity options are ignored, tun_connect will "
55            "simply wait to be killed after passing the file descriptor, and it will be "
56            "the receiver's responsability to handle the tunneling.")
57 parser.add_option(
58     "-m", "--mode", dest="mode", metavar="MODE",
59     default = "none",
60     help = 
61         "Set mode. One of none, tun, tap, pl-tun, pl-tap, pl-gre-ip, pl-gre-eth. In any mode except none, a TUN/TAP will be created "
62         "by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), "
63         "and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have "
64         "to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).")
65 parser.add_option(
66     "-t", "--protocol", dest="protocol", metavar="PROTOCOL",
67     default = None,
68     help = 
69         "Set protocol. One of tcp, udp, fd, gre. In any mode except none, a TUN/TAP will be created.")
70 parser.add_option(
71     "-A", "--vif-address", dest="vif_addr", metavar="VIF_ADDRESS",
72     default = None,
73     help = 
74         "See mode. This specifies the VIF_ADDRESS, "
75         "the IP address of the virtual interface.")
76 parser.add_option(
77     "-M", "--vif-mask", dest="vif_mask", type="int", metavar="VIF_MASK", 
78     default = None,
79     help = 
80         "See mode. This specifies the VIF_MASK, "
81         "a number indicating the network type (ie: 24 for a C-class network).")
82 parser.add_option(
83     "-P", "--port", dest="port", type="int", metavar="PORT", 
84     default = None,
85     help = 
86         "This specifies the LOCAL_PORT. This will be the local bind port for UDP/TCP.")
87 parser.add_option(
88     "-S", "--vif-snat", dest="vif_snat", 
89     action = "store_true",
90     default = False,
91     help = "See mode. This specifies whether SNAT will be enabled for the virtual interface. " )
92 parser.add_option(
93     "-Z", "--vif-pointopoint", dest="vif_pointopoint",  metavar="DST_ADDR",
94     default = None,
95     help = 
96         "See mode. This specifies the remote endpoint's virtual address, "
97         "for point-to-point routing configuration. "
98         "Not supported by PlanetLab" )
99 parser.add_option(
100     "-Q", "--vif-txqueuelen", dest="vif_txqueuelen", metavar="SIZE", type="int",
101     default = None,
102     help = 
103         "See mode. This specifies the interface's transmission queue length. " )
104 parser.add_option(
105     "-b", "--bwlimit", dest="bwlimit", metavar="BYTESPERSECOND", type="int",
106     default = None,
107     help = 
108         "This specifies the interface's emulated bandwidth in bytes per second." )
109 parser.add_option(
110     "-a", "--peer-address", dest="peer_addr", metavar="PEER_ADDRESS",
111     default = None,
112     help = 
113         "This specifies the PEER_ADDRESS, "
114         "the IP address of the remote interface.")
115 parser.add_option(
116     "-k", "--key", dest="cipher_key", metavar="KEY",
117     default = None,
118     help = 
119         "Specify a symmetric encryption key with which to protect packets across "
120         "the tunnel. python-crypto must be installed on the system." )
121 parser.add_option(
122     "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string",
123     default = "true",
124     help = 
125         "Specify a demultiplexing 32-bit numeric key for GRE." )
126 parser.add_option(
127     "-C", "--cipher", dest="cipher", metavar="CIPHER",
128     default = 'AES',
129     help = "One of PLAIN, AES, Blowfish, DES, DES3. " )
130 parser.add_option(
131     "-N", "--no-capture", dest="no_capture", 
132     action = "store_true",
133     default = False,
134     help = "If specified, packets won't be logged to standard output "
135            "(default is to log them to standard output). " )
136 parser.add_option(
137     "-c", "--pcap-capture", dest="pcap_capture", metavar="FILE",
138     default = None,
139     help = "If specified, packets won't be logged to standard output, "
140            "but dumped to a pcap-formatted trace in the specified file. " )
141 parser.add_option(
142     "--multicast-forwarder", dest="multicast_fwd", 
143     default = None,
144     help = "If specified, multicast packets will be forwarded to "
145            "the specified unix-domain socket. If the device uses ethernet "
146            "frames, ethernet headers will be stripped and IP packets "
147            "will be forwarded, prefixed with the interface's address." )
148 parser.add_option(
149     "--filter", dest="filter_module", metavar="PATH",
150     default = None,
151     help = "If specified, it should be either a .py or .so module. "
152            "It will be loaded, and all incoming and outgoing packets "
153            "will be routed through it. The filter will not be responsible "
154            "for buffering, packet queueing is performed in tun_connect "
155            "already, so it should not concern itself with it. It should "
156            "not, however, block in one direction if the other is congested.\n"
157            "\n"
158            "Modules are expected to have the following methods:\n"
159            "\tinit(**args)\n"
160            "\t\tIf arguments are given, this method will be called with the\n"
161            "\t\tgiven arguments (as keyword args in python modules, or a single\n"
162            "\t\tstring in c modules).\n"
163            "\taccept_packet(packet, direction):\n"
164            "\t\tDecide whether to drop the packet. Direction is 0 for packets "
165                "coming from the local side to the remote, and 1 is for packets "
166                "coming from the remote side to the local. Return a boolean, "
167                "true if the packet is not to be dropped.\n"
168            "\tfilter_init():\n"
169            "\t\tInitializes a filtering pipe (filter_run). It should "
170                "return two file descriptors to use as a bidirectional "
171                "pipe: local and remote. 'local' is where packets from the "
172                "local side will be written to. After filtering, those packets "
173                "should be written to 'remote', where tun_connect will read "
174                "from, and it will forward them to the remote peer. "
175                "Packets from the remote peer will be written to 'remote', "
176                "where the filter is expected to read from, and eventually "
177                "forward them to the local side. If the file descriptors are "
178                "not nonblocking, they will be set to nonblocking. So it's "
179                "better to set them from the start like that.\n"
180            "\tfilter_run(local, remote):\n"
181            "\t\tIf filter_init is provided, it will be called repeatedly, "
182                "in a separate thread until the process is killed. It should "
183                "sleep at most for a second.\n"
184            "\tfilter_close(local, remote):\n"
185            "\t\tCalled then the process is killed, if filter_init was provided. "
186                "It should, among other things, close the file descriptors.\n"
187            "\n"
188            "Python modules are expected to return a tuple in filter_init, "
189            "either of file descriptors or file objects, while native ones "
190            "will receive two int*.\n"
191            "\n"
192            "Python modules can additionally contain a custom queue class "
193            "that will replace the FIFO used by default. The class should "
194            "be named 'queueclass' and contain an interface compatible with "
195            "collections.deque. That is, indexing (especiall for q[0]), "
196            "bool(q), popleft, appendleft, pop (right), append (right), "
197            "len(q) and clear. When using a custom queue, queue size will "
198            "have no effect, pass an effective queue size to the module "
199            "by using filter_args" )
200 parser.add_option(
201     "--filter-args", dest="filter_args", metavar="FILE",
202     default = None,
203     help = "If specified, packets won't be logged to standard output, "
204            "but dumped to a pcap-formatted trace in the specified file. " )
205
206 (options,args) = parser.parse_args(sys.argv[1:])
207
208 options.cipher = {
209     'aes' : 'AES',
210     'des' : 'DES',
211     'des3' : 'DES3',
212     'blowfish' : 'Blowfish',
213     'plain' : None,
214 }[options.cipher.lower()]
215
216 ETH_P_ALL = 0x00000003
217 ETH_P_IP = 0x00000800
218 TUNSETIFF = 0x400454ca
219 IFF_NO_PI = 0x00001000
220 IFF_TAP = 0x00000002
221 IFF_TUN = 0x00000001
222 IFF_VNET_HDR = 0x00004000
223 TUN_PKT_STRIP = 0x00000001
224 IFHWADDRLEN = 0x00000006
225 IFNAMSIZ = 0x00000010
226 IFREQ_SZ = 0x00000028
227 FIONREAD = 0x0000541b
228
229 class HostLock(object):
230     # This class is used as a lock to prevent concurrency issues with more
231     # than one instance of netns running in the same machine. Both in 
232     # different processes or different threads.
233     taken = False
234     processcond = threading.Condition()
235     
236     def __init__(self, lockfile):
237         processcond = self.__class__.processcond
238         
239         processcond.acquire()
240         try:
241             # It's not reentrant
242             while self.__class__.taken:
243                 processcond.wait()
244             self.__class__.taken = True
245         finally:
246             processcond.release()
247         
248         self.lockfile = lockfile
249         
250         while True:
251             try:
252                 fcntl.flock(self.lockfile, fcntl.LOCK_EX)
253                 break
254             except (OSError, IOError), e:
255                 if e.args[0] != os.errno.EINTR:
256                     raise
257     
258     def __del__(self):
259         processcond = self.__class__.processcond
260         
261         processcond.acquire()
262         try:
263             if not self.lockfile.closed:
264                 fcntl.flock(self.lockfile, fcntl.LOCK_UN)
265             
266             # It's not reentrant
267             self.__class__.taken = False
268             processcond.notify()
269         finally:
270             processcond.release()
271
272 def ifnam(x):
273     return x+'\x00'*(IFNAMSIZ-len(x))
274
275 def ifreq(iface, flags):
276     # ifreq contains:
277     #   char[IFNAMSIZ] : interface name
278     #   short : flags
279     #   <padding>
280     ifreq = ifnam(iface)+struct.pack("H",flags);
281     ifreq += '\x00' * (len(ifreq)-IFREQ_SZ)
282     return ifreq
283
284 def tunopen(tun_path, tun_name):
285     if tun_path.isdigit():
286         # open TUN fd
287         print >>sys.stderr, "Using tun:", tun_name, "fd", tun_path
288         tun = os.fdopen(int(tun_path), 'r+b', 0)
289     else:
290         # open TUN path
291         print >>sys.stderr, "Using tun:", tun_name, "at", tun_path
292         tun = open(tun_path, 'r+b', 0)
293
294         # bind file descriptor to the interface
295         fcntl.ioctl(tun.fileno(), TUNSETIFF, ifreq(tun_name, IFF_NO_PI|IFF_TUN))
296     
297     return tun
298
299 def tunclose(tun_path, tun_name, tun):
300     if tun_path and tun_path.isdigit():
301         # close TUN fd
302         os.close(int(tun_path))
303         tun.close()
304     elif tun:
305         # close TUN object
306         tun.close()
307
308 def noopen(tun_path, tun_name):
309     print >>sys.stderr, "Using tun:", tun_name
310     return None
311 def noclose(tun_path, tun_name, tun):
312     pass
313
314 def tuntap_alloc(kind, tun_path, tun_name):
315     args = ["tunctl"]
316     if kind == "tun":
317         args.append("-n")
318     if tun_name:
319         args.append("-t")
320         args.append(tun_name)
321     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
322     out,err = proc.communicate()
323     if proc.wait():
324         raise RuntimeError, "Could not allocate %s device" % (kind,)
325         
326     match = re.search(r"Set '(?P<dev>(?:tun|tap)[0-9]*)' persistent and owned by .*", out, re.I)
327     if not match:
328         raise RuntimeError, "Could not allocate %s device - tunctl said: %s" % (kind, out)
329     
330     tun_name = match.group("dev")
331     print >>sys.stderr, "Allocated %s device: %s" % (kind, tun_name)
332     
333     return tun_path, tun_name
334
335 def tuntap_dealloc(tun_path, tun_name):
336     args = ["tunctl", "-d", tun_name]
337     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
338     out,err = proc.communicate()
339     if proc.wait():
340         print >> sys.stderr, "WARNING: error deallocating %s device" % (tun_name,)
341
342 def nmask_to_dot_notation(mask):
343     mask = hex(((1 << mask) - 1) << (32 - mask)) # 24 -> 0xFFFFFF00
344     mask = mask[2:] # strip 0x
345     mask = mask.decode("hex") # to bytes
346     mask = '.'.join(map(str,map(ord,mask))) # to 255.255.255.0
347     return mask
348
349 def vif_start(tun_path, tun_name):
350     args = ["ifconfig", tun_name, options.vif_addr, 
351             "netmask", nmask_to_dot_notation(options.vif_mask),
352             "-arp" ]
353     if options.vif_pointopoint:
354         args.extend(["pointopoint",options.vif_pointopoint])
355     if options.vif_txqueuelen is not None:
356         args.extend(["txqueuelen",str(options.vif_txqueuelen)])
357     args.append("up")
358     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
359     out,err = proc.communicate()
360     if proc.wait():
361         raise RuntimeError, "Error starting virtual interface"
362     
363     if options.vif_snat:
364         # set up SNAT using iptables
365         # TODO: stop vif on error. 
366         #   Not so necessary since deallocating the tun/tap device
367         #   will forcibly stop it, but it would be tidier
368         args = [ "iptables", "-t", "nat", "-A", "POSTROUTING", 
369                  "-s", "%s/%d" % (options.vif_addr, options.vif_mask),
370                  "-j", "SNAT",
371                  "--to-source", hostaddr, "--random" ]
372         proc = subprocess.Popen(args, stdout=subprocess.PIPE)
373         out,err = proc.communicate()
374         if proc.wait():
375             raise RuntimeError, "Error setting up SNAT"
376
377 def vif_stop(tun_path, tun_name):
378     if options.vif_snat:
379         # set up SNAT using iptables
380         args = [ "iptables", "-t", "nat", "-D", "POSTROUTING", 
381                  "-s", "%s/%d" % (options.vif_addr, options.vif_mask),
382                  "-j", "SNAT",
383                  "--to-source", hostaddr, "--random" ]
384         proc = subprocess.Popen(args, stdout=subprocess.PIPE)
385         out,err = proc.communicate()
386     
387     args = ["ifconfig", tun_name, "down"]
388     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
389     out,err = proc.communicate()
390     if proc.wait():
391         print >>sys.stderr, "WARNING: error stopping virtual interface"
392     
393     
394 def pl_tuntap_alloc(kind, tun_path, tun_name):
395     tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
396     c_tun_name = ctypes.c_char_p("\x00"*IFNAMSIZ) # the string will be mutated!
397     kind = {"tun":IFF_TUN,
398             "tap":IFF_TAP}[kind]
399     fd = tunalloc_so.tun_alloc(kind, c_tun_name)
400     name = c_tun_name.value
401     return str(fd), name
402
403 _name_reservation = None
404 def pl_tuntap_namealloc(kind, tun_path, tun_name):
405     global _name_reservation
406     # Serialize access
407     lockfile = open("/tmp/nepi-tun-connect.lock", "a")
408     lock = HostLock(lockfile)
409     
410     # We need to do this, fd_tuntap is the only one who can
411     # tell us our slice id (this script runs as root, so no uid),
412     # and the pattern of device names accepted by vsys scripts
413     tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
414     c_tun_name = ctypes.c_char_p("\x00"*IFNAMSIZ) # the string will be mutated!
415     nkind= {"tun":IFF_TUN,
416             "tap":IFF_TAP}[kind]
417     fd = tunalloc_so.tun_alloc(nkind, c_tun_name)
418     name = c_tun_name.value
419     os.close(fd)
420
421     base = name[:name.index('-')+1]
422     existing = set(map(str.strip,os.popen("ip a | grep -o '%s[0-9]*'" % (base,)).read().strip().split('\n')))
423     
424     for i in xrange(9000,10000):
425         name = base + str(i)
426         if name not in existing:
427             break
428     else:
429         raise RuntimeError, "Could not assign interface name"
430     
431     _name_reservation = lock
432     
433     return None, name
434
435 def pl_vif_start(tun_path, tun_name):
436     global _name_reservation
437
438     out = []
439     def outreader():
440         out.append(stdout.read())
441         stdout.close()
442         time.sleep(1)
443
444     # Serialize access to vsys
445     lockfile = open("/tmp/nepi-tun-connect.lock", "a")
446     lock = _name_reservation or HostLock(lockfile)
447     _name_reservation = None
448     
449     stdin = open("/vsys/vif_up.in","w")
450     stdout = open("/vsys/vif_up.out","r")
451
452     t = threading.Thread(target=outreader)
453     t.start()
454     
455     stdin.write(tun_name+"\n")
456     stdin.write(options.vif_addr+"\n")
457     stdin.write(str(options.vif_mask)+"\n")
458     if options.vif_snat:
459         stdin.write("snat=1\n")
460     if options.vif_pointopoint:
461         stdin.write("pointopoint=%s\n" % (options.vif_pointopoint,))
462     if options.vif_txqueuelen is not None:
463         stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
464     if options.mode.startswith('pl-gre'):
465         stdin.write("gre=%s\n" % (options.gre_key,))
466         stdin.write("remote=%s\n" % (options.peer_addr,))
467     stdin.close()
468     
469     t.join()
470     out = ''.join(out)
471     if out.strip():
472         print >>sys.stderr, out
473     
474     del lock, lockfile
475
476 def pl_vif_stop(tun_path, tun_name):
477     out = []
478     def outreader():
479         out.append(stdout.read())
480         stdout.close()
481         
482         if options.mode.startswith('pl-gre'):
483             lim = 120
484         else:
485             lim = 2
486         
487         for i in xrange(lim):
488             ifaces = set(map(str.strip,os.popen("ip a | grep -o '%s'" % (tun_name,)).read().strip().split('\n')))
489             if tun_name in ifaces:
490                 time.sleep(1)
491             else:
492                 break
493
494     # Serialize access to vsys
495     lockfile = open("/tmp/nepi-tun-connect.lock", "a")
496     lock = HostLock(lockfile)
497
498     stdin = open("/vsys/vif_down.in","w")
499     stdout = open("/vsys/vif_down.out","r")
500     
501     t = threading.Thread(target=outreader)
502     t.start()
503     
504     stdin.write(tun_name+"\n")
505     stdin.close()
506     
507     t.join()
508     out = ''.join(out)
509     if out.strip():
510         print >>sys.stderr, out
511     
512     del lock, lockfile
513
514
515 def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True, bwlimit = None):
516     global TERMINATE
517     
518     tunqueue = options.vif_txqueuelen or 1000
519     tunkqueue = 500
520     
521     # in PL mode, we cannot strip PI structs
522     # so we'll have to handle them
523     tunchannel.tun_fwd(tun, remote,
524         with_pi = options.mode.startswith('pl-'),
525         ether_mode = tun_name.startswith('tap'),
526         cipher_key = options.cipher_key,
527         udp = options.protocol == 'udp',
528         TERMINATE = TERMINATE,
529         stderr = None,
530         reconnect = reconnect,
531         tunqueue = tunqueue,
532         tunkqueue = tunkqueue,
533         cipher = options.cipher,
534         accept_local = accept_local,
535         accept_remote = accept_remote,
536         queueclass = queueclass,
537         slowlocal = slowlocal,
538         bwlimit = bwlimit
539     )
540
541
542
543 nop = lambda tun_path, tun_name : (tun_path, tun_name)
544 MODEINFO = {
545     'none' : dict(alloc=nop,
546                   tunopen=tunopen, tunclose=tunclose,
547                   dealloc=nop,
548                   start=nop,
549                   stop=nop),
550     'tun'  : dict(alloc=functools.partial(tuntap_alloc, "tun"),
551                   tunopen=tunopen, tunclose=tunclose,
552                   dealloc=tuntap_dealloc,
553                   start=vif_start,
554                   stop=vif_stop),
555     'tap'  : dict(alloc=functools.partial(tuntap_alloc, "tap"),
556                   tunopen=tunopen, tunclose=tunclose,
557                   dealloc=tuntap_dealloc,
558                   start=vif_start,
559                   stop=vif_stop),
560     'pl-tun'  : dict(alloc=functools.partial(pl_tuntap_alloc, "tun"),
561                   tunopen=tunopen, tunclose=tunclose,
562                   dealloc=nop,
563                   start=pl_vif_start,
564                   stop=pl_vif_stop),
565     'pl-tap'  : dict(alloc=functools.partial(pl_tuntap_alloc, "tap"),
566                   tunopen=tunopen, tunclose=tunclose,
567                   dealloc=nop,
568                   start=pl_vif_start,
569                   stop=pl_vif_stop),
570     'pl-gre-ip' : dict(alloc=functools.partial(pl_tuntap_namealloc, "tun"),
571                   tunopen=noopen, tunclose=tunclose,
572                   dealloc=nop,
573                   start=pl_vif_start,
574                   stop=pl_vif_stop),
575     'pl-gre-eth': dict(alloc=functools.partial(pl_tuntap_namealloc, "tap"),
576                   tunopen=noopen, tunclose=noclose,
577                   dealloc=nop,
578                   start=pl_vif_start,
579                   stop=pl_vif_stop),
580 }
581     
582 tun_path = options.tun_path
583 tun_name = options.tun_name
584
585 modeinfo = MODEINFO[options.mode]
586
587 # Try to load filter module
588 filter_thread = None
589 if options.filter_module:
590     print >>sys.stderr, "Loading module", options.filter_module, "with args", options.filter_args
591     if options.filter_module.endswith('.py'):
592         sys.path.append(os.path.dirname(options.filter_module))
593         filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
594         if options.filter_args:
595             try:
596                 filter_args = dict(map(lambda x:x.split('=',1),options.filter_args.split(',')))
597                 filter_module.init(**filter_args)
598             except:
599                 traceback.print_exc()
600     elif options.filter_module.endswith('.so'):
601         filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
602         if options.filter_args:
603             try:
604                 filter_module.init(options.filter_args)
605             except:
606                 traceback.print_exc()
607     try:
608         accept_packet = filter_module.accept_packet
609         print >>sys.stderr, "Installing packet filter (accept_packet)"
610     except:
611         accept_packet = None
612     
613     try:
614         queueclass = filter_module.queueclass
615         print >>sys.stderr, "Installing custom queue"
616     except:
617         queueclass = None
618     
619     try:
620         _filter_init = filter_module.filter_init
621         filter_run = filter_module.filter_run
622         filter_close = filter_module.filter_close
623         
624         def filter_init():
625             filter_local = ctypes.c_int(0)
626             filter_remote = ctypes.c_int(0)
627             _filter_init(filter_local, filter_remote)
628             return filter_local, filter_remote
629
630         print >>sys.stderr, "Installing packet filter (stream filter)"
631     except:
632         filter_init = None
633         filter_run = None
634         filter_close = None
635 else:
636     accept_packet = None
637     filter_init = None
638     filter_run = None
639     filter_close = None
640     queueclass = None
641
642 # install multicast forwarding hook
643 if options.multicast_fwd:
644     print >>sys.stderr, "Connecting to mcast filter"
645     mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
646     #disable nonblocking, cannot handle EWOULDBLOCK
647     #tunchannel.nonblock(mcfwd_sock.fileno())
648     mcfwd_sock.settimeout(0.5) # 500ms tops - packet lost if it blocks more than that
649
650 # be careful to roll back stuff on exceptions
651 tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
652 try:
653     modeinfo['start'](tun_path, tun_name)
654     try:
655         tun = modeinfo['tunopen'](tun_path, tun_name)
656     except:
657         modeinfo['stop'](tun_path, tun_name)
658         raise
659 except:
660     modeinfo['dealloc'](tun_path, tun_name)
661     raise
662
663
664 # Trak SIGTERM, and set global termination flag instead of dying
665 TERMINATE = []
666 def _finalize(sig,frame):
667     global TERMINATE
668     TERMINATE.append(None)
669 signal.signal(signal.SIGTERM, _finalize)
670
671 try:
672     tcpdump = None
673     reconnect = None
674     mcfwd_thread = None
675
676     # install multicast forwarding hook
677     if options.multicast_fwd:
678         print >>sys.stderr, "Installing mcast filter"
679         
680         if HAS_IOVEC:
681             writev = iovec.writev
682         else:
683             os_write = os.write
684             map_ = map
685             str_ = str
686             def writev(fileno, *stuff):
687                 os_write(''.join(map_(str_,stuff)))
688         
689         
690         mcfwd_queue = Queue(options.vif_txqueuelen or 500)
691         def mcfwd_thread_fn(
692                 sock=mcfwd_sock, 
693                 sockno=mcfwd_sock.fileno(),
694                 multicast_fwd = options.multicast_fwd,
695                 vif_addr = socket.inet_aton(options.vif_addr),
696                 writev=writev,
697                 retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR),
698                 len=len, ord=ord):
699             TERMINATE_ = TERMINATE
700             connected = False
701             
702             while not TERMINATE_:
703                 try:
704                     fwd = mcfwd_queue.get(True, 1)
705                 except:
706                     continue
707                 
708                 # Forward it
709                 if not connected:
710                     try:
711                         sock.connect(multicast_fwd)
712                         connected = True
713                     except:
714                         traceback.print_exc(file=sys.stderr)
715                 if connected:
716                     try:
717                         writev(sockno, vif_addr,fwd)
718                     except OSError,e:
719                         if e.errno not in retrycodes:
720                             traceback.print_exc(file=sys.stderr)
721                         else:
722                             try:
723                                 writev(sockno, vif_addr,fwd)
724                             except:
725                                 traceback.print_exc(file=sys.stderr)
726                     except socket.timeout:
727                         # packet lost
728                         continue
729                     except:
730                         traceback.print_exc(file=sys.stderr)
731                 
732                 mcfwd_queue.task_done()
733         mcfwd_thread = threading.Thread(target=mcfwd_thread_fn)
734         mcfwd_thread.start()
735         
736         def accept_packet(packet, direction, 
737                 _up_accept=accept_packet, 
738                 etherProto=tunchannel.etherProto,
739                 etherStrip=tunchannel.etherStrip,
740                 etherMode=tun_name.startswith('tap'),
741                 len=len, ord=ord):
742             if _up_accept:
743                 rv = _up_accept(packet, direction)
744                 if not rv:
745                     return rv
746
747             if direction == 1:
748                 # Incoming... what?
749                 if etherMode:
750                     if etherProto(packet)=='\x08\x00':
751                         fwd = etherStrip(packet)
752                     else:
753                         fwd = None
754                 else:
755                     fwd = packet
756                 if fwd is not None and len(fwd) >= 20:
757                     if (ord(fwd[16]) & 0xf0) == 0xe0:
758                         # Queue for forwarding
759                         try:
760                             mcfwd_queue.put_nowait(fwd)
761                         except:
762                             print >>sys.stderr, "Multicast packet dropped, forwarder queue full"
763             return 1
764
765     
766     if options.protocol == 'fd':
767         if accept_packet or filter_init:
768             raise NotImplementedError, "--pass-fd and --filter are not compatible"
769         
770         if options.pass_fd.startswith("base64:"):
771             options.pass_fd = base64.b64decode(
772                 options.pass_fd[len("base64:"):])
773             options.pass_fd = os.path.expandvars(options.pass_fd)
774         
775         print >>sys.stderr, "Sending FD to: %r" % (options.pass_fd,)
776         
777         # send FD to whoever wants it
778         import passfd
779         
780         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
781         retrydelay = 1.0
782         for i in xrange(30):
783             if TERMINATE:
784                 raise OSError, "Killed"
785             try:
786                 sock.connect(options.pass_fd)
787                 break
788             except socket.error:
789                 # wait a while, retry
790                 print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
791                 time.sleep(min(30.0,retrydelay))
792                 retrydelay *= 1.1
793         else:
794             sock.connect(options.pass_fd)
795         passfd.sendfd(sock, tun.fileno(), '0')
796         
797         # just wait forever
798         def tun_fwd(tun, remote, **kw):
799             global TERMINATE
800             TERM = TERMINATE
801             while not TERM:
802                 time.sleep(1)
803         remote = None
804     elif options.protocol == "gre":
805         if accept_packet or filter_init:
806             raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
807         
808         # just wait forever
809         def tun_fwd(tun, remote, **kw):
810             global TERMINATE
811             TERM = TERMINATE
812             while not TERM:
813                 time.sleep(1)
814         remote = options.peer_addr
815     elif options.protocol == "udp":
816         # connect to remote endpoint
817         if options.peer_addr and options.peer_port:
818             rsock = tunchannel.udp_establish(TERMINATE, hostaddr, options.port, 
819                     options.peer_addr, options.peer_port)
820             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
821         else:
822             print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
823             raise AssertionError, "Error: need a remote endpoint in UDP mode"
824     elif options.protocol == "tcp":
825         # connect to remote endpoint
826         if options.peer_addr and options.peer_port:
827             rsock = tunchannel.tcp_establish(TERMINATE, hostaddr, options.port,
828                     options.peer_addr, options.peer_port)
829             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
830         else:
831             print >>sys.stderr, "Error: need a remote endpoint in TCP mode"
832             raise AssertionError, "Error: need a remote endpoint in TCP mode"
833     else:
834         msg = "Error: Invalid protocol %s" % options.protocol
835         print >>sys.stderr, msg 
836         raise AssertionError, msg
837
838     if filter_init:
839         filter_local, filter_remote = filter_init()
840         
841         def filter_loop():
842             global TERMINATE
843             TERM = TERMINATE
844             run = filter_run
845             local = filter_local
846             remote = filter_remote
847             while not TERM:
848                 run(local, remote)
849             filter_close(local, remote)
850             
851         filter_thread = threading.Thread(target=filter_loop)
852         filter_thread.start()
853     
854     print >>sys.stderr, "Connected"
855
856     if not options.no_capture:
857         # Launch a tcpdump subprocess, to capture and dump packets.
858         # Make sure to catch sigterm and kill the tcpdump as well
859         tcpdump = subprocess.Popen(
860             ["tcpdump","-l","-n","-i",tun_name, "-s", "4096"]
861             + ["-w",options.pcap_capture,"-U"] * bool(options.pcap_capture) )
862     
863     # Try to give us high priority
864     try:
865         os.nice(-20)
866     except:
867         # Ignore errors, we might not have enough privileges,
868         # or perhaps there is no os.nice support in the system
869         pass
870     
871     if not filter_init:
872         tun_fwd(tun, remote,
873             reconnect = reconnect,
874             accept_local = accept_packet,
875             accept_remote = accept_packet,
876             bwlimit = options.bwlimit,
877             slowlocal = True)
878     else:
879         # Hm...
880         # ...ok, we need to:
881         #  1. Forward packets from tun to filter
882         #  2. Forward packets from remote to filter
883         #
884         # 1. needs TUN rate-limiting, while 
885         # 2. needs reconnection
886         #
887         # 1. needs ONLY TUN-side acceptance checks, while
888         # 2. needs ONLY remote-side acceptance checks
889         if isinstance(filter_local, ctypes.c_int):
890             filter_local_fd = filter_local.value
891         else:
892             filter_local_fd = filter_local
893         if isinstance(filter_remote, ctypes.c_int):
894             filter_remote_fd = filter_remote.value
895         else:
896             filter_remote_fd = filter_remote
897
898         def localside():
899             tun_fwd(tun, filter_local_fd,
900                 accept_local = accept_packet,
901                 slowlocal = True)
902         
903         def remoteside():
904             tun_fwd(filter_remote_fd, remote,
905                 reconnect = reconnect,
906                 accept_remote = accept_packet,
907                 bwlimit = options.bwlimit,
908                 slowlocal = False)
909         
910         localthread = threading.Thread(target=localside)
911         remotethread = threading.Thread(target=remoteside)
912         localthread.start()
913         remotethread.start()
914         localthread.join()
915         remotethread.join()
916
917 finally:
918     try:
919         print >>sys.stderr, "Shutting down..."
920     except:
921         # In case sys.stderr is broken
922         pass
923     
924     # tidy shutdown in every case - swallow exceptions
925     TERMINATE.append(None)
926     
927     if mcfwd_thread:
928         try:
929             mcfwd_thread.join()
930         except:
931             pass
932
933     if filter_thread:
934         try:
935             filter_thread.join()
936         except:
937             pass
938
939     try:
940         if tcpdump:
941             os.kill(tcpdump.pid, signal.SIGTERM)
942             tcpdump.wait()
943     except:
944         pass
945
946     try:
947         modeinfo['stop'](tun_path, tun_name)
948     except:
949         traceback.print_exc()
950
951     try:
952         modeinfo['tunclose'](tun_path, tun_name, tun)
953     except:
954         traceback.print_exc()
955         
956     try:
957         modeinfo['dealloc'](tun_path, tun_name)
958     except:
959         traceback.print_exc()
960     
961     print >>sys.stderr, "TERMINATED GRACEFULLY"
962