17 ipbytes = map(ord,ip.decode("hex"))
18 return '.'.join(map(str,ipbytes))
24 '8863' : 'PPPoE discover',
28 def etherProto(packet, len=len):
30 if packet[12] == "\x81" and packet[13] == "\x00":
38 def formatPacket(packet, ether_mode):
40 stripped_packet = etherStrip(packet)
41 if not stripped_packet:
42 packet = packet.encode("hex")
44 return "malformed eth " + packet.encode("hex")
46 if packet[24:28] == "8100":
48 ethertype = tagtype.get(packet[32:36], 'eth')
49 return ethertype + " " + ( '-'.join( (
50 packet[0:12], # MAC dest
51 packet[12:24], # MAC src
52 packet[24:32], # VLAN tag
53 packet[32:36], # Ethertype/len
54 packet[36:], # Payload
58 ethertype = tagtype.get(packet[24:28], 'eth')
59 return ethertype + " " + ( '-'.join( (
60 packet[0:12], # MAC dest
61 packet[12:24], # MAC src
62 packet[24:28], # Ethertype/len
63 packet[28:], # Payload
66 packet = stripped_packet
67 packet = packet.encode("hex")
69 return "malformed ip " + packet
71 return "ip " + ( '-'.join( (
73 packet[1:2], #header length
74 packet[2:4], #diffserv/ECN
75 packet[4:8], #total length
77 packet[12:16], #flags/fragment offs
79 packet[18:20], #ip-proto
80 packet[20:24], #checksum
81 ipfmt(packet[24:32]), # src-ip
82 ipfmt(packet[32:40]), # dst-ip
83 packet[40:48] if (int(packet[1],16) > 5) else "", # options
84 packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
87 def _packetReady(buf, ether_mode=False, len=len):
98 _,totallen = struct.unpack('HH',buf[0][:4])
99 totallen = socket.htons(totallen)
100 rv = len(buf[0]) >= totallen
101 if not rv and len(buf) > 1:
109 def _pullPacket(buf, ether_mode=False, len=len):
113 _,totallen = struct.unpack('HH',buf[0][:4])
114 totallen = socket.htons(totallen)
115 if len(buf[0]) < totallen:
116 rv = buf[0][:totallen]
117 buf[0] = buf[0][totallen:]
125 if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
126 # tagged ethernet frame
128 elif buf[12:14] == '\x08\x00':
129 # untagged ethernet frame
134 def etherWrap(packet):
136 "\x00"*6*2 # bogus src and dst mac
139 "\x00"*4, # bogus crc
142 def piStrip(buf, len=len):
148 def piWrap(buf, ether_mode, etherProto=etherProto):
150 proto = etherProto(buf)
154 "\x00\x00", # PI: 16 bits flags
155 proto, # 16 bits proto
159 _padmap = [ chr(padding) * padding for padding in xrange(127) ]
162 def encrypt(packet, crypter, len=len, padmap=_padmap):
164 padding = crypter.block_size - len(packet) % crypter.block_size
165 packet += padmap[padding]
168 return crypter.encrypt(packet)
170 def decrypt(packet, crypter, ord=ord):
173 packet = crypter.decrypt(packet)
176 padding = ord(packet[-1])
177 if not (0 < padding <= crypter.block_size):
179 raise RuntimeError, "Truncated packet"
180 packet = packet[:-padding]
186 fl = fcntl.fcntl(fd, fcntl.F_GETFL)
188 fcntl.fcntl(fd, fcntl.F_SETFL, fl)
191 traceback.print_exc(file=sys.stderr)
195 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
196 cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, bwlimit=None,
197 len=len, max=max, min=min, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
198 retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
203 if cipher_key and cipher:
206 __import__('Crypto.Cipher.'+cipher)
209 cipher = getattr(Crypto.Cipher, cipher)
210 hashed_key = hashlib.sha256(cipher_key).digest()
211 if getattr(cipher, 'key_size'):
212 hashed_key = hashed_key[:cipher.key_size]
213 elif ciphername == 'DES3':
214 hashed_key = hashed_key[:24]
215 crypter = cipher.new(
220 traceback.print_exc(file=sys.stderr)
224 if stderr is not None:
226 print >>stderr, "Packets are transmitted in CIPHER"
228 print >>stderr, "Packets are transmitted in PLAINTEXT"
230 if hasattr(remote, 'fileno'):
231 remote_fd = remote.fileno()
233 def rwrite(remote, packet, os_write=os.write):
234 return os_write(remote_fd, packet)
236 def rread(remote, maxlen, os_read=os.read):
237 return os_read(remote_fd, maxlen)
239 rnonblock = nonblock(remote)
240 tnonblock = nonblock(tun)
242 # Pick up TUN/TAP writing method
247 # We have iovec, so we can skip PI injection
248 # and use iovec which does it natively
250 twrite = iovec.ethpiwrite
251 tread = iovec.piread2
253 twrite = iovec.ippiwrite
254 tread = iovec.piread2
256 # We have to inject PI headers pythonically
257 def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
258 return oswrite(fd, piWrap(packet, ether_mode))
260 # For reading, we strip PI headers with buffer slicing and that's it
261 def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
262 return piStrip(osread(fd, maxlen))
264 # No need to inject PI headers
272 if accept_local is not None:
273 def tread(fd, maxlen, _tread=tread, accept=accept_local):
274 packet = _tread(fd, maxlen)
275 if accept(packet, 0):
280 if accept_remote is not None:
282 def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
283 packet = decrypt_(packet, crypter)
284 if accept(packet, 1):
289 def rread(fd, maxlen, _rread=rread, accept=accept_remote):
290 packet = _rread(fd, maxlen)
291 if accept(packet, 1):
296 maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
297 tunhurry = max(0,maxbkbuf/2)
299 if queueclass is None:
300 queueclass = collections.deque
304 maxfwbuf = maxbkbuf = 2000000000
313 if ether_mode or udp:
315 pullPacket = queueclass.popleft
316 reschedule = queueclass.appendleft
318 packetReady = _packetReady
319 pullPacket = _pullPacket
320 reschedule = queueclass.appendleft
326 maxbwfree = bwfree = 1500 * tunqueue
334 if packetReady(bkbuf):
336 if remoteok and packetReady(fwbuf) and (not bwlimit or bwfree > 0):
340 if len(fwbuf) < maxfwbuf:
342 if remoteok and len(bkbuf) < maxbkbuf:
351 rdrdy, wrdy, errs = select(rset,wset,eset,1)
352 except selecterror, e:
353 if e.args[0] == errno.EINTR:
361 if reconnect is not None and remote in errs and tun not in errs:
363 if hasattr(remote, 'fileno'):
364 remote_fd = remote.fileno()
365 elif udp and remote in errs and tun not in errs:
366 # In UDP mode, those are always transient errors
367 # Usually, an error will imply a read-ready socket
368 # that will raise an "Connection refused" error, so
369 # disable read-readiness just for now, and retry
378 # check to see if we can write
379 #rr = wr = rt = wt = 0
384 for x in xrange(maxbatch):
385 packet = pullPacket(fwbuf)
388 packet = encrypt_(packet, crypter)
390 sent += rwrite(remote, packet)
393 if not rnonblock or not packetReady(fwbuf):
396 # This except handles the entire While block on PURPOSE
397 # as an optimization (setting a try/except block is expensive)
398 # The only operation that can raise this exception is rwrite
399 if e.errno in retrycodes:
401 reschedule(fwbuf, packet)
405 if reconnect is not None:
406 # in UDP mode, sometimes connected sockets can return a connection refused.
407 # Give the caller a chance to reconnect
409 if hasattr(remote, 'fileno'):
410 remote_fd = remote.fileno()
412 # in UDP mode, we ignore errors - packet loss man...
414 #traceback.print_exc(file=sys.stderr)
420 for x in xrange(maxtbatch):
421 packet = pullPacket(bkbuf)
422 twrite(tunfd, packet)
425 # Do not inject packets into the TUN faster than they arrive, unless we're falling
426 # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
427 # don't block either (they're always ready to write), so if we flood the device
428 # we'll have high packet loss.
429 if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
433 # Give some time for the kernel to process the packets
436 # This except handles the entire While block on PURPOSE
437 # as an optimization (setting a try/except block is expensive)
438 # The only operation that can raise this exception is os_write
439 if e.errno in retrycodes:
441 reschedule(bkbuf, packet)
445 # check incoming data packets
448 for x in xrange(maxbatch):
449 packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
455 if not tnonblock or len(fwbuf) >= maxfwbuf:
458 # This except handles the entire While block on PURPOSE
459 # as an optimization (setting a try/except block is expensive)
460 # The only operation that can raise this exception is os_read
461 if e.errno not in retrycodes:
466 for x in xrange(maxbatch):
467 packet = rread(remote,2000)
472 packet = decrypt_(packet, crypter)
476 if not udp and packet == "":
477 # Connection broken, try to reconnect (or just die)
478 raise RuntimeError, "Connection broken"
484 if not rnonblock or len(bkbuf) >= maxbkbuf:
487 # This except handles the entire While block on PURPOSE
488 # as an optimization (setting a try/except block is expensive)
489 # The only operation that can raise this exception is rread
490 if e.errno not in retrycodes:
493 if reconnect is not None:
494 # in UDP mode, sometimes connected sockets can return a connection refused
495 # on read. Give the caller a chance to reconnect
497 if hasattr(remote, 'fileno'):
498 remote_fd = remote.fileno()
500 # in UDP mode, we ignore errors - packet loss man...
502 traceback.print_exc(file=sys.stderr)
506 delta = tnow - lastbwtime
508 delta = int(bwlimit * delta)
510 bwfree = min(bwfree+delta, maxbwfree)
513 #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
515 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
516 rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
519 # TERMINATE is a array. An item can be added to TERMINATE, from
520 # outside this function to force termination of the loop
522 raise OSError, "Killed"
524 rsock.bind((local_addr, local_port))
527 # wait a while, retry
528 print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
529 time.sleep(min(30.0,retrydelay))
532 rsock.bind((local_addr, local_port))
533 print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
534 print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
535 rsock.connect((peer_addr, peer_port))
538 def udp_handshake(TERMINATE, rsock):
541 while not endme and not TERMINATE:
551 keepalive_thread = threading.Thread(target=keepalive)
552 keepalive_thread.start()
556 raise OSError, "Killed"
558 heartbeat = rsock.recv(10)
561 time.sleep(min(30.0,retrydelay))
564 heartbeat = rsock.recv(10)
566 keepalive_thread.join()
568 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
569 rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
571 udp_handshake(TERMINATE, rsock)
574 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
577 # The peer has a firewall that prevents a response to the connect, we
578 # will be forever blocked in the connect, so we put a reasonable timeout.
585 raise OSError, "Killed"
587 rsock.connect((peer_addr, peer_port))
591 # wait a while, retry
592 print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
593 time.sleep(min(30.0,retrydelay))
596 rsock.connect((peer_addr, peer_port))
602 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
605 # We try to bind to the local virtual interface.
606 # It might not exist yet so we wait in a loop.
611 raise OSError, "Killed"
613 lsock.bind((local_addr, local_port))
616 # wait a while, retry
617 print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
618 time.sleep(min(30.0,retrydelay))
621 lsock.bind((local_addr, local_port))
623 # Now we wait until the other side connects.
624 # The other side might not be ready yet, so we also wait in a loop for timeouts.
629 raise OSError, "Killed"
630 rlist, wlist, xlist = select.select([lsock], [], [], timeout)
634 sock,raddr = lsock.accept()
639 def tcp_handshake(TERMINATE, sock, listen, dice):
640 # we are going to use a barrier algorithm to decide wich side listen.
641 # each side will "roll a dice" and send the resulting value to the other
645 for i in xrange(100):
647 raise OSError, "Killed"
651 peer_hand = sock.recv(1)
656 elif hand > peer_hand:
670 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
671 def listen(stop, result, lsock, dice):
672 lsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
674 lsock = tcp_handshake(TERMINATE, lsock, True, dice)
679 def connect(stop, result, rsock, dice):
680 rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
682 rsock = tcp_handshake(TERMINATE, rsock, False, dice)
691 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
692 rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
693 connect_thread = threading.Thread(target=connect, args=(stop, result, rsock, dice))
694 listen_thread = threading.Thread(target=listen, args=(stop, result, lsock, dice))
695 connect_thread.start()
696 listen_thread.start()
697 connect_thread.join()
700 raise OSError, "Error: tcp_establish could not establish connection."
710 self._condition = threading.Condition(threading.Lock())
715 self._condition.acquire()
719 self._condition.release()
723 self._condition.acquire()
725 if self._readers > 0:
727 if self._readers == 0:
728 self._condition.notifyAll()
730 self._condition.release()
733 self._condition.acquire()
735 while self._readers > 0:
736 self._condition.wait()
737 self._value = random.randint(1, 6)
739 self._condition.release()