2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Claudio Freire <claudio-daniel.freire@inria.fr>
21 from __future__ import print_function
40 return '.'.join(str(ord(x)) for x in ip.decode("hex"))
46 '8863' : 'PPPoE discover',
51 def etherProto(packet, len=len):
53 if packet[12] == "\x81" and packet[13] == "\x00":
62 def formatPacket(packet, ether_mode):
64 stripped_packet = etherStrip(packet)
65 if not stripped_packet:
66 packet = packet.encode("hex")
68 return "malformed eth " + packet.encode("hex")
70 if packet[24:28] == "8100":
72 ethertype = tagtype.get(packet[32:36], 'eth')
73 return ethertype + " " + ( '-'.join( (
74 packet[0:12], # MAC dest
75 packet[12:24], # MAC src
76 packet[24:32], # VLAN tag
77 packet[32:36], # Ethertype/len
78 packet[36:], # Payload
82 ethertype = tagtype.get(packet[24:28], 'eth')
83 return ethertype + " " + ( '-'.join( (
84 packet[0:12], # MAC dest
85 packet[12:24], # MAC src
86 packet[24:28], # Ethertype/len
87 packet[28:], # Payload
90 packet = stripped_packet
91 packet = packet.encode("hex")
93 return "malformed ip " + packet
95 return "ip " + ( '-'.join( (
97 packet[1:2], #header length
98 packet[2:4], #diffserv/ECN
99 packet[4:8], #total length
101 packet[12:16], #flags/fragment offs
103 packet[18:20], #ip-proto
104 packet[20:24], #checksum
105 ipfmt(packet[24:32]), # src-ip
106 ipfmt(packet[32:40]), # dst-ip
107 packet[40:48] if (int(packet[1],16) > 5) else "", # options
108 packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
111 def _packetReady(buf, ether_mode=False, len=len, str=str):
122 _,totallen = struct.unpack('HH',buf[0][:4])
123 totallen = socket.htons(totallen)
124 rv = len(buf[0]) >= totallen
125 if not rv and len(buf) > 1:
126 # collapse only first two buffers
127 # as needed, to mantain len(buf) meaningful
129 buf[0] = p1+str(buf[0])
134 def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer):
138 _,totallen = struct.unpack('HH',buf[0][:4])
139 totallen = socket.htons(totallen)
140 if len(buf[0]) > totallen:
141 rv = buffer(buf[0],0,totallen)
142 buf[0] = buffer(buf[0],totallen)
147 def etherStrip(buf, buffer=buffer, len=len):
150 if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
151 # tagged ethernet frame
152 return buffer(buf, 18)
153 elif buf[12:14] == '\x08\x00':
154 # untagged ethernet frame
155 return buffer(buf, 14)
159 def etherWrap(packet):
161 "\x00"*6*2 # bogus src and dst mac
164 "\x00"*4, # bogus crc
167 def piStrip(buf, len=len):
173 def piWrap(buf, ether_mode, etherProto=etherProto):
175 proto = etherProto(buf)
179 "\x00\x00", # PI: 16 bits flags
180 proto, # 16 bits proto
184 _padmap = [ chr(padding) * padding for padding in range(127) ]
187 def encrypt(packet, crypter, len=len, padmap=_padmap):
189 padding = crypter.block_size - len(packet) % crypter.block_size
190 packet += padmap[padding]
193 return crypter.encrypt(packet)
195 def decrypt(packet, crypter, ord=ord):
198 packet = crypter.decrypt(packet)
201 padding = ord(packet[-1])
202 if not (0 < padding <= crypter.block_size):
204 raise RuntimeError("Truncated packet %s")
205 packet = packet[:-padding]
211 fl = fcntl.fcntl(fd, fcntl.F_GETFL)
213 fcntl.fcntl(fd, fcntl.F_SETFL, fl)
216 traceback.print_exc(file=sys.stderr)
220 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND,
221 stderr = sys.stderr, reconnect = None, rwrite = None, rread = None,
222 tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None,
223 accept_remote = None, slowlocal = True, queueclass = None,
224 bwlimit = None, len = len, max = max, min = min, buffer = buffer,
225 OSError = OSError, select = select.select, selecterror = select.error,
226 os = os, socket = socket,
227 retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
232 if cipher_key and cipher:
235 __import__('Crypto.Cipher.'+cipher)
238 cipher = getattr(Crypto.Cipher, cipher)
239 hashed_key = hashlib.sha256(cipher_key).digest()
241 if ciphername == 'AES':
242 hashed_key = hashed_key[:16]
243 elif ciphername == 'Blowfish':
244 hashed_key = hashed_key[:24]
245 elif ciphername == 'DES':
246 hashed_key = hashed_key[:8]
247 elif ciphername == 'DES3':
248 hashed_key = hashed_key[:24]
250 crypter = cipher.new(
255 # We don't want decription to work only on one side,
256 # This could break things really bad
259 traceback.print_exc(file=sys.stderr)
262 if stderr is not None:
264 print("Packets are transmitted in CIPHER", file=stderr)
266 print("Packets are transmitted in PLAINTEXT", file=stderr)
268 if hasattr(remote, 'fileno'):
269 remote_fd = remote.fileno()
271 def rwrite(remote, packet, os_write=os.write):
272 return os_write(remote_fd, packet)
274 def rread(remote, maxlen, os_read=os.read):
275 return os_read(remote_fd, maxlen)
277 rnonblock = nonblock(remote)
278 tnonblock = nonblock(tun)
280 # Pick up TUN/TAP writing method
285 # We have iovec, so we can skip PI injection
286 # and use iovec which does it natively
288 twrite = iovec.ethpiwrite
289 tread = iovec.piread2
291 twrite = iovec.ippiwrite
292 tread = iovec.piread2
294 # We have to inject PI headers pythonically
295 def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
296 return oswrite(fd, piWrap(packet, ether_mode))
298 # For reading, we strip PI headers with buffer slicing and that's it
299 def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
300 return piStrip(osread(fd, maxlen))
302 # No need to inject PI headers
310 if accept_local is not None:
311 def tread(fd, maxlen, _tread=tread, accept=accept_local):
312 packet = _tread(fd, maxlen)
313 if accept(packet, 0):
318 if accept_remote is not None:
320 def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
321 packet = decrypt_(packet, crypter)
322 if accept(packet, 1):
327 def rread(fd, maxlen, _rread=rread, accept=accept_remote):
328 packet = _rread(fd, maxlen)
329 if accept(packet, 1):
334 maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
335 tunhurry = max(0,maxbkbuf/2)
337 if queueclass is None:
338 queueclass = collections.deque
342 maxfwbuf = maxbkbuf = 2000000000
352 # backwards queue functions
353 # they may need packet inspection to
354 # reconstruct packet boundaries
355 if ether_mode or udp:
357 pullPacket = queueclass.popleft
358 reschedule = queueclass.appendleft
360 packetReady = _packetReady
361 pullPacket = _pullPacket
362 reschedule = queueclass.appendleft
364 # forward queue functions
365 # no packet inspection needed
367 fpullPacket = queueclass.popleft
368 freschedule = queueclass.appendleft
375 maxbwfree = bwfree = 1500 * tunqueue
382 # The SUSPEND flag has been set. This means we need to wait on
383 # the SUSPEND condition until it is released.
384 while SUSPEND and not TERMINATE:
388 if packetReady(bkbuf):
390 if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0):
394 if len(fwbuf) < maxfwbuf:
396 if remoteok and len(bkbuf) < maxbkbuf:
405 rdrdy, wrdy, errs = select(rset,wset,eset,1)
406 except selecterror as e:
407 if e.args[0] == errno.EINTR:
411 traceback.print_exc(file=sys.stderr)
412 # If the SUSPEND flag has been set, then the TUN will be in a bad
413 # state and the select error should be ignores.
421 if reconnect is not None and remote in errs and tun not in errs:
423 if hasattr(remote, 'fileno'):
424 remote_fd = remote.fileno()
425 elif udp and remote in errs and tun not in errs:
426 # In UDP mode, those are always transient errors
427 # Usually, an error will imply a read-ready socket
428 # that will raise an "Connection refused" error, so
429 # disable read-readiness just for now, and retry
438 # check to see if we can write
439 #rr = wr = rt = wt = 0
444 for x in range(maxbatch):
445 packet = pullPacket(fwbuf)
448 packet = encrypt_(packet, crypter)
450 sentnow = rwrite(remote, packet)
454 if not udp and 0 <= sentnow < len(packet):
455 # packet partially sent
456 # reschedule the remaining part
457 # this doesn't happen ever in udp mode
458 freschedule(fwbuf, buffer(packet,sentnow))
460 if not rnonblock or not fpacketReady(fwbuf):
463 # This except handles the entire While block on PURPOSE
464 # as an optimization (setting a try/except block is expensive)
465 # The only operation that can raise this exception is rwrite
466 if e.errno in retrycodes:
468 freschedule(fwbuf, packet)
472 if reconnect is not None:
473 # in UDP mode, sometimes connected sockets can return a connection refused.
474 # Give the caller a chance to reconnect
476 if hasattr(remote, 'fileno'):
477 remote_fd = remote.fileno()
479 # in UDP mode, we ignore errors - packet loss man...
481 #traceback.print_exc(file=sys.stderr)
487 for x in range(maxtbatch):
488 packet = pullPacket(bkbuf)
489 twrite(tunfd, packet)
492 # Do not inject packets into the TUN faster than they arrive, unless we're falling
493 # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
494 # don't block either (they're always ready to write), so if we flood the device
495 # we'll have high packet loss.
496 if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
500 # Give some time for the kernel to process the packets
503 # This except handles the entire While block on PURPOSE
504 # as an optimization (setting a try/except block is expensive)
505 # The only operation that can raise this exception is os_write
506 if e.errno in retrycodes:
508 reschedule(bkbuf, packet)
512 # check incoming data packets
515 for x in range(maxbatch):
516 packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
522 if not tnonblock or len(fwbuf) >= maxfwbuf:
525 # This except handles the entire While block on PURPOSE
526 # as an optimization (setting a try/except block is expensive)
527 # The only operation that can raise this exception is os_read
528 if e.errno not in retrycodes:
533 for x in range(maxbatch):
534 packet = rread(remote,2000)
539 packet = decrypt_(packet, crypter)
543 if not udp and packet == "":
544 # Connection broken, try to reconnect (or just die)
545 raise RuntimeError("Connection broken")
551 if not rnonblock or len(bkbuf) >= maxbkbuf:
554 # This except handles the entire While block on PURPOSE
555 # as an optimization (setting a try/except block is expensive)
556 # The only operation that can raise this exception is rread
557 if e.errno not in retrycodes:
559 except Exception as e:
560 if reconnect is not None:
561 # in UDP mode, sometimes connected sockets can return a connection refused
562 # on read. Give the caller a chance to reconnect
564 if hasattr(remote, 'fileno'):
565 remote_fd = remote.fileno()
567 # in UDP mode, we ignore errors - packet loss man...
569 traceback.print_exc(file=sys.stderr)
573 delta = tnow - lastbwtime
575 delta = int(bwlimit * delta)
577 bwfree = min(bwfree+delta, maxbwfree)
580 #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
582 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
583 rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
586 # TERMINATE is a array. An item can be added to TERMINATE, from
587 # outside this function to force termination of the loop
589 raise OSError("Killed")
591 rsock.bind((local_addr, local_port))
594 # wait a while, retry
595 print("%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),), file=sys.stderr)
596 time.sleep(min(30.0,retrydelay))
599 rsock.bind((local_addr, local_port))
600 print("Listening UDP at: %s:%d" % (local_addr, local_port), file=sys.stderr)
601 print("Connecting UDP to: %s:%d" % (peer_addr, peer_port), file=sys.stderr)
602 rsock.connect((peer_addr, peer_port))
605 def udp_handshake(TERMINATE, rsock):
608 while not endme and not TERMINATE:
618 keepalive_thread = threading.Thread(target=keepalive)
619 keepalive_thread.start()
622 raise OSError("Killed")
624 heartbeat = rsock.recv(10)
629 heartbeat = rsock.recv(10)
631 keepalive_thread.join()
633 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
634 rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
636 udp_handshake(TERMINATE, rsock)
639 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
642 # The peer has a firewall that prevents a response to the connect, we
643 # will be forever blocked in the connect, so we put a reasonable timeout.
650 raise OSError("Killed")
652 rsock.connect((peer_addr, peer_port))
656 # wait a while, retry
657 print("%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),), file=sys.stderr)
658 time.sleep(min(30.0,retrydelay))
661 rsock.connect((peer_addr, peer_port))
664 print("tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port), file=sys.stderr)
667 print("tcp_connect: disabling NAGLE", file=sys.stderr)
668 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
671 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
674 # We try to bind to the local virtual interface.
675 # It might not exist yet so we wait in a loop.
680 raise OSError("Killed")
682 lsock.bind((local_addr, local_port))
685 # wait a while, retry
686 print("%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),), file=sys.stderr)
687 time.sleep(min(30.0,retrydelay))
690 lsock.bind((local_addr, local_port))
692 print("tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port), file=sys.stderr)
693 # Now we wait until the other side connects.
694 # The other side might not be ready yet, so we also wait in a loop for timeouts.
699 raise OSError("Killed")
700 rlist, wlist, xlist = select.select([lsock], [], [], timeout)
704 sock,raddr = lsock.accept()
705 print("tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port), file=sys.stderr)
710 def tcp_handshake(rsock, listen, hand):
711 # we are going to use a barrier algorithm to decide wich side listen.
712 # each side will "roll a dice" and send the resulting value to the other
718 peer_hand = rsock.recv(4)
720 print("tcp_handshake: connection reset by peer", file=sys.stderr)
723 print("tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand), file=sys.stderr)
727 elif hand > peer_hand:
734 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
735 def listen(stop, hand, lsock, lresult):
737 rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
739 win = tcp_handshake(rsock, True, hand)
741 lresult.append((win, rsock))
743 def connect(stop, hand, rsock, rresult):
745 rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
747 win = tcp_handshake(rsock, False, hand)
749 rresult.append((win, rsock))
753 for i in range(0, 50):
757 raise OSError("Killed")
758 hand = struct.pack("!L", random.randint(0, 2**30))
762 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
763 rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
764 listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
765 connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
766 connect_thread.start()
767 listen_thread.start()
768 connect_thread.join()
770 (lwin, lrsock) = lresult[0]
771 (rwin, rrsock) = rresult[0]
772 if not lrsock or not rrsock:
778 # both socket are connected
788 raise OSError("Error: tcp_establish could not establish connection.")