bd74454f293a4ca712def4ab2b1442a569375d82
[nepi.git] / src / nepi / testbeds / planetlab / scripts / tun_connect.py
1 import sys
2
3 import socket
4 import fcntl
5 import os
6 import os.path
7 import select
8 import signal
9
10 import struct
11 import ctypes
12 import optparse
13 import threading
14 import subprocess
15 import re
16 import functools
17 import time
18 import base64
19 import traceback
20
21 import tunchannel
22 import ipaddr2
23
# iovec is optional: when present, its writev is used by the multicast
# forwarding hook; otherwise a pure-python fallback is defined there.
try:
    import iovec
    HAS_IOVEC = True
except ImportError:
    # Only a failed import means "not available"; do not swallow unrelated
    # errors such as KeyboardInterrupt.
    HAS_IOVEC = False
29
30 tun_name = 'tun0'
31 tun_path = '/dev/net/tun'
32 hostaddr = socket.gethostbyname(socket.gethostname())
33
34 usage = "usage: %prog [options] <remote-endpoint>"
35
36 parser = optparse.OptionParser(usage=usage)
37
38 parser.add_option(
39     "-i", "--iface", dest="tun_name", metavar="DEVICE",
40     default = "tun0",
41     help = "TUN/TAP interface to tap into")
42 parser.add_option(
43     "-d", "--tun-path", dest="tun_path", metavar="PATH",
44     default = "/dev/net/tun",
45     help = "TUN/TAP device file path or file descriptor number")
46 parser.add_option(
47     "-p", "--port", dest="port", metavar="PORT", type="int",
48     default = 15000,
49     help = "Peering TCP port to connect or listen to.")
50 parser.add_option(
51     "--pass-fd", dest="pass_fd", metavar="UNIX_SOCKET",
52     default = None,
53     help = "Path to a unix-domain socket to pass the TUN file descriptor to. "
54            "If given, all other connectivity options are ignored, tun_connect will "
55            "simply wait to be killed after passing the file descriptor, and it will be "
56            "the receiver's responsability to handle the tunneling.")
57
58 parser.add_option(
59     "-m", "--mode", dest="mode", metavar="MODE",
60     default = "none",
61     help = 
62         "Set mode. One of none, tun, tap, pl-tun, pl-tap, pl-gre-ip, pl-gre-eth. In any mode except none, a TUN/TAP will be created "
63         "by using the proper interface (tunctl for tun/tap, /vsys/fd_tuntap.control for pl-tun/pl-tap), "
64         "and it will be brought up (with ifconfig for tun/tap, with /vsys/vif_up for pl-tun/pl-tap). You have "
65         "to specify an VIF_ADDRESS and VIF_MASK in any case (except for none).")
66 parser.add_option(
67     "-A", "--vif-address", dest="vif_addr", metavar="VIF_ADDRESS",
68     default = None,
69     help = 
70         "See mode. This specifies the VIF_ADDRESS, "
71         "the IP address of the virtual interface.")
72 parser.add_option(
73     "-M", "--vif-mask", dest="vif_mask", type="int", metavar="VIF_MASK", 
74     default = None,
75     help = 
76         "See mode. This specifies the VIF_MASK, "
77         "a number indicating the network type (ie: 24 for a C-class network).")
78 parser.add_option(
79     "-S", "--vif-snat", dest="vif_snat", 
80     action = "store_true",
81     default = False,
82     help = "See mode. This specifies whether SNAT will be enabled for the virtual interface. " )
83 parser.add_option(
84     "-P", "--vif-pointopoint", dest="vif_pointopoint",  metavar="DST_ADDR",
85     default = None,
86     help = 
87         "See mode. This specifies the remote endpoint's virtual address, "
88         "for point-to-point routing configuration. "
89         "Not supported by PlanetLab" )
90 parser.add_option(
91     "-Q", "--vif-txqueuelen", dest="vif_txqueuelen", metavar="SIZE", type="int",
92     default = None,
93     help = 
94         "See mode. This specifies the interface's transmission queue length. " )
95 parser.add_option(
96     "-b", "--bwlimit", dest="bwlimit", metavar="BYTESPERSECOND", type="int",
97     default = None,
98     help = 
99         "This specifies the interface's emulated bandwidth in bytes per second." )
100 parser.add_option(
101     "-u", "--udp", dest="udp", metavar="PORT", type="int",
102     default = None,
103     help = 
104         "Bind to the specified UDP port locally, and send UDP datagrams to the "
105         "remote endpoint, creating a tunnel through UDP rather than TCP." )
106 parser.add_option(
107     "-k", "--key", dest="cipher_key", metavar="KEY",
108     default = None,
109     help = 
110         "Specify a symmetric encryption key with which to protect packets across "
111         "the tunnel. python-crypto must be installed on the system." )
112 parser.add_option(
113     "-K", "--gre-key", dest="gre_key", metavar="KEY", type="string",
114     default = "true",
115     help = 
116         "Specify a demultiplexing 32-bit numeric key for GRE." )
117 parser.add_option(
118     "-C", "--cipher", dest="cipher", metavar="CIPHER",
119     default = 'AES',
120     help = "One of PLAIN, AES, Blowfish, DES, DES3. " )
121 parser.add_option(
122     "-N", "--no-capture", dest="no_capture", 
123     action = "store_true",
124     default = False,
125     help = "If specified, packets won't be logged to standard output "
126            "(default is to log them to standard output). " )
127 parser.add_option(
128     "-c", "--pcap-capture", dest="pcap_capture", metavar="FILE",
129     default = None,
130     help = "If specified, packets won't be logged to standard output, "
131            "but dumped to a pcap-formatted trace in the specified file. " )
132 parser.add_option(
133     "--multicast", dest="multicast", 
134     action = "store_true",
135     default = False,
136     help = "If specified, multicast packets will be forwarded and IGMP "
137            "join/leave packets will be generated. Routing information "
138            "must be sent to the mroute unix socket, in a format identical "
139            "to that of the kernel's MRT ioctls, prefixed with 32-bit IOCTL "
140            "code and 32-bit data length." )
141 parser.add_option(
142     "--multicast-forwarder", dest="multicast_fwd", 
143     default = None,
144     help = "If specified, multicast packets will be forwarded to "
145            "the specified unix-domain socket. If the device uses ethernet "
146            "frames, ethernet headers will be stripped and IP packets "
147            "will be forwarded." )
148 parser.add_option(
149     "--filter", dest="filter_module", metavar="PATH",
150     default = None,
151     help = "If specified, it should be either a .py or .so module. "
152            "It will be loaded, and all incoming and outgoing packets "
153            "will be routed through it. The filter will not be responsible "
154            "for buffering, packet queueing is performed in tun_connect "
155            "already, so it should not concern itself with it. It should "
156            "not, however, block in one direction if the other is congested.\n"
157            "\n"
158            "Modules are expected to have the following methods:\n"
159            "\tinit(**args)\n"
160            "\t\tIf arguments are given, this method will be called with the\n"
161            "\t\tgiven arguments (as keyword args in python modules, or a single\n"
162            "\t\tstring in c modules).\n"
163            "\taccept_packet(packet, direction):\n"
164            "\t\tDecide whether to drop the packet. Direction is 0 for packets "
165                "coming from the local side to the remote, and 1 is for packets "
166                "coming from the remote side to the local. Return a boolean, "
167                "true if the packet is not to be dropped.\n"
168            "\tfilter_init():\n"
169            "\t\tInitializes a filtering pipe (filter_run). It should "
170                "return two file descriptors to use as a bidirectional "
171                "pipe: local and remote. 'local' is where packets from the "
172                "local side will be written to. After filtering, those packets "
173                "should be written to 'remote', where tun_connect will read "
174                "from, and it will forward them to the remote peer. "
175                "Packets from the remote peer will be written to 'remote', "
176                "where the filter is expected to read from, and eventually "
177                "forward them to the local side. If the file descriptors are "
178                "not nonblocking, they will be set to nonblocking. So it's "
179                "better to set them from the start like that.\n"
180            "\tfilter_run(local, remote):\n"
181            "\t\tIf filter_init is provided, it will be called repeatedly, "
182                "in a separate thread until the process is killed. It should "
183                "sleep at most for a second.\n"
184            "\tfilter_close(local, remote):\n"
185            "\t\tCalled then the process is killed, if filter_init was provided. "
186                "It should, among other things, close the file descriptors.\n"
187            "\n"
188            "Python modules are expected to return a tuple in filter_init, "
189            "either of file descriptors or file objects, while native ones "
190            "will receive two int*.\n"
191            "\n"
192            "Python modules can additionally contain a custom queue class "
193            "that will replace the FIFO used by default. The class should "
194            "be named 'queueclass' and contain an interface compatible with "
195            "collections.deque. That is, indexing (especiall for q[0]), "
196            "bool(q), popleft, appendleft, pop (right), append (right), "
197            "len(q) and clear. When using a custom queue, queue size will "
198            "have no effect, pass an effective queue size to the module "
199            "by using filter_args" )
# NOTE: the help text used to be a copy of --pcap-capture's; it now describes
# what the option actually does (see the filter module loading code below).
parser.add_option(
    "--filter-args", dest="filter_args", metavar="ARGS",
    default = None,
    help = "Arguments passed to the filter module's init function. "
           "For python modules, a comma-separated list of key=value pairs "
           "that will be passed as keyword arguments; for native (.so) "
           "modules, the string is passed as given." )
205
206 (options, remaining_args) = parser.parse_args(sys.argv[1:])
207
# Normalize the (case-insensitive) cipher name given on the command line to
# the canonical name used by the tunnel code; PLAIN maps to None (no cipher).
_CIPHER_NAMES = {
    'aes' : 'AES',
    'des' : 'DES',
    'des3' : 'DES3',
    'blowfish' : 'Blowfish',
    'plain' : None,
}
if options.cipher.lower() not in _CIPHER_NAMES:
    # Report a usage error instead of dying with a bare KeyError traceback
    parser.error("Unknown cipher %r: expected one of PLAIN, AES, Blowfish, DES, DES3"
                 % (options.cipher,))
options.cipher = _CIPHER_NAMES[options.cipher.lower()]
215
216 ETH_P_ALL = 0x00000003
217 ETH_P_IP = 0x00000800
218 TUNSETIFF = 0x400454ca
219 IFF_NO_PI = 0x00001000
220 IFF_TAP = 0x00000002
221 IFF_TUN = 0x00000001
222 IFF_VNET_HDR = 0x00004000
223 TUN_PKT_STRIP = 0x00000001
224 IFHWADDRLEN = 0x00000006
225 IFNAMSIZ = 0x00000010
226 IFREQ_SZ = 0x00000028
227 FIONREAD = 0x0000541b
228
class MulticastThread(threading.Thread):
    """Daemon thread that reports multicast group membership through IGMP.

    About once per second it lists the multicast groups joined on the tunnel
    interface (``ip maddr show <tun_name>``) and sends, through a raw IGMP
    socket, a type 0x16 packet for every newly joined group and a type 0x17
    packet for every group that is no longer joined. If more than 30 seconds
    passed since the last full report, all current groups are reported again.
    """

    def __init__(self, *p, **kw):
        super(MulticastThread, self).__init__(*p, **kw)
        # Raw IGMP socket: packets go out through the virtual interface
        # address, carry a hand-built IP header (IP_HDRINCL) and use a
        # multicast TTL of 1.
        self.igmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IGMP)
        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
            socket.inet_aton(options.vif_addr) )
        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        self.igmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
        self._stop = False
        self.setDaemon(True)
    
    def run(self):
        """Poll group subscriptions and emit IGMP packets until stop() is called."""
        maddr_re = re.compile(r"\s*inet\s*(\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3})\s*")
        cur_maddr = set()
        lastfullrefresh = time.time()
        devnull = open('/dev/null','r+b')
        try:
            while not self._stop:
                # Get current subscriptions
                proc = subprocess.Popen(['ip','maddr','show',tun_name],
                    stdout = subprocess.PIPE,
                    stderr = subprocess.STDOUT,
                    stdin = devnull)
                new_maddr = set()
                for line in proc.stdout:
                    match = maddr_re.match(line)
                    if match:
                        new_maddr.add(match.group(1))
                proc.wait()
                
                # Every now and then, send a full report
                now = time.time()
                report_new = new_maddr
                if (now - lastfullrefresh) <= 30.0:
                    report_new = report_new - cur_maddr
                else:
                    lastfullrefresh = now
                
                # Report subscriptions
                for grp in report_new:
                    igmpp = ipaddr2.ipigmp(
                        options.vif_addr, '224.0.0.2', 1, 0x16, 0, grp, 
                        noipcksum=True)
                    self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))

                # Notify group leave
                for grp in cur_maddr - new_maddr:
                    igmpp = ipaddr2.ipigmp(
                        options.vif_addr, '224.0.0.2', 1, 0x17, 0, grp, 
                        noipcksum=True)
                    self.igmp_socket.sendto(igmpp, 0, ('224.0.0.2',0))

                cur_maddr = new_maddr
                
                time.sleep(1)
        finally:
            # Do not leak the /dev/null handle when the loop ends or fails
            devnull.close()
    
    def stop(self):
        """Ask the polling loop to finish and wait up to 5 seconds for it."""
        self._stop = True
        self.join(5)
287
class HostLock(object):
    """Host-wide, non-reentrant lock held for the lifetime of the instance.

    Constructing an instance blocks until the lock is obtained; the lock is
    released when the instance is finalized (__del__). Two layers are used:
    a class-level flag guarded by a condition variable serializes threads of
    this process, and an exclusive flock() on the given file serializes
    different processes.
    """
    # This class is used as a lock to prevent concurrency issues with more
    # than one instance of this script running in the same machine. Both in
    # different processes or different threads.

    # True while some HostLock instance of this process holds the lock
    taken = False
    # Condition used to wait for / signal changes of `taken`
    processcond = threading.Condition()
    
    def __init__(self, lockfile):
        """Acquire the lock; `lockfile` is an open file used for flock()."""
        processcond = self.__class__.processcond
        
        # First, win the in-process lock
        processcond.acquire()
        try:
            # It's not reentrant
            while self.__class__.taken:
                processcond.wait()
            self.__class__.taken = True
        finally:
            processcond.release()
        
        self.lockfile = lockfile
        
        # Then take the inter-process lock, retrying when the call is
        # interrupted by a signal (EINTR); any other error propagates
        while True:
            try:
                fcntl.flock(self.lockfile, fcntl.LOCK_EX)
                break
            except (OSError, IOError), e:
                if e.args[0] != os.errno.EINTR:
                    raise
    
    def __del__(self):
        """Release both the file lock and the in-process lock."""
        processcond = self.__class__.processcond
        
        processcond.acquire()
        try:
            # A closed file cannot be unlocked explicitly, so skip it then
            if not self.lockfile.closed:
                fcntl.flock(self.lockfile, fcntl.LOCK_UN)
            
            # It's not reentrant
            self.__class__.taken = False
            # Wake up one thread waiting in __init__
            processcond.notify()
        finally:
            processcond.release()
330
def ifnam(x):
    """Return interface name `x` right-padded with NUL characters to IFNAMSIZ.

    Names that are already IFNAMSIZ characters or longer come back unchanged.
    """
    return x.ljust(IFNAMSIZ, '\x00')
333
def ifreq(iface, flags):
    """Build the ifreq buffer passed to the TUNSETIFF ioctl.

    The buffer contains:
      char[IFNAMSIZ] : interface name (NUL padded)
      short          : flags
      <padding>      : NUL bytes up to IFREQ_SZ

    Returns the packed string, IFREQ_SZ bytes long.
    """
    req = ifnam(iface) + struct.pack("H", flags)
    # Pad up to the full structure size. The operands used to be swapped
    # (len - IFREQ_SZ), which yields a negative count and thus no padding.
    req += '\x00' * (IFREQ_SZ - len(req))
    return req
342
def tunopen(tun_path, tun_name):
    """Open the TUN device and return it as an unbuffered file object.

    `tun_path` is either a decimal file descriptor number (as a string) of an
    already opened device, or the path of the TUN/TAP device node. In the
    latter case the new descriptor is attached to interface `tun_name`.
    """
    if not tun_path.isdigit():
        # open TUN path
        print >>sys.stderr, "Using tun:", tun_name, "at", tun_path
        device = open(tun_path, 'r+b', 0)

        # bind file descriptor to the interface
        # NOTE(review): IFF_TUN is requested even for tap devices - confirm
        bind_request = ifreq(tun_name, IFF_NO_PI|IFF_TUN)
        fcntl.ioctl(device.fileno(), TUNSETIFF, bind_request)
        return device

    # open TUN fd
    print >>sys.stderr, "Using tun:", tun_name, "fd", tun_path
    return os.fdopen(int(tun_path), 'r+b', 0)
357
def tunclose(tun_path, tun_name, tun):
    """Close the TUN device opened by tunopen.

    `tun` is the file object returned by tunopen (may be None) and `tun_path`
    the path or decimal file descriptor number it was opened from.

    The file object owns its descriptor, so closing it is enough. The raw
    descriptor is closed directly only when there is no file object wrapping
    it; closing both made the second close fail on an invalid descriptor.
    """
    if tun:
        # close TUN object (this also closes the underlying descriptor)
        tun.close()
    elif tun_path and tun_path.isdigit():
        # close TUN fd that was never wrapped in a file object
        os.close(int(tun_path))
366
def noopen(tun_path, tun_name):
    """tunopen stand-in for modes with no device file: log the name, return None."""
    print >>sys.stderr, "Using tun:", tun_name
def noclose(tun_path, tun_name, tun):
    """tunclose stand-in for modes with no device file: do nothing."""
    return None
372
def tuntap_alloc(kind, tun_path, tun_name):
    """Allocate a persistent tun/tap device by running tunctl.

    `kind` is "tun" or "tap"; `tun_name`, when not empty, is the device name
    requested with -t. Returns (tun_path, device_name), where device_name is
    the name reported by tunctl. Raises RuntimeError if tunctl exits with a
    non-zero status or its output is not recognized.
    """
    command = ["tunctl"]
    if kind == "tun":
        command.append("-n")
    if tun_name:
        command.extend(["-t", tun_name])

    tunctl = subprocess.Popen(command, stdout=subprocess.PIPE)
    report = tunctl.communicate()[0]
    if tunctl.returncode:
        raise RuntimeError("Could not allocate %s device" % (kind,))

    # tunctl tells us which device it actually configured
    found = re.search(r"Set '(?P<dev>(?:tun|tap)[0-9]*)' persistent and owned by .*", report, re.I)
    if found is None:
        raise RuntimeError("Could not allocate %s device - tunctl said: %s" % (kind, report))

    tun_name = found.group("dev")
    print >>sys.stderr, "Allocated %s device: %s" % (kind, tun_name)

    return tun_path, tun_name
393
def tuntap_dealloc(tun_path, tun_name):
    """Delete device `tun_name` with `tunctl -d`; only warn if that fails."""
    tunctl = subprocess.Popen(["tunctl", "-d", tun_name], stdout=subprocess.PIPE)
    tunctl.communicate()
    if tunctl.returncode:
        print >> sys.stderr, "WARNING: error deallocating %s device" % (tun_name,)
400
def nmask_to_dot_notation(mask):
    """Convert a prefix length into a dotted-quad netmask (24 -> 255.255.255.0).

    `mask` must be an integer between 0 and 32; ValueError is raised
    otherwise.

    The value is packed as a 32-bit big-endian integer instead of going
    through its hex() text, which broke for mask 0 (odd number of hex digits)
    and whenever hex() appended a long-integer suffix.
    """
    if not 0 <= mask <= 32:
        raise ValueError("Invalid network mask length: %r" % (mask,))
    bits = (((1 << mask) - 1) << (32 - mask)) & 0xFFFFFFFF # 24 -> 0xFFFFFF00
    return socket.inet_ntoa(struct.pack("!I", bits))
407
def vif_start(tun_path, tun_name):
    """Configure and bring up interface `tun_name` with ifconfig.

    Address, netmask, point-to-point peer and transmission queue length come
    from the command line options. When SNAT was requested, an iptables rule
    is added for the virtual network. Raises RuntimeError if either command
    exits with a non-zero status.
    """
    ifconfig = ["ifconfig", tun_name, options.vif_addr,
                "netmask", nmask_to_dot_notation(options.vif_mask),
                "-arp"]
    if options.vif_pointopoint:
        ifconfig += ["pointopoint", options.vif_pointopoint]
    if options.vif_txqueuelen is not None:
        ifconfig += ["txqueuelen", str(options.vif_txqueuelen)]
    ifconfig.append("up")

    child = subprocess.Popen(ifconfig, stdout=subprocess.PIPE)
    child.communicate()
    if child.returncode:
        raise RuntimeError("Error starting virtual interface")

    if not options.vif_snat:
        return

    # set up SNAT using iptables
    # TODO: stop vif on error.
    #   Not so necessary since deallocating the tun/tap device
    #   will forcibly stop it, but it would be tidier
    snat_rule = [ "iptables", "-t", "nat", "-A", "POSTROUTING",
                  "-s", "%s/%d" % (options.vif_addr, options.vif_mask),
                  "-j", "SNAT",
                  "--to-source", hostaddr, "--random" ]
    child = subprocess.Popen(snat_rule, stdout=subprocess.PIPE)
    child.communicate()
    if child.returncode:
        raise RuntimeError("Error setting up SNAT")
435
def vif_stop(tun_path, tun_name):
    """Undo vif_start: drop the SNAT rule (if any) and bring the interface down.

    Failure to bring the interface down is only reported as a warning.
    """
    if options.vif_snat:
        # remove the SNAT rule (same rule spec as in vif_start, with -D)
        snat_rule = [ "iptables", "-t", "nat", "-D", "POSTROUTING",
                      "-s", "%s/%d" % (options.vif_addr, options.vif_mask),
                      "-j", "SNAT",
                      "--to-source", hostaddr, "--random" ]
        subprocess.Popen(snat_rule, stdout=subprocess.PIPE).communicate()

    ifdown = subprocess.Popen(["ifconfig", tun_name, "down"], stdout=subprocess.PIPE)
    ifdown.communicate()
    if ifdown.returncode:
        print >>sys.stderr, "WARNING: error stopping virtual interface"
451     
452     
def pl_tuntap_alloc(kind, tun_path, tun_name):
    """Allocate a PlanetLab tun/tap device through ./tunalloc.so.

    `kind` is "tun" or "tap". Returns (fd, name): the file descriptor of the
    device as a decimal string (usable as tun_path) and the interface name
    written back by tun_alloc.
    """
    tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
    # tun_alloc writes the interface name into this buffer, so it has to be
    # real mutable memory; a c_char_p over a python string must not be
    # written to by foreign code.
    name_buf = ctypes.create_string_buffer(IFNAMSIZ)
    flags = {"tun":IFF_TUN,
             "tap":IFF_TAP}[kind]
    # NOTE(review): the return value is not checked for errors - confirm
    # what tun_alloc returns on failure
    fd = tunalloc_so.tun_alloc(flags, name_buf)
    name = name_buf.value
    return str(fd), name
461
462 _name_reservation = None
def pl_tuntap_namealloc(kind, tun_path, tun_name):
    """Pick a free interface name for a PlanetLab GRE tunnel.

    A throw-away device is allocated only to learn the name prefix used for
    this slice; then the first name `<prefix>9000` .. `<prefix>9999` not
    shown by `ip a` is chosen. The host lock taken here is stored in
    _name_reservation so that pl_vif_start can keep holding it until the
    interface exists.

    Returns (None, name). Raises RuntimeError if no name is available.
    """
    global _name_reservation
    # Serialize access
    lockfile = open("/tmp/nepi-tun-connect.lock", "a")
    lock = HostLock(lockfile)
    
    # We need to do this, fd_tuntap is the only one who can
    # tell us our slice id (this script runs as root, so no uid),
    # and the pattern of device names accepted by vsys scripts
    tunalloc_so = ctypes.cdll.LoadLibrary("./tunalloc.so")
    # tun_alloc writes the interface name into this buffer, so it has to be
    # real mutable memory; a c_char_p over a python string must not be
    # written to by foreign code.
    name_buf = ctypes.create_string_buffer(IFNAMSIZ)
    nkind= {"tun":IFF_TUN,
            "tap":IFF_TAP}[kind]
    fd = tunalloc_so.tun_alloc(nkind, name_buf)
    name = name_buf.value
    # Only the name was needed, not the device itself
    os.close(fd)

    # Everything up to and including the first '-' is the name prefix
    base = name[:name.index('-')+1]
    existing = set(map(str.strip,os.popen("ip a | grep -o '%s[0-9]*'" % (base,)).read().strip().split('\n')))
    
    for i in xrange(9000,10000):
        name = base + str(i)
        if name not in existing:
            break
    else:
        raise RuntimeError("Could not assign interface name")
    
    _name_reservation = lock
    
    return None, name
493
def pl_vif_start(tun_path, tun_name):
    """Bring up interface `tun_name` through the /vsys/vif_up pipes.

    The interface name, address, mask and optional settings (snat,
    pointopoint, txqueuelen, and gre/remote for pl-gre modes) are written,
    one per line, to /vsys/vif_up.in. Whatever comes back on
    /vsys/vif_up.out is echoed to stderr. The host lock is held for the
    whole exchange.
    """
    global _name_reservation

    out = []
    def outreader():
        # Runs in a helper thread: collect everything written to the output
        # pipe until it is closed on the other side
        out.append(stdout.read())
        stdout.close()
        time.sleep(1)

    # Serialize access to vsys
    lockfile = open("/tmp/nepi-tun-connect.lock", "a")
    # Reuse the lock taken by pl_tuntap_namealloc, if there is one
    lock = _name_reservation or HostLock(lockfile)
    _name_reservation = None
    
    stdin = open("/vsys/vif_up.in","w")
    stdout = open("/vsys/vif_up.out","r")

    t = threading.Thread(target=outreader)
    t.start()
    
    stdin.write(tun_name+"\n")
    stdin.write(options.vif_addr+"\n")
    stdin.write(str(options.vif_mask)+"\n")
    if options.vif_snat:
        stdin.write("snat=1\n")
    if options.vif_pointopoint:
        stdin.write("pointopoint=%s\n" % (options.vif_pointopoint,))
    if options.vif_txqueuelen is not None:
        stdin.write("txqueuelen=%d\n" % (options.vif_txqueuelen,))
    if options.mode.startswith('pl-gre'):
        stdin.write("gre=%s\n" % (options.gre_key,))
        stdin.write("remote=%s\n" % (remaining_args[0],))
    stdin.close()
    
    t.join()
    out = ''.join(out)
    if out.strip():
        print >>sys.stderr, out
    
    # Drop our references so the HostLock can be finalized, which is what
    # releases it
    del lock, lockfile
534
def pl_vif_stop(tun_path, tun_name):
    """Bring down interface `tun_name` through the /vsys/vif_down pipes.

    The interface name is written to /vsys/vif_down.in and whatever comes
    back on /vsys/vif_down.out is echoed to stderr. Afterwards it waits,
    polling once per second, until the interface is no longer listed by
    `ip a` or a limit of polls is reached (120 for pl-gre modes, 2
    otherwise). The host lock is held for the whole exchange.
    """
    out = []
    def outreader():
        # Runs in a helper thread: collect everything written to the output
        # pipe until it is closed on the other side
        out.append(stdout.read())
        stdout.close()
        
        # Maximum number of one-second polls to wait for the interface to go
        if options.mode.startswith('pl-gre'):
            lim = 120
        else:
            lim = 2
        
        for i in xrange(lim):
            ifaces = set(map(str.strip,os.popen("ip a | grep -o '%s'" % (tun_name,)).read().strip().split('\n')))
            if tun_name in ifaces:
                time.sleep(1)
            else:
                break

    # Serialize access to vsys
    lockfile = open("/tmp/nepi-tun-connect.lock", "a")
    lock = HostLock(lockfile)

    stdin = open("/vsys/vif_down.in","w")
    stdout = open("/vsys/vif_down.out","r")
    
    t = threading.Thread(target=outreader)
    t.start()
    
    stdin.write(tun_name+"\n")
    stdin.close()
    
    t.join()
    out = ''.join(out)
    if out.strip():
        print >>sys.stderr, out
    
    # Drop our references so the HostLock can be finalized, which is what
    # releases it
    del lock, lockfile
572
573
def tun_fwd(tun, remote, reconnect = None, accept_local = None, accept_remote = None, slowlocal = True, bwlimit = None):
    """Forward packets between `tun` and `remote` using tunchannel.tun_fwd.

    The settings that are not parameters (cipher, udp, queue class, queue
    sizes, termination flag) are taken from the command line options and
    module globals.
    """
    global TERMINATE

    forwarding = dict(
        # in PL mode, we cannot strip PI structs
        # so we'll have to handle them
        with_pi = options.mode.startswith('pl-'),
        ether_mode = tun_name.startswith('tap'),
        cipher_key = options.cipher_key,
        udp = options.udp,
        TERMINATE = TERMINATE,
        stderr = None,
        reconnect = reconnect,
        # user-space queue length defaults to 1000 packets
        tunqueue = options.vif_txqueuelen or 1000,
        tunkqueue = 500,
        cipher = options.cipher,
        accept_local = accept_local,
        accept_remote = accept_remote,
        queueclass = queueclass,
        slowlocal = slowlocal,
        bwlimit = bwlimit,
    )
    tunchannel.tun_fwd(tun, remote, **forwarding)
599
600
601
def nop(tun_path, tun_name):
    """Do-nothing alloc/dealloc/start/stop step: return the arguments as given."""
    return (tun_path, tun_name)

# Per-mode table of the functions implementing each step of the device
# life cycle:
#   alloc(tun_path, tun_name) -> (tun_path, tun_name)
#   start(tun_path, tun_name) / stop(tun_path, tun_name)
#   tunopen(tun_path, tun_name) -> tun / tunclose(tun_path, tun_name, tun)
#   dealloc(tun_path, tun_name)
# The pl-gre modes have no device file to open, so both of them pair noopen
# with noclose.
MODEINFO = {
    'none' : dict(alloc=nop,
                  tunopen=tunopen, tunclose=tunclose,
                  dealloc=nop,
                  start=nop,
                  stop=nop),
    'tun'  : dict(alloc=functools.partial(tuntap_alloc, "tun"),
                  tunopen=tunopen, tunclose=tunclose,
                  dealloc=tuntap_dealloc,
                  start=vif_start,
                  stop=vif_stop),
    'tap'  : dict(alloc=functools.partial(tuntap_alloc, "tap"),
                  tunopen=tunopen, tunclose=tunclose,
                  dealloc=tuntap_dealloc,
                  start=vif_start,
                  stop=vif_stop),
    'pl-tun'  : dict(alloc=functools.partial(pl_tuntap_alloc, "tun"),
                  tunopen=tunopen, tunclose=tunclose,
                  dealloc=nop,
                  start=pl_vif_start,
                  stop=pl_vif_stop),
    'pl-tap'  : dict(alloc=functools.partial(pl_tuntap_alloc, "tap"),
                  tunopen=tunopen, tunclose=tunclose,
                  dealloc=nop,
                  start=pl_vif_start,
                  stop=pl_vif_stop),
    'pl-gre-ip' : dict(alloc=functools.partial(pl_tuntap_namealloc, "tun"),
                  tunopen=noopen, tunclose=noclose,
                  dealloc=nop,
                  start=pl_vif_start,
                  stop=pl_vif_stop),
    'pl-gre-eth': dict(alloc=functools.partial(pl_tuntap_namealloc, "tap"),
                  tunopen=noopen, tunclose=noclose,
                  dealloc=nop,
                  start=pl_vif_start,
                  stop=pl_vif_stop),
}
640     
tun_path = options.tun_path
tun_name = options.tun_name

# Reject unknown modes with a usage error rather than a KeyError traceback
if options.mode not in MODEINFO:
    parser.error("Unknown mode %r: expected one of %s"
                 % (options.mode, ', '.join(sorted(MODEINFO))))
modeinfo = MODEINFO[options.mode]
645
# Try to load filter module
#
# A filter module (python .py or native .so) may provide any of:
#   init(...)                    - called with --filter-args, if given
#   accept_packet                - per-packet accept/drop hook
#   queueclass                   - custom packet queue class
#   filter_init/run/close        - stream filter (all three are required)
# Whatever is missing is left as None.
filter_thread = None
if options.filter_module:
    print >>sys.stderr, "Loading module", options.filter_module, "with args", options.filter_args
    filter_module = None
    if options.filter_module.endswith('.py'):
        sys.path.append(os.path.dirname(options.filter_module))
        filter_module = __import__(os.path.basename(options.filter_module).rsplit('.',1)[0])
        if options.filter_args:
            try:
                # "k1=v1,k2=v2" -> init(k1="v1", k2="v2")
                filter_args = dict(map(lambda x:x.split('=',1),options.filter_args.split(',')))
                filter_module.init(**filter_args)
            except Exception:
                # Initialization stays best-effort, but say what went wrong
                print >>sys.stderr, "WARNING: filter module initialization failed"
                traceback.print_exc(file=sys.stderr)
    elif options.filter_module.endswith('.so'):
        filter_module = ctypes.cdll.LoadLibrary(options.filter_module)
        if options.filter_args:
            try:
                filter_module.init(options.filter_args)
            except Exception:
                # Initialization stays best-effort, but say what went wrong
                print >>sys.stderr, "WARNING: filter module initialization failed"
                traceback.print_exc(file=sys.stderr)
    else:
        print >>sys.stderr, "WARNING: filter module is neither .py nor .so, ignoring it"

    # Each hook is optional: a missing attribute (or no module at all)
    # simply leaves it disabled
    try:
        accept_packet = filter_module.accept_packet
        print >>sys.stderr, "Installing packet filter (accept_packet)"
    except AttributeError:
        accept_packet = None
    
    try:
        queueclass = filter_module.queueclass
        print >>sys.stderr, "Installing custom queue"
    except AttributeError:
        queueclass = None
    
    try:
        _filter_init = filter_module.filter_init
        filter_run = filter_module.filter_run
        filter_close = filter_module.filter_close
        
        def filter_init():
            # NOTE(review): the c_int objects are passed by value although
            # the help text says native modules receive two int*, and
            # python modules are documented to return a tuple instead -
            # confirm against the code that consumes filter_init
            filter_local = ctypes.c_int(0)
            filter_remote = ctypes.c_int(0)
            _filter_init(filter_local, filter_remote)
            return filter_local, filter_remote

        print >>sys.stderr, "Installing packet filter (stream filter)"
    except AttributeError:
        filter_init = None
        filter_run = None
        filter_close = None
else:
    accept_packet = None
    filter_init = None
    filter_run = None
    filter_close = None
    queueclass = None
700
# Create the non-blocking unix datagram socket used to hand multicast
# packets to the external forwarder. It is not connected here; the
# accept_packet hook installed further down connects it on first use.
if options.multicast_fwd:
    print >>sys.stderr, "Connecting to mcast filter"
    mcfwd_sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    tunchannel.nonblock(mcfwd_sock.fileno())
706
# be careful to roll back stuff on exceptions:
#  - if opening the device fails, the interface is stopped again
#  - if starting or opening fails, the device is deallocated
# and the original exception is re-raised in both cases
tun_path, tun_name = modeinfo['alloc'](tun_path, tun_name)
try:
    modeinfo['start'](tun_path, tun_name)
    try:
        tun = modeinfo['tunopen'](tun_path, tun_name)
    except:
        modeinfo['stop'](tun_path, tun_name)
        raise
except:
    modeinfo['dealloc'](tun_path, tun_name)
    raise
719
720
# Track SIGTERM, and set global termination flag instead of dying.
# TERMINATE is a list so that code holding a reference to it sees the change:
# it is empty (false) while running and non-empty (true) once SIGTERM arrived.
TERMINATE = []
def _finalize(sig,frame):
    # Signal handler: just flag the termination request
    global TERMINATE
    TERMINATE.append(None)
signal.signal(signal.SIGTERM, _finalize)
727
728 try:
729     tcpdump = None
730     reconnect = None
731     mcastthread = None
732
733     # install multicast forwarding hook
734     if options.multicast_fwd:
735         print >>sys.stderr, "Installing mcast filter"
736         
737         if HAS_IOVEC:
738             writev = iovec.writev
739         else:
740             os_write = os.write
741             map_ = map
742             str_ = str
743             def writev(fileno, *stuff):
744                 os_write(''.join(map_(str_,stuff)))
745         
        def accept_packet(packet, direction, 
                _up_accept=accept_packet, 
                sock=mcfwd_sock, 
                sockno=mcfwd_sock.fileno(),
                etherProto=tunchannel.etherProto,
                etherStrip=tunchannel.etherStrip,
                etherMode=tun_name.startswith('tap'),
                multicast_fwd = options.multicast_fwd,
                vif_addr = socket.inet_aton(options.vif_addr),
                connected = [], writev=writev,
                len=len, ord=ord):
            # Packet filter hook that also copies incoming multicast packets
            # to the multicast forwarder socket.
            #
            # Only `packet` and `direction` are real parameters; all the
            # others are defaults used to bind values at definition time.
            # `connected` is a shared list used as a flag: it becomes
            # non-empty once the forwarder socket has been connected.
            #
            # Returns the result of the previously installed accept_packet
            # filter when that one rejects the packet, 1 (accept) otherwise.
            if _up_accept:
                rv = _up_accept(packet, direction)
                if not rv:
                    return rv

            if direction == 1:
                # Incoming... what?
                if etherMode:
                    # Only frames with ethertype 0x0800 are considered, with
                    # the ethernet header stripped
                    if etherProto(packet)=='\x08\x00':
                        fwd = etherStrip(packet)
                    else:
                        fwd = None
                else:
                    fwd = packet
                if fwd is not None and len(fwd) >= 20:
                    # Byte 16 with high nibble 0xE: presumably the first
                    # octet of the IPv4 destination address, i.e. a
                    # multicast destination (224.0.0.0/4)
                    if (ord(fwd[16]) & 0xf0) == 0xe0:
                        # Forward it
                        if not connected:
                            try:
                                sock.connect(multicast_fwd)
                                connected.append(None)
                            except:
                                traceback.print_exc(file=sys.stderr)
                        if connected:
                            try:
                                # Datagram is the virtual interface address
                                # followed by the packet
                                writev(sockno, vif_addr,fwd)
                            except:
                                traceback.print_exc(file=sys.stderr)
            return 1
786
787     
788     if options.pass_fd:
789         if accept_packet or filter_init:
790             raise NotImplementedError, "--pass-fd and --filter are not compatible"
791         
792         if options.pass_fd.startswith("base64:"):
793             options.pass_fd = base64.b64decode(
794                 options.pass_fd[len("base64:"):])
795             options.pass_fd = os.path.expandvars(options.pass_fd)
796         
797         print >>sys.stderr, "Sending FD to: %r" % (options.pass_fd,)
798         
799         # send FD to whoever wants it
800         import passfd
801         
802         sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
803         retrydelay = 1.0
804         for i in xrange(30):
805             if TERMINATE:
806                 raise OSError, "Killed"
807             try:
808                 sock.connect(options.pass_fd)
809                 break
810             except socket.error:
811                 # wait a while, retry
812                 print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
813                 time.sleep(min(30.0,retrydelay))
814                 retrydelay *= 1.1
815         else:
816             sock.connect(options.pass_fd)
817         passfd.sendfd(sock, tun.fileno(), '0')
818         
819         # just wait forever
820         def tun_fwd(tun, remote, **kw):
821             global TERMINATE
822             TERM = TERMINATE
823             while not TERM:
824                 time.sleep(1)
825         remote = None
826     elif options.mode.startswith('pl-gre'):
827         if accept_packet or filter_init:
828             raise NotImplementedError, "--mode %s and --filter are not compatible" % (options.mode,)
829         
830         # just wait forever
831         def tun_fwd(tun, remote, **kw):
832             global TERMINATE
833             TERM = TERMINATE
834             while not TERM:
835                 time.sleep(1)
836         remote = remaining_args[0]
837     elif options.udp:
838         # connect to remote endpoint
839         if remaining_args and not remaining_args[0].startswith('-'):
840             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
841             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
842             rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
843             retrydelay = 1.0
844             for i in xrange(30):
845                 if TERMINATE:
846                     raise OSError, "Killed"
847                 try:
848                     rsock.bind((hostaddr,options.udp))
849                     break
850                 except socket.error:
851                     # wait a while, retry
852                     print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
853                     time.sleep(min(30.0,retrydelay))
854                     retrydelay *= 1.1
855             else:
856                 rsock.bind((hostaddr,options.udp))
857             rsock.connect((remaining_args[0],options.port))
858         else:
859             print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
860             raise AssertionError, "Error: need a remote endpoint in UDP mode"
861         
862         # Wait for other peer
863         tunchannel.udp_handshake(TERMINATE, rsock)
864         
865         remote = os.fdopen(rsock.fileno(), 'r+b', 0)
866     else:
867         # connect to remote endpoint
868         if remaining_args and not remaining_args[0].startswith('-'):
869             print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
870             rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
871             retrydelay = 1.0
872             for i in xrange(30):
873                 if TERMINATE:
874                     raise OSError, "Killed"
875                 try:
876                     rsock.connect((remaining_args[0],options.port))
877                     break
878                 except socket.error:
879                     # wait a while, retry
880                     print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
881                     time.sleep(min(30.0,retrydelay))
882                     retrydelay *= 1.1
883             else:
884                 rsock.connect((remaining_args[0],options.port))
885         else:
886             print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
887             lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
888             retrydelay = 1.0
889             for i in xrange(30):
890                 if TERMINATE:
891                     raise OSError, "Killed"
892                 try:
893                     lsock.bind((hostaddr,options.port))
894                     break
895                 except socket.error:
896                     # wait a while, retry
897                     print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
898                     time.sleep(min(30.0,retrydelay))
899                     retrydelay *= 1.1
900             else:
901                 lsock.bind((hostaddr,options.port))
902             lsock.listen(1)
903             rsock,raddr = lsock.accept()
904         remote = os.fdopen(rsock.fileno(), 'r+b', 0)
905
906     if filter_init:
907         filter_local, filter_remote = filter_init()
908         
909         def filter_loop():
910             global TERMINATE
911             TERM = TERMINATE
912             run = filter_run
913             local = filter_local
914             remote = filter_remote
915             while not TERM:
916                 run(local, remote)
917             filter_close(local, remote)
918             
919         filter_thread = threading.Thread(target=filter_loop)
920         filter_thread.start()
921     
922     print >>sys.stderr, "Connected"
923
924     if not options.no_capture:
925         # Launch a tcpdump subprocess, to capture and dump packets.
926         # Make sure to catch sigterm and kill the tcpdump as well
927         tcpdump = subprocess.Popen(
928             ["tcpdump","-l","-n","-i",tun_name, "-s", "4096"]
929             + ["-w",options.pcap_capture,"-U"] * bool(options.pcap_capture) )
930     
931     # Try to give us high priority
932     try:
933         os.nice(-20)
934     except:
935         # Ignore errors, we might not have enough privileges,
936         # or perhaps there is no os.nice support in the system
937         pass
938     
939     if options.multicast:
940         # Start multicast forwarding daemon
941         mcastthread = MulticastThread()
942         mcastthread.start()
943
944     if not filter_init:
945         tun_fwd(tun, remote,
946             reconnect = reconnect,
947             accept_local = accept_packet,
948             accept_remote = accept_packet,
949             bwlimit = options.bwlimit,
950             slowlocal = True)
951     else:
952         # Hm...
953         # ...ok, we need to:
954         #  1. Forward packets from tun to filter
955         #  2. Forward packets from remote to filter
956         #
957         # 1. needs TUN rate-limiting, while 
958         # 2. needs reconnection
959         #
960         # 1. needs ONLY TUN-side acceptance checks, while
961         # 2. needs ONLY remote-side acceptance checks
962         if isinstance(filter_local, ctypes.c_int):
963             filter_local_fd = filter_local.value
964         else:
965             filter_local_fd = filter_local
966         if isinstance(filter_remote, ctypes.c_int):
967             filter_remote_fd = filter_remote.value
968         else:
969             filter_remote_fd = filter_remote
970
971         def localside():
972             tun_fwd(tun, filter_local_fd,
973                 accept_local = accept_packet,
974                 slowlocal = True)
975         
976         def remoteside():
977             tun_fwd(filter_remote_fd, remote,
978                 reconnect = reconnect,
979                 accept_remote = accept_packet,
980                 bwlimit = options.bwlimit,
981                 slowlocal = False)
982         
983         localthread = threading.Thread(target=localside)
984         remotethread = threading.Thread(target=remoteside)
985         localthread.start()
986         remotethread.start()
987         localthread.join()
988         remotethread.join()
989
990 finally:
991     try:
992         print >>sys.stderr, "Shutting down..."
993     except:
994         # In case sys.stderr is broken
995         pass
996     
997     # tidy shutdown in every case - swallow exceptions
998     TERMINATE.append(None)
999     
1000     if mcastthread:
1001         try:
1002             mcastthread.stop()
1003         except:
1004             pass
1005     
1006     if filter_thread:
1007         try:
1008             filter_thread.join()
1009         except:
1010             pass
1011
1012     try:
1013         if tcpdump:
1014             os.kill(tcpdump.pid, signal.SIGTERM)
1015             tcpdump.wait()
1016     except:
1017         pass
1018
1019     try:
1020         modeinfo['stop'](tun_path, tun_name)
1021     except:
1022         traceback.print_exc()
1023
1024     try:
1025         modeinfo['tunclose'](tun_path, tun_name, tun)
1026     except:
1027         traceback.print_exc()
1028         
1029     try:
1030         modeinfo['dealloc'](tun_path, tun_name)
1031     except:
1032         traceback.print_exc()
1033     
1034     print >>sys.stderr, "TERMINATED GRACEFULLY"
1035