f015bade32eb6297c3d8873eb5f554dd9c9ad2fd
[nepi.git] / src / nepi / testbeds / planetlab / scripts / tun_connect.py
1 import sys
2
3 import socket
4 import fcntl
5 import os
6 import os.path
7 import select
8 import signal
9
10 import struct
11 import ctypes
12 import optparse
13 import threading
14 import subprocess
15 import re
16 import functools
17 import time
18 import base64
19 import traceback
20
21 import tunchannel
22
23 try:
24     import iovec
25     HAS_IOVEC = True
26 except:
27     HAS_IOVEC = False
28
29 tun_name = 'tun0'
30 tun_path = '/dev/net/tun'
31 hostaddr = socket.gethostbyname(socket.gethostname())
32
33 usage = "usage: %prog [options]"
34
35 parser = optparse.OptionParser(usage=usage)
36
37 parser.add_option(
38     "-i", "--iface", dest="tun_name", metavar="DEVICE",
39     default = "tun0",
40     help = "TUN/TAP interface to tap into")
41 parser.add_option(
42     "-d", "--tun-path", dest="tun_path", metavar="PATH",
43     default = "/dev/net/tun",
44     help = "TUN/TAP device file path or file descriptor number")
45 parser.add_option(
46     "-p", "--peer-port", dest="peer_port", metavar="PEER_PORT", type="int",
47     default = 15000,
48     help = "Remote TCP/UDP port to connect to.")
49 parser.add_option(
50     "--pass-fd", dest="pass_fd", metavar="UNIX_SOCKET",
51     default = None,
52     help = "Path to a unix-domain socket to pass the TUN file descriptor to. "
53            "If given, all other connectivity options are ignored, tun_connect will "
54            "simply wait to be killed after passing the file descriptor, and it will be "
55            "the receiver's responsability to handle the tunneling.")
56 parser.add_option(
57     "-m", "--mode", dest="mode", metavar="MODE",
58     default = "none",
59     help = 
60         "Set mode. One of none, tun, tap, pl-tun, pl-tap, pl-gre-ip, pl-gre-eth. In any mode except none, a TUN/TAP will be created "
61         "by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), "
62         "and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have "
63         "to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).")
64 parser.add_option(
65     "-t", "--protocol", dest="protocol", metavar="PROTOCOL",
66     default = None,
67     help = 
68         "Set protocol. One of tcp, udp, fd, gre. In any mode except none, a TUN/TAP will be created.")
69 parser.add_option(
70     "-A", "--vif-address", dest="vif_addr", metavar="VIF_ADDRESS",
71     default = None,
72     help = 
73         "See mode. This specifies the VIF_ADDRESS, "
74         "the IP address of the virtual interface.")
75 parser.add_option(
76     "-M", "--vif-mask", dest="vif_mask", type="int", metavar="VIF_MASK", 
77     default = None,
78     help = 
79         "See mode. This specifies the VIF_MASK, "
80         "a number indicating the network type (ie: 24 for a C-class network).")
81 parser.add_option(
82     "-P", "--port", dest="port", type="int", metavar="PORT", 
83     default = None,
84     help = 
85         "This specifies the LOCAL_PORT. This will be the local bind port for UDP/TCP.")
86 parser.add_option(
87     "-S", "--vif-snat", dest="vif_snat", 
88     action = "store_true",
89     default = False,
90     help = "See mode. This specifies whether SNAT will be enabled for the virtual interface. " )
91 parser.add_option(
92     "-Z", "--vif-pointopoint", dest="vif_pointopoint",  metavar="DST_ADDR",
93     default = None,
94     help = 
95         "See mode. This specifies the remote endpoint's virtual address, "
96         "for point-to-point routing configuration. "
97         "Not supported by PlanetLab" )
98 parser.add_option(
99     "-Q", "--vif-txqueuelen", dest="vif_txqueuelen", metavar="SIZE", type="int",
100     default = None,
101     help = 
102         "See mode. This specifies the interface's transmission queue length. " )
103 parser.add_option(
104     "-b", "--bwlimit", dest="bwlimit", metavar="BYTESPERSECOND", type="int",
105     default = None,
106     help = 
107         "This specifies the interface's emulated bandwidth in bytes per second." )
108 parser.add_option(
109     "-a", "--peer-address", dest="peer_addr", metavar="PEER_ADDRESS",
110     default = None,
111     help = 
112         "This specifies the PEER_ADDRESS, "
113         "the IP address of the remote interface.")
114 parser.add_option(
115     "-k", "--key", dest="cipher_key", metavar="KEY",
116     default = None,
117     help = 
118         "Specify a symmetric encryption key with which to protect packets across "
119         "the tunnel. python-crypto must be installed on the system." )
120 parser.add_option(
121     "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string",
122     default = "true",
123     help = 
124         "Specify a demultiplexing 32-bit numeric key for GRE." )
125 parser.add_option(
126     "-C", "--cipher", dest="cipher", metavar="CIPHER",
127     default = 'AES',
128     help = "One of PLAIN, AES, Blowfish, DES, DES3. " )
129 parser.add_option(
130     "-N", "--no-capture", dest="no_capture", 
131     action = "store_true",
132     default = False,
133     help = "If specified, packets won't be logged to standard output "
134            "(default is to log them to standard output). " )
135 parser.add_option(
136     "-c", "--pcap-capture", dest="pcap_capture", metavar="FILE",
137     default = None,
138     help = "If specified, packets won't be logged to standard output, "
139            "but dumped to a pcap-formatted trace in the specified file. " )
140 parser.add_option(
141     "--multicast-forwarder", dest="multicast_fwd", 
142     default = None,
143     help = "If specified, multicast packets will be forwarded to "
144            "the specified unix-domain socket. If the device uses ethernet "
145            "frames, ethernet headers will be stripped and IP packets "
146            "will be forwarded, prefixed with the interface's address." )
147 parser.add_option(
148     "--filter", dest="filter_module", metavar="PATH",
149     default = None,
150     help = "If specified, it should be either a .py or .so module. "
151            "It will be loaded, and all incoming and outgoing packets "
152            "will be routed through it. The filter will not be responsible "
153            "for buffering, packet queueing is performed in tun_connect "
154            "already, so it should not concern itself with it. It should "
155            "not, however, block in one direction if the other is congested.\n"
156            "\n"
157            "Modules are expected to have the following methods:\n"
158            "\tinit(**args)\n"
159            "\t\tIf arguments are given, this method will be called with the\n"
160            "\t\tgiven arguments (as keyword args in python modules, or a single\n"
161            "\t\tstring in c modules).\n"
162            "\taccept_packet(packet, direction):\n"
163            "\t\tDecide whether to drop the packet. Direction is 0 for packets "
164                "coming from the local side to the remote, and 1 is for packets "
165                "coming from the remote side to the local. Return a boolean, "
166                "true if the packet is not to be dropped.\n"
167            "\tfilter_init():\n"
168            "\t\tInitializes a filtering pipe (filter_run). It should "
169                "return two file descriptors to use as a bidirectional "
170                "pipe: local and remote. 'local' is where packets from the "
171                "local side will be written to. After filtering, those packets "
172                "should be written to 'remote', where tun_connect will read "
173                "from, and it will forward them to the remote peer. "
174                "Packets from the remote peer will be written to 'remote', "
175                "where the filter is expected to read from, and eventually "
176                "forward them to the local side. If the file descriptors are "
177                "not nonblocking, they will be set to nonblocking. So it's "
178                "better to set them from the start like that.\n"
179            "\tfilter_run(local, remote):\n"
180            "\t\tIf filter_init is provided, it will be called repeatedly, "
181                "in a separate thread until the process is killed. It should "
182                "sleep at most for a second.\n"
183            "\tfilter_close(local, remote):\n"
184            "\t\tCalled then the process is killed, if filter_init was provided. "
185                "It should, among other things, close the file descriptors.\n"
186            "\n"
187            "Python modules are expected to return a tuple in filter_init, "
188            "either of file descriptors or file objects, while native ones "
189            "will receive two int*.\n"
190            "\n"
191            "Python modules can additionally contain a custom queue class "
192            "that will replace the FIFO used by default. The class should "
193            "be named 'queueclass' and contain an interface compatible with "
194            "collections.deque. That is, indexing (especiall for q[0]), "
195            "bool(q), popleft, appendleft, pop (right), append (right), "
196            "len(q) and clear. When using a custom queue, queue size will "
197            "have no effect, pass an effective queue size to the module "
198            "by using filter_args" )
199 parser.add_option(
200     "--filter-args", dest="filter_args", metavar="FILE",
201     default = None,
202     help = "If specified, packets won't be logged to standard output, "
203            "but dumped to a pcap-formatted trace in the specified file. " )
204
205 (options,args) = parser.parse_args(sys.argv[1:])
206
207 options.cipher = {
208     'aes' : 'AES',
209     'des' : 'DES',
210     'des3' : 'DES3',
211     'blowfish' : 'Blowfish',
212     'plain' : None,
213 }[options.cipher.lower()]
214
215 ETH_P_ALL = 0x00000003
216 ETH_P_IP = 0x00000800
217 TUNSETIFF = 0x400454ca
218 IFF_NO_PI = 0x00001000
219 IFF_TAP = 0x00000002
220 IFF_TUN = 0x00000001
221 IFF_VNET_HDR = 0x00004000
222 TUN_PKT_STRIP = 0x00000001
223 IFHWADDRLEN = 0x00000006
224 IFNAMSIZ = 0x00000010
225 IFREQ_SZ = 0x00000028
226 FIONREAD = 0x0000541b
227
228 class HostLock(object):
229     # This class is used as a lock to prevent concurrency issues with more
230     # than one instance of netns running in the same machine. Both in 
231     # different processes or different threads.
232     taken = False
233     processcond = threading.Condition()
234     
235     def __init__(self, lockfile):
236         processcond = self.__class__.processcond
237         
238         processcond.acquire()
239         try:
240             # It's not reentrant
241             while self.__class__.taken:
242                 processcond.wait()
243             self.__class__.taken = True
244         finally:
245             processcond.release()
246         
247         self.lockfile = lockfile
248         
249         while True:
250             try:
251                 fcntl.flock(self.lockfile, fcntl.LOCK_EX)
252                 break
253             except (OSError, IOError), e:
254                 if e.args[0] != os.errno.EINTR:
255                     raise
256     
257     def __del__(self):
258         processcond = self.__class__.processcond
259         
260         processcond.acquire()
261         try:
262             if not self.lockfile.closed:
263                 fcntl.flock(self.lockfile, fcntl.LOCK_UN)
264             
265             # It's not reentrant
266             self.__class__.taken = False
267             processcond.notify()
268         finally:
269             processcond.release()
270
271 def ifnam(x):
272     return x+'\x00'*(IFNAMSIZ-len(x))
273
274 def ifreq(iface, flags):
275     # ifreq contains:
276     #   char[IFNAMSIZ] : interface name
277     #   short : flags
278     #   <padding>
279     ifreq = ifnam(iface)+struct.pack("H",flags);
280     ifreq += '\x00' * (len(ifreq)-IFREQ_SZ)
281     return ifreq
282
283 def tunopen(tun_path, tun_name):
284     if tun_path.isdigit():
285         # open TUN fd
286         print >>sys.stderr, "Using tun:", tun_name, "fd", tun_path
287         tun = os.fdopen(int(tun_path), 'r+b', 0)
288     else:
289         # open TUN path
290         print >>sys.stderr, "Using tun:", tun_name, "at", tun_path
291         tun = open(tun_path, 'r+b', 0)
292
293         # bind file descriptor to the interface
294         fcntl.ioctl(tun.fileno(), TUNSETIFF, ifreq(tun_name, IFF_NO_PI|IFF_TUN))
295     
296     return tun
297
298 def tunclose(tun_path, tun_name, tun):
299     if tun_path and tun_path.isdigit():
300         # close TUN fd
301         os.close(int(tun_path))
302         tun.close()
303     elif tun:
304         # close TUN object
305         tun.close()
306
307 def noopen(tun_path, tun_name):
308     print >>sys.stderr, "Using tun:", tun_name
309     return None
310 def noclose(tun_path, tun_name, tun):
311     pass
312
313 def tuntap_alloc(kind, tun_path, tun_name):
314     args = ["tunctl"]
315     if kind == "tun":
316         args.append("-n")
317     if tun_name:
318         args.append("-t")
319         args.append(tun_name)
320     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
321     out,err = proc.communicate()
322     if proc.wait():
323         raise RuntimeError, "Could not allocate %s device" % (kind,)
324         
325     match = re.search(r"Set '(?P<dev>(?:tun|tap)[0-9]*)' persistent and owned by .*", out, re.I)
326     if not match:
327         raise RuntimeError, "Could not allocate %s device - tunctl said: %s" % (kind, out)
328     
329     tun_name = match.group("dev")
330     print >>sys.stderr, "Allocated %s device: %s" % (kind, tun_name)
331     
332     return tun_path, tun_name
333
334 def tuntap_dealloc(tun_path, tun_name):
335     args = ["tunctl", "-d", tun_name]
336     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
337     out,err = proc.communicate()
338     if proc.wait():
339         print >> sys.stderr, "WARNING: error deallocating %s device" % (tun_name,)
340
341 def nmask_to_dot_notation(mask):
342     mask = hex(((1 << mask) - 1) << (32 - mask)) # 24 -> 0xFFFFFF00
343     mask = mask[2:] # strip 0x
344     mask = mask.decode("hex") # to bytes
345     mask = '.'.join(map(str,map(ord,mask))) # to 255.255.255.0
346     return mask
347
348 def vif_start(tun_path, tun_name):
349     args = ["ifconfig", tun_name, options.vif_addr, 
350             "netmask", nmask_to_dot_notation(options.vif_mask),
351             "-arp" ]
352     if options.vif_pointopoint:
353         args.extend(["pointopoint",options.vif_pointopoint])
354     if options.vif_txqueuelen is not None:
355         args.extend(["txqueuelen",str(options.vif_txqueuelen)])
356     args.append("up")
357     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
358     out,err = proc.communicate()
359     if proc.wait():
360         raise RuntimeError, "Error starting virtual interface"
361     
362     if options.vif_snat:
363         # set up SNAT using iptables
364         # TODO: stop vif on error. 
365         #   Not so necessary since deallocating the tun/tap device
366         #   will forcibly stop it, but it would be tidier
367         args = [ "iptables", "-t", "nat", "-A", "POSTROUTING", 
368                  "-s", "%s/%d" % (options.vif_addr, options.vif_mask),
369                  "-j", "SNAT",
370                  "--to-source", hostaddr, "--random" ]
371         proc = subprocess.Popen(args, stdout=subprocess.PIPE)
372         out,err = proc.communicate()
373         if proc.wait():
374             raise RuntimeError, "Error setting up SNAT"
375
376 def vif_stop(tun_path, tun_name):
377     if options.vif_snat:
378         # set up SNAT using iptables
379         args = [ "iptables", "-t", "nat", "-D", "POSTROUTING", 
380                  "-s", "%s/%d" % (options.vif_addr, options.vif_mask),
381                  "-j", "SNAT",
382                  "--to-source", hostaddr, "--random" ]
383         proc = subprocess.Popen(args, stdout=subprocess.PIPE)
384         out,err = proc.communicate()
385     
386     args = ["ifconfig", tun_name, "down"]
387     proc = subprocess.Popen(args, stdout=subprocess.PIPE)
388     out,err = proc.communicate()
389     if proc.wait():
390         print >>sys.stderr, "WARNING: error stopping virtual interface"
391     
392     
393 def pl_tuntap_alloc(kind, tun_path, tun_name):
394     tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
395     c_tun_name = ctypes.c_char_p("\x00"*IFNAMSIZ) # the string will be mutated!
396     kind = {"tun":IFF_TUN,
397             "tap":IFF_TAP}[kind]
398     fd = tunalloc_so.tun_alloc(kind, c_tun_name)
399     name = c_tun_name.value
400     return str(fd), name
401
402 _name_reservation = None
403 def pl_tuntap_namealloc(kind, tun_path, tun_name):
404     global _name_reservation
405     # Serialize access
406     lockfile = open("/tmp/nepi-tun-connect.lock", "a")
407     lock = HostLock(lockfile)
408     
409     # We need to do this, fd_tuntap is the only one who can
410     # tell us our slice id (this script runs as root, so no uid),
411     # and the pattern of device names accepted by vsys scripts
412     tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
413     c_tun_name = ctypes.c_char_p("\x00"*IFNAMSIZ) # the string will be mutated!
414     nkind= {"tun":IFF_TUN,
415             "tap":IFF_TAP}[kind]
416     fd = tunalloc_so.tun_alloc(nkind, c_tun_name)
417     name = c_tun_name.value
418     os.close(fd)
419
420     base = name[:name.index('-')+1]
421     existing = set(map(str.strip,os.popen("ip a | grep -o '%s[0-9]*'" % (base,)).read().strip().split('\n')))
422     
423     for i in xrange(9000,10000):
424         name = base + str(i)
425         if name not in existing:
426             break
427     else:
428         raise RuntimeError, "Could not assign interface name"
429     
430     _name_reservation = lock
431     
432     return None, name
433
434 def pl_vif_start(tun_path, tun_name):
435     global _name_reservation
436
437     out = []
438     def outreader():
439         out.append(stdout.read())
440         stdout.close()
441         time.sleep(1)
442
443     # Serialize access to vsys
444     lockfile = open("/tmp/nepi-tun-connect.lock", "a")
445     lock = _name_reservation or HostLock(lockfile)
446     _name_reservation = None
447     
448     stdin = open("/vsys/vif_up.in","w")
449     stdout = open("/vsys/vif_up.out","r")
450
451     t = threading.Thread(target=outreader)
452     t.start()
453     
454     stdin.write(tun_name+"\n")
455     stdin.write(options.vif_addr+"\n")
456     stdin.write(str(options.vif_mask)+"\n")
457     if options.vif_snat:
458         stdin.write("snat=1\n")
459     if options.vif_pointopoint:
460         stdin.write("pointopoint=%s\n" % (options.vif_pointopoint,))
461     if options.vif_txqueuelen is not None:
462         stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
463     if options.mode.startswith('pl-gre'):
464         stdin.write("gre=%s\n" % (options.gre_key,))
465         stdin.write("remote=%s\n" % (options.peer_addr,))
466     stdin.close()
467     
468     t.join()
469     out = ''.join(out)
470     if out.strip():
471         print >>sys.stderr, out
472     
473     del lock, lockfile
474
475 def pl_vif_stop(tun_path, tun_name):
476     out = []
477     def outreader():
478         out.append(stdout.read())
479         stdout.close()
480         
481         if options.mode.startswith('pl-gre'):
482             lim = 120
483         else:
484             lim = 2
485         
486         for i in xrange(lim):
487             ifaces = set(map(str.strip,os.popen("ip a | grep -o '%s'" % (tun_name,)).read().strip().split('\n')))
488             if tun_name in ifaces:
489                 time.sleep(1)
490             else:
491                 break
492
493     # Serialize access to vsys
494     lockfile = open("/tmp/nepi-tun-connect.lock", "a")
495     lock = HostLock(lockfile)
496
497     stdin = open("/vsys/vif_down.in","w")
498     stdout = open("/vsys/vif_down.out","r")
499     
500     t = threading.Thread(target=outreader)
501     t.start()
502     
503     stdin.write(tun_name+"\n")
504     stdin.close()
505     
506     t.join()
507     out = ''.join(out)
508     if out.strip():
509         print >>sys.stderr, out
510     
511     del lock, lockfile
512
513
514 def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True, bwlimit = None):
515     global TERMINATE
516     
517     tunqueue = options.vif_txqueuelen or 1000
518     tunkqueue = 500
519     
520     # in PL mode, we cannot strip PI structs
521     # so we'll have to handle them
522     tunchannel.tun_fwd(tun, remote,
523         with_pi = options.mode.startswith('pl-'),
524         ether_mode = tun_name.startswith('tap'),
525         cipher_key = options.cipher_key,
526         udp = options.protocol == 'udp',
527         TERMINATE = TERMINATE,
528         stderr = None,
529         reconnect = reconnect,
530         tunqueue = tunqueue,
531         tunkqueue = tunkqueue,
532         cipher = options.cipher,
533         accept_local = accept_local,
534         accept_remote = accept_remote,
535         queueclass = queueclass,
536         slowlocal = slowlocal,
537         bwlimit = bwlimit
538     )
539
540
541
542 nop = lambda tun_path, tun_name : (tun_path, tun_name)
543 MODEINFO = {
544     'none' : dict(alloc=nop,
545                   tunopen=tunopen, tunclose=tunclose,
546                   dealloc=nop,
547                   start=nop,
548                   stop=nop),
549     'tun'  : dict(alloc=functools.partial(tuntap_alloc, "tun"),
550                   tunopen=tunopen, tunclose=tunclose,
551                   dealloc=tuntap_dealloc,
552                   start=vif_start,
553                   stop=vif_stop),
554     'tap'  : dict(alloc=functools.partial(tuntap_alloc, "tap"),
555                   tunopen=tunopen, tunclose=tunclose,
556                   dealloc=tuntap_dealloc,
557                   start=vif_start,
558                   stop=vif_stop),
559     'pl-tun'  : dict(alloc=functools.partial(pl_tuntap_alloc, "tun"),
560                   tunopen=tunopen, tunclose=tunclose,
561                   dealloc=nop,
562                   start=pl_vif_start,
563                   stop=pl_vif_stop),
564     'pl-tap'  : dict(alloc=functools.partial(pl_tuntap_alloc, "tap"),
565                   tunopen=tunopen, tunclose=tunclose,
566                   dealloc=nop,
567                   start=pl_vif_start,
568                   stop=pl_vif_stop),
569     'pl-gre-ip' : dict(alloc=functools.partial(pl_tuntap_namealloc, "tun"),
570                   tunopen=noopen, tunclose=tunclose,
571                   dealloc=nop,
572                   start=pl_vif_start,
573                   stop=pl_vif_stop),
574     'pl-gre-eth': dict(alloc=functools.partial(pl_tuntap_namealloc, "tap"),
575                   tunopen=noopen, tunclose=noclose,
576                   dealloc=nop,
577                   start=pl_vif_start,
578                   stop=pl_vif_stop),
579 }
580     
581 tun_path = options.tun_path
582 tun_name = options.tun_name
583
584 modeinfo = MODEINFO[options.mode]
585
586 # Try to load filter module
587 filter_thread = None
588 if options.filter_module:
589     print >>sys.stderr, "Loading module", options.filter_module, "with args", options.filter_args
590     if options.filter_module.endswith('.py'):
591         sys.path.append(os.path.dirname(options.filter_module))
592         filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
593         if options.filter_args:
594             try:
595                 filter_args = dict(map(lambda x:x.split('=',1),options.filter_args.split(',')))
596                 filter_module.init(**filter_args)
597             except:
598                 pass
599     elif options.filter_module.endswith('.so'):
600         filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
601         if options.filter_args:
602             try:
603                 filter_module.init(options.filter_args)
604             except:
605                 pass
606     try:
607         accept_packet = filter_module.accept_packet
608         print >>sys.stderr, "Installing packet filter (accept_packet)"
609     except:
610         accept_packet = None
611     
612     try:
613         queueclass = filter_module.queueclass
614         print >>sys.stderr, "Installing custom queue"
615     except:
616         queueclass = None
617     
618     try:
619         _filter_init = filter_module.filter_init
620         filter_run = filter_module.filter_run
621         filter_close = filter_module.filter_close
622         
623         def filter_init():
624             filter_local = ctypes.c_int(0)
625             filter_remote = ctypes.c_int(0)
626             _filter_init(filter_local, filter_remote)
627             return filter_local, filter_remote
628
629         print >>sys.stderr, "Installing packet filter (stream filter)"
630     except:
631         filter_init = None
632         filter_run = None
633         filter_close = None
634 else:
635     accept_packet = None
636     filter_init = None
637     filter_run = None
638     filter_close = None
639     queueclass = None
640
641 # install multicast forwarding hook
642 if options.multicast_fwd:
643     print >>sys.stderr, "Connecting to mcast filter"
644     mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
645     tunchannel.nonblock(mcfwd_sock.fileno())
646
647 # be careful to roll back stuff on exceptions
648 tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
649 try:
650     modeinfo['start'](tun_path, tun_name)
651     try:
652         tun = modeinfo['tunopen'](tun_path, tun_name)
653     except:
654         modeinfo['stop'](tun_path, tun_name)
655         raise
656 except:
657     modeinfo['dealloc'](tun_path, tun_name)
658     raise
659
660
661 # Trak SIGTERM, and set global termination flag instead of dying
662 TERMINATE = []
663 def _finalize(sig,frame):
664     global TERMINATE
665     TERMINATE.append(None)
666 signal.signal(signal.SIGTERM, _finalize)
667
668 try:
669     tcpdump = None
670     reconnect = None
671     mcastthread = None
672
673     # install multicast forwarding hook
674     if options.multicast_fwd:
675         print >>sys.stderr, "Installing mcast filter"
676         
677         if HAS_IOVEC:
678             writev = iovec.writev
679         else:
680             os_write = os.write
681             map_ = map
682             str_ = str
683             def writev(fileno, *stuff):
684                 os_write(''.join(map_(str_,stuff)))
685         
686         def accept_packet(packet, direction, 
687                 _up_accept=accept_packet, 
688                 sock=mcfwd_sock, 
689                 sockno=mcfwd_sock.fileno(),
690                 etherProto=tunchannel.etherProto,
691                 etherStrip=tunchannel.etherStrip,
692                 etherMode=tun_name.startswith('tap'),
693                 multicast_fwd = options.multicast_fwd,
694                 vif_addr = socket.inet_aton(options.vif_addr),
695                 connected = [], writev=writev,
696                 len=len, ord=ord):
697             if _up_accept:
698                 rv = _up_accept(packet, direction)
699                 if not rv:
700                     return rv
701
702             if direction == 1:
703                 # Incoming... what?
704                 if etherMode:
705                     if etherProto(packet)=='\x08\x00':
706                         fwd = etherStrip(packet)
707                     else:
708                         fwd = None
709                 else:
710                     fwd = packet
711                 if fwd is not None and len(fwd) >= 20:
712                     if (ord(fwd[16]) & 0xf0) == 0xe0:
713                         # Forward it
714                         if not connected:
715                             try:
716                                 sock.connect(multicast_fwd)
717                                 connected.append(None)
718                             except:
719                                 traceback.print_exc(file=sys.stderr)
720                         if connected:
721                             try:
722                                 writev(sockno, vif_addr,fwd)
723                             except:
724                                 traceback.print_exc(file=sys.stderr)
725             return 1
726
727     
728     if options.protocol == 'fd':
729         if accept_packet or filter_init:
730             raise NotImplementedError, "--pass-fd and --filter are not compatible"
731         
732         if options.pass_fd.startswith("base64:"):
733             options.pass_fd = base64.b64decode(
734                 options.pass_fd[len("base64:"):])
735             options.pass_fd = os.path.expandvars(options.pass_fd)
736         
737         print >>sys.stderr, "Sending FD to: %r" % (options.pass_fd,)
738         
739         # send FD to whoever wants it
740         import passfd
741         
742         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
743         retrydelay = 1.0
744         for i in xrange(30):
745             if TERMINATE:
746                 raise OSError, "Killed"
747             try:
748                 sock.connect(options.pass_fd)
749                 break
750             except socket.error:
751                 # wait a while, retry
752                 print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
753                 time.sleep(min(30.0,retrydelay))
754                 retrydelay *= 1.1
755         else:
756             sock.connect(options.pass_fd)
757         passfd.sendfd(sock, tun.fileno(), '0')
758         
759         # just wait forever
760         def tun_fwd(tun, remote, **kw):
761             global TERMINATE
762             TERM = TERMINATE
763             while not TERM:
764                 time.sleep(1)
765         remote = None
766     elif options.protocol == "gre":
767         if accept_packet or filter_init:
768             raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
769         
770         # just wait forever
771         def tun_fwd(tun, remote, **kw):
772             global TERMINATE
773             TERM = TERMINATE
774             while not TERM:
775                 time.sleep(1)
776         remote = options.peer_addr
777     elif options.protocol == "udp":
778         # connect to remote endpoint
779         if options.peer_addr and options.peer_port:
780             rsock = tunchannel.udp_establish(TERMINATE, hostaddr, options.port, 
781                     options.peer_addr, options.peer_port)
782             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
783         else:
784             print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
785             raise AssertionError, "Error: need a remote endpoint in UDP mode"
786     elif options.protocol == "tcp":
787         # connect to remote endpoint
788         if options.peer_addr and options.peer_port:
789             rsock = tunchannel.tcp_establish(TERMINATE, hostaddr, options.port,
790                     options.peer_addr, options.peer_port)
791             remote = os.fdopen(rsock.fileno(), 'r+b', 0)
792         else:
793             print >>sys.stderr, "Error: need a remote endpoint in TCP mode"
794             raise AssertionError, "Error: need a remote endpoint in TCP mode"
795     else:
796         msg = "Error: Invalid protocol %s" % options.protocol
797         print >>sys.stderr, msg 
798         raise AssertionError, msg
799
800     if filter_init:
801         filter_local, filter_remote = filter_init()
802         
803         def filter_loop():
804             global TERMINATE
805             TERM = TERMINATE
806             run = filter_run
807             local = filter_local
808             remote = filter_remote
809             while not TERM:
810                 run(local, remote)
811             filter_close(local, remote)
812             
813         filter_thread = threading.Thread(target=filter_loop)
814         filter_thread.start()
815     
816     print >>sys.stderr, "Connected"
817
818     if not options.no_capture:
819         # Launch a tcpdump subprocess, to capture and dump packets.
820         # Make sure to catch sigterm and kill the tcpdump as well
821         tcpdump = subprocess.Popen(
822             ["tcpdump","-l","-n","-i",tun_name, "-s", "4096"]
823             + ["-w",options.pcap_capture,"-U"] * bool(options.pcap_capture) )
824     
825     # Try to give us high priority
826     try:
827         os.nice(-20)
828     except:
829         # Ignore errors, we might not have enough privileges,
830         # or perhaps there is no os.nice support in the system
831         pass
832     
833     if not filter_init:
834         tun_fwd(tun, remote,
835             reconnect = reconnect,
836             accept_local = accept_packet,
837             accept_remote = accept_packet,
838             bwlimit = options.bwlimit,
839             slowlocal = True)
840     else:
841         # Hm...
842         # ...ok, we need to:
843         #  1. Forward packets from tun to filter
844         #  2. Forward packets from remote to filter
845         #
846         # 1. needs TUN rate-limiting, while 
847         # 2. needs reconnection
848         #
849         # 1. needs ONLY TUN-side acceptance checks, while
850         # 2. needs ONLY remote-side acceptance checks
851         if isinstance(filter_local, ctypes.c_int):
852             filter_local_fd = filter_local.value
853         else:
854             filter_local_fd = filter_local
855         if isinstance(filter_remote, ctypes.c_int):
856             filter_remote_fd = filter_remote.value
857         else:
858             filter_remote_fd = filter_remote
859
860         def localside():
861             tun_fwd(tun, filter_local_fd,
862                 accept_local = accept_packet,
863                 slowlocal = True)
864         
865         def remoteside():
866             tun_fwd(filter_remote_fd, remote,
867                 reconnect = reconnect,
868                 accept_remote = accept_packet,
869                 bwlimit = options.bwlimit,
870                 slowlocal = False)
871         
872         localthread = threading.Thread(target=localside)
873         remotethread = threading.Thread(target=remoteside)
874         localthread.start()
875         remotethread.start()
876         localthread.join()
877         remotethread.join()
878
879 finally:
880     try:
881         print >>sys.stderr, "Shutting down..."
882     except:
883         # In case sys.stderr is broken
884         pass
885     
886     # tidy shutdown in every case - swallow exceptions
887     TERMINATE.append(None)
888     
889     if mcastthread:
890         try:
891             mcastthread.stop()
892         except:
893             pass
894     
895     if filter_thread:
896         try:
897             filter_thread.join()
898         except:
899             pass
900
901     try:
902         if tcpdump:
903             os.kill(tcpdump.pid, signal.SIGTERM)
904             tcpdump.wait()
905     except:
906         pass
907
908     try:
909         modeinfo['stop'](tun_path, tun_name)
910     except:
911         traceback.print_exc()
912
913     try:
914         modeinfo['tunclose'](tun_path, tun_name, tun)
915     except:
916         traceback.print_exc()
917         
918     try:
919         modeinfo['dealloc'](tun_path, tun_name)
920     except:
921         traceback.print_exc()
922     
923     print >>sys.stderr, "TERMINATED GRACEFULLY"
924