2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Claudio Freire <claudio-daniel.freire@inria.fr>
# Fragment of an IP-address formatter; its `def` line is not part of this
# listing (presumably ipfmt, which formatPacket calls on 8-hex-char slices).
# Python 2 only: str.decode("hex") turns a hex string into raw bytes.
40 ipbytes = map(ord,ip.decode("hex"))
# Render each byte value as a decimal number and join them with dots.
41 return '.'.join(map(str,ipbytes))
47 '8863' : 'PPPoE discover',
# etherProto(packet): inspects bytes 12 and 13 of a raw frame.
# `len` is bound as a default argument, presumably to make the lookup local.
# NOTE(review): the embedded numbering jumps 52 -> 54 -> 63, so most of this
# function is missing from the listing; only the visible test is described.
52 def etherProto(packet, len=len):
# 0x81 0x00 at bytes 12-13 is the value formatPacket treats as a VLAN tag.
54 if packet[12] == "\x81" and packet[13] == "\x00":
# formatPacket(packet, ether_mode): builds a one-line, '-'-separated hex dump
# of a packet. The visible return values are prefixed with "malformed eth ",
# an ethertype label looked up in `tagtype` (default 'eth'), "malformed ip "
# or "ip ".
# NOTE(review): several lines of this function are absent from the listing
# (the embedded numbering skips 64, 68, 70, 72, ...), so the branch structure
# is not fully visible; the comments describe only the statements shown.
63 def formatPacket(packet, ether_mode):
# etherStrip (defined in this file) returns the data after the Ethernet header.
65 stripped_packet = etherStrip(packet)
66 if not stripped_packet:
# Python 2 only: str.encode("hex") yields two hex characters per byte, so
# every offset below is twice the byte offset in the raw frame.
67 packet = packet.encode("hex")
# NOTE(review): `packet` was already hex-encoded on the line above; if this
# return runs after it, the dump is hex-encoded twice - confirm intent.
69 return "malformed eth " + packet.encode("hex")
# Hex chars 24:28 are raw bytes 12-13; "8100" marks a VLAN-tagged frame.
71 if packet[24:28] == "8100":
73 ethertype = tagtype.get(packet[32:36], 'eth')
74 return ethertype + " " + ( '-'.join( (
75 packet[0:12], # MAC dest
76 packet[12:24], # MAC src
77 packet[24:32], # VLAN tag
78 packet[32:36], # Ethertype/len
79 packet[36:], # Payload
# Untagged layout: the ethertype follows the two MAC addresses directly.
83 ethertype = tagtype.get(packet[24:28], 'eth')
84 return ethertype + " " + ( '-'.join( (
85 packet[0:12], # MAC dest
86 packet[12:24], # MAC src
87 packet[24:28], # Ethertype/len
88 packet[28:], # Payload
# From here on the stripped payload is dumped as an IP header.
91 packet = stripped_packet
92 packet = packet.encode("hex")
94 return "malformed ip " + packet
96 return "ip " + ( '-'.join( (
98 packet[1:2], #header length
99 packet[2:4], #diffserv/ECN
100 packet[4:8], #total length
102 packet[12:16], #flags/fragment offs
104 packet[18:20], #ip-proto
105 packet[20:24], #checksum
106 ipfmt(packet[24:32]), # src-ip
107 ipfmt(packet[32:40]), # dst-ip
# int(packet[1],16) is the header-length nibble; a value above 5 is treated
# as "options present".
# NOTE(review): the options slice is a fixed 8 hex chars (4 bytes) whatever
# the header length says - confirm this is intended.
108 packet[40:48] if (int(packet[1],16) > 5) else "", # options
109 packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
# _packetReady(buf, ...): in the visible code, reports whether the first entry
# of `buf` (a sequence of byte buffers) already holds at least as many bytes
# as the length field found in its first 4 bytes.
# `len` and `str` are bound as default arguments, presumably so the lookups
# are local inside this helper.
# NOTE(review): the embedded numbering jumps from 112 to 123 and skips 129,
# so part of this function is missing from the listing.
112 def _packetReady(buf, ether_mode=False, len=len, str=str):
# 'HH' = two native-order unsigned 16-bit fields; only the second is kept.
123 _,totallen = struct.unpack('HH',buf[0][:4])
# htons byte-swaps on little-endian hosts, turning the network-order field
# into a host integer.
124 totallen = socket.htons(totallen)
125 rv = len(buf[0]) >= totallen
126 if not rv and len(buf) > 1:
127 # collapse only first two buffers
128 # as needed, to maintain len(buf) meaningful
# `p1` is assigned on a line missing from this listing - presumably the
# buffer taken from the front of `buf`; verify.
130 buf[0] = p1+str(buf[0])
# _pullPacket(buf, ...): in the visible code, splits one packet off the front
# of buf[0] using the length field in its first 4 bytes.
# NOTE(review): the embedded numbering jumps from 135 to 139 and stops at 143,
# so the remaining branches and the return are not in this listing.
135 def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer):
# Same length decoding as _packetReady: second 16-bit field, byte-swapped.
139 _,totallen = struct.unpack('HH',buf[0][:4])
140 totallen = socket.htons(totallen)
141 if len(buf[0]) > totallen:
# Python 2 buffer(obj, offset, size) is a view onto obj, not a copy:
# rv covers the first totallen bytes, buf[0] keeps the remainder.
142 rv = buffer(buf[0],0,totallen)
143 buf[0] = buffer(buf[0],totallen)
# etherStrip(buf): returns a view of `buf` past the Ethernet header when one
# of the visible ethertype tests matches.
# NOTE(review): lines 149-150 and everything after 156 are not in this
# listing, so the fall-through result is not visible here.
148 def etherStrip(buf, buffer=buffer, len=len):
# NOTE(review): etherProto and formatPacket in this file recognise the VLAN
# tag as bytes 0x81 0x00 ("8100"); this test compares against 0x08 0x10,
# which looks like transposed digits - confirm before relying on
# tagged-frame stripping.
151 if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
152 # tagged ethernet frame
# Skip the 14 header bytes plus the 4-byte tag.
153 return buffer(buf, 18)
154 elif buf[12:14] == '\x08\x00':
155 # untagged ethernet frame
156 return buffer(buf, 14)
# etherWrap(packet): the visible pieces are 12 zero bytes standing in for the
# two MAC addresses and 4 zero bytes standing in for the CRC.
# NOTE(review): the lines that combine these pieces with `packet` (161,
# 163-164, 166+) are not in this listing.
160 def etherWrap(packet):
162 "\x00"*6*2 # bogus src and dst mac
165 "\x00"*4, # bogus crc
# piStrip(buf): only the signature is present in this listing. tun_fwd applies
# it to data read from the TUN descriptor to drop the PI header.
168 def piStrip(buf, len=len):
# piWrap(buf, ether_mode): the visible pieces are a zeroed 16-bit flags field
# followed by a 16-bit protocol value.
# NOTE(review): lines 175, 177-179 and 182+ are missing from this listing, so
# how `ether_mode` selects the protocol and how the header is joined to `buf`
# is not visible.
174 def piWrap(buf, ether_mode, etherProto=etherProto):
# etherProto is bound as a default argument and applied to the frame.
176 proto = etherProto(buf)
180 "\x00\x00", # PI: 16 bits flags
181 proto, # 16 bits proto
# Lookup table: entry i is the character chr(i) repeated i times, for i in
# 0..126. encrypt() indexes it with the pad length to get the padding bytes.
# Python 2 only (xrange, chr returning a byte string).
185 _padmap = [ chr(padding) * padding for padding in xrange(127) ]
# encrypt(packet, crypter): pads `packet` up to a multiple of
# crypter.block_size and returns crypter.encrypt() of the padded data.
# `crypter` must expose `block_size` and `encrypt`.
188 def encrypt(packet, crypter, len=len, padmap=_padmap):
# Always between 1 and block_size: an already-aligned packet gets a full
# extra block of padding.
190 padding = crypter.block_size - len(packet) % crypter.block_size
# Each pad byte carries the pad length, which decrypt() reads back from the
# last byte.
191 packet += padmap[padding]
194 return crypter.encrypt(packet)
# decrypt(packet, crypter): decrypts `packet` and removes the trailing padding
# whose length is stored in the last byte.
# NOTE(review): lines 197-198, 200-201, 204 and 207+ are not in this listing,
# including whatever is returned.
196 def decrypt(packet, crypter, ord=ord):
199 packet = crypter.decrypt(packet)
# The last byte is the pad length written by encrypt().
202 padding = ord(packet[-1])
# A valid pad length is between 1 and block_size inclusive.
203 if not (0 < padding <= crypter.block_size):
# Python 2 raise syntax.
# NOTE(review): the message contains "%s" but no value is formatted into it
# on this line - confirm whether an argument was intended.
205 raise RuntimeError, "Truncated packet %s"
206 packet = packet[:-padding]
# Fragment whose `def` line is not in this listing - presumably the
# nonblock() helper that tun_fwd calls on `remote` and `tun`; verify.
# Reads the descriptor's file status flags...
212 fl = fcntl.fcntl(fd, fcntl.F_GETFL)
# ...and writes `fl` back; the line that modifies `fl` (213) is not shown.
214 fcntl.fcntl(fd, fcntl.F_SETFL, fl)
# Failures are reported on stderr as a traceback.
217 traceback.print_exc(file=sys.stderr)
# tun_fwd(...): forwarding loop between a TUN/TAP descriptor (`tun`) and a
# remote endpoint (`remote`). Packets read from the TUN go through `fwbuf` to
# the remote side; packets read from the remote side go through `bkbuf` to
# the TUN.
#
# Visible parameters and how the shown code uses them:
#   with_pi, ether_mode  - select how PI headers are added/stripped on the TUN
#   cipher_key, cipher   - when both are set, a Crypto.Cipher crypter is built
#   udp                  - datagram mode: no partial writes, errors tolerated
#   TERMINATE, SUSPEND   - truthiness-tested flags controlling the loop
#   reconnect            - optional callback consulted on remote errors
#   rwrite, rread        - optional remote I/O functions
#   tunqueue, tunkqueue  - size the two queues and the bandwidth budget
#   accept_local/remote  - optional filters called as accept(packet, 0 or 1)
#   bwlimit              - optional limit feeding the `bwfree` budget
# Builtins and modules (len, max, min, buffer, os, socket, select...) are
# bound as default arguments, presumably to make lookups local in the loop.
#
# NOTE(review): a large share of this function is absent from the listing
# (the embedded numbering has many gaps), so try/except/else structure and
# several assignments are not visible. Comments describe only shown lines.
# NOTE(review): `os.errno` is an incidental attribute of the os module and is
# not available on newer Python 3 releases - relevant if this is ever ported.
221 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND,
222 stderr = sys.stderr, reconnect = None, rwrite = None, rread = None,
223 tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None,
224 accept_remote = None, slowlocal = True, queueclass = None,
225 bwlimit = None, len = len, max = max, min = min, buffer = buffer,
226 OSError = OSError, select = select.select, selecterror = select.error,
227 os = os, socket = socket,
228 retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
# --- Cipher setup: only when both a key and a cipher name are given.
233 if cipher_key and cipher:
# Import the Crypto.Cipher submodule named by `cipher`.
236 __import__('Crypto.Cipher.'+cipher)
239 cipher = getattr(Crypto.Cipher, cipher)
# The key material is the SHA-256 digest of cipher_key, cut to a per-cipher
# length below (AES 16, Blowfish 24, DES 8, DES3 24 bytes).
240 hashed_key = hashlib.sha256(cipher_key).digest()
242 if ciphername == 'AES':
243 hashed_key = hashed_key[:16]
244 elif ciphername == 'Blowfish':
245 hashed_key = hashed_key[:24]
246 elif ciphername == 'DES':
247 hashed_key = hashed_key[:8]
248 elif ciphername == 'DES3':
249 hashed_key = hashed_key[:24]
251 crypter = cipher.new(
256 # We don't want decryption to work only on one side,
257 # This could break things really bad
260 traceback.print_exc(file=sys.stderr)
# Report the transmission mode on the caller-supplied stream, if any.
263 if stderr is not None:
265 print >>stderr, "Packets are transmitted in CIPHER"
267 print >>stderr, "Packets are transmitted in PLAINTEXT"
# --- Remote I/O: for objects with a file descriptor, read and write go
# through os.read/os.write on that descriptor.
269 if hasattr(remote, 'fileno'):
270 remote_fd = remote.fileno()
272 def rwrite(remote, packet, os_write=os.write):
273 return os_write(remote_fd, packet)
275 def rread(remote, maxlen, os_read=os.read):
276 return os_read(remote_fd, maxlen)
# Results of nonblock() on each endpoint; later tests use them to decide
# whether to keep batching or stop after one operation.
278 rnonblock = nonblock(remote)
279 tnonblock = nonblock(tun)
281 # Pick up TUN/TAP writing method
286 # We have iovec, so we can skip PI injection
287 # and use iovec which does it natively
289 twrite = iovec.ethpiwrite
290 tread = iovec.piread2
292 twrite = iovec.ippiwrite
293 tread = iovec.piread2
295 # We have to inject PI headers pythonically
296 def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
297 return oswrite(fd, piWrap(packet, ether_mode))
299 # For reading, we strip PI headers with buffer slicing and that's it
300 def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
301 return piStrip(osread(fd, maxlen))
303 # No need to inject PI headers
# --- Optional filters: each wrapper captures the previous function through a
# default argument and passes the packet to accept() with a direction flag
# (0 for data read locally, 1 for data from the remote side).
311 if accept_local is not None:
312 def tread(fd, maxlen, _tread=tread, accept=accept_local):
313 packet = _tread(fd, maxlen)
314 if accept(packet, 0):
319 if accept_remote is not None:
321 def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
322 packet = decrypt_(packet, crypter)
323 if accept(packet, 1):
328 def rread(fd, maxlen, _rread=rread, accept=accept_remote):
329 packet = _rread(fd, maxlen)
330 if accept(packet, 1):
# --- Queue limits. With the defaults (tunqueue == tunkqueue == 1000) this is
# max(10, 0) == 10 entries per direction.
335 maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
# Python 2 integer division when maxbkbuf is an int.
336 tunhurry = max(0,maxbkbuf/2)
338 if queueclass is None:
339 queueclass = collections.deque
# The condition guarding this much larger limit is not in the listing.
343 maxfwbuf = maxbkbuf = 2000000000
353 # backwards queue functions
354 # they may need packet inspection to
355 # reconstruct packet boundaries
356 if ether_mode or udp:
358 pullPacket = queueclass.popleft
359 reschedule = queueclass.appendleft
361 packetReady = _packetReady
362 pullPacket = _pullPacket
363 reschedule = queueclass.appendleft
365 # forward queue functions
366 # no packet inspection needed
368 fpullPacket = queueclass.popleft
369 freschedule = queueclass.appendleft
# Upper bound and initial value of the `bwfree` send budget.
376 maxbwfree = bwfree = 1500 * tunqueue
383 # The SUSPEND flag has been set. This means we need to wait on
384 # the SUSPEND condition until it is released.
385 while SUSPEND and not TERMINATE:
# --- Build the select() sets from queue state: the line under each test is
# missing, presumably adding the matching descriptor to a set - verify.
389 if packetReady(bkbuf):
# When bwlimit is set, sending is held back while the budget is exhausted.
391 if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0):
395 if len(fwbuf) < maxfwbuf:
397 if remoteok and len(bkbuf) < maxbkbuf:
# One-second timeout on select().
406 rdrdy, wrdy, errs = select(rset,wset,eset,1)
407 except selecterror, e:
408 if e.args[0] == errno.EINTR:
412 traceback.print_exc(file=sys.stderr)
413 # If the SUSPEND flag has been set, then the TUN will be in a bad
414 # state and the select error should be ignored.
# --- Error set handling: only errors on the remote side (not on the TUN)
# reach the reconnect / UDP-transient branches.
422 if reconnect is not None and remote in errs and tun not in errs:
# Refresh the cached descriptor used by the os.read/os.write wrappers.
424 if hasattr(remote, 'fileno'):
425 remote_fd = remote.fileno()
426 elif udp and remote in errs and tun not in errs:
427 # In UDP mode, those are always transient errors
428 # Usually, an error will imply a read-ready socket
429 # that will raise a "Connection refused" error, so
430 # disable read-readiness just for now, and retry
439 # check to see if we can write
440 #rr = wr = rt = wt = 0
# --- Forward direction: queue -> remote, up to maxbatch packets per pass.
445 for x in xrange(maxbatch):
# NOTE(review): this pulls from fwbuf with pullPacket while the retry test
# below uses fpacketReady/freschedule; fpullPacket is also defined above -
# confirm which pull function is intended for the forward queue.
446 packet = pullPacket(fwbuf)
449 packet = encrypt_(packet, crypter)
451 sentnow = rwrite(remote, packet)
455 if not udp and 0 <= sentnow < len(packet):
456 # packet partially sent
457 # reschedule the remaining part
458 # this doesn't happen ever in udp mode
459 freschedule(fwbuf, buffer(packet,sentnow))
461 if not rnonblock or not fpacketReady(fwbuf):
464 # This except handles the entire While block on PURPOSE
465 # as an optimization (setting a try/except block is expensive)
466 # The only operation that can raise this exception is rwrite
# Transient errno values: put the packet back at the head of the queue.
467 if e.errno in retrycodes:
469 freschedule(fwbuf, packet)
473 if reconnect is not None:
474 # in UDP mode, sometimes connected sockets can return a connection refused.
475 # Give the caller a chance to reconnect
477 if hasattr(remote, 'fileno'):
478 remote_fd = remote.fileno()
480 # in UDP mode, we ignore errors - packet loss man...
482 #traceback.print_exc(file=sys.stderr)
# --- Backward direction: queue -> TUN, up to maxtbatch packets per pass.
488 for x in xrange(maxtbatch):
489 packet = pullPacket(bkbuf)
490 twrite(tunfd, packet)
493 # Do not inject packets into the TUN faster than they arrive, unless we're falling
494 # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
495 # don't block either (they're always ready to write), so if we flood the device
496 # we'll have high packet loss.
497 if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
501 # Give some time for the kernel to process the packets
504 # This except handles the entire While block on PURPOSE
505 # as an optimization (setting a try/except block is expensive)
506 # The only operation that can raise this exception is os_write
507 if e.errno in retrycodes:
509 reschedule(bkbuf, packet)
513 # check incoming data packets
# --- Read from the TUN, at most 2000 bytes per read.
516 for x in xrange(maxbatch):
517 packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
# Stop batching on a blocking descriptor or once the forward queue is full.
523 if not tnonblock or len(fwbuf) >= maxfwbuf:
526 # This except handles the entire While block on PURPOSE
527 # as an optimization (setting a try/except block is expensive)
528 # The only operation that can raise this exception is os_read
529 if e.errno not in retrycodes:
# --- Read from the remote side, at most 2000 bytes per read.
534 for x in xrange(maxbatch):
535 packet = rread(remote,2000)
540 packet = decrypt_(packet, crypter)
# Outside UDP mode an empty read is treated as a closed connection.
544 if not udp and packet == "":
545 # Connection broken, try to reconnect (or just die)
546 raise RuntimeError, "Connection broken"
552 if not rnonblock or len(bkbuf) >= maxbkbuf:
555 # This except handles the entire While block on PURPOSE
556 # as an optimization (setting a try/except block is expensive)
557 # The only operation that can raise this exception is rread
558 if e.errno not in retrycodes:
561 if reconnect is not None:
562 # in UDP mode, sometimes connected sockets can return a connection refused
563 # on read. Give the caller a chance to reconnect
565 if hasattr(remote, 'fileno'):
566 remote_fd = remote.fileno()
568 # in UDP mode, we ignore errors - packet loss man...
570 traceback.print_exc(file=sys.stderr)
# --- Bandwidth budget refill: grows with elapsed time multiplied by bwlimit
# and is capped at maxbwfree. Units of bwlimit are not shown here - confirm.
574 delta = tnow - lastbwtime
576 delta = int(bwlimit * delta)
578 bwfree = min(bwfree+delta, maxbwfree)
581 #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
# udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
# creates a UDP socket, binds it to the local address (retrying with a
# delay), then connects it to the peer address.
# Raises OSError("Killed") when TERMINATE becomes truthy while waiting.
# NOTE(review): the loop and try/except lines (585-586, 589, 591, 593-594,
# 598-599, 604+) are not in this listing, including the return statement.
583 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
584 rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
587 # TERMINATE is an array. An item can be added to TERMINATE, from
588 # outside this function to force termination of the loop
590 raise OSError, "Killed"
592 rsock.bind((local_addr, local_port))
595 # wait a while, retry
596 print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
# The pause is capped at 30 seconds; how retrydelay grows is not shown.
597 time.sleep(min(30.0,retrydelay))
# A second bind outside the retry body; the construct it belongs to is on a
# missing line - presumably a final attempt that lets the error propagate.
600 rsock.bind((local_addr, local_port))
601 print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
602 print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
# connect() on a datagram socket fixes the default peer address.
603 rsock.connect((peer_addr, peer_port))
# udp_handshake(TERMINATE, rsock): runs a keepalive thread while waiting to
# receive a heartbeat from the peer on `rsock`.
# Raises OSError("Killed") when TERMINATE becomes truthy while waiting.
# NOTE(review): the keepalive function body and the try/except lines around
# the recv calls (607-608, 610-618, 622, 624, 626-629, 631) are missing from
# this listing.
606 def udp_handshake(TERMINATE, rsock):
# Loop condition of the keepalive helper: runs until `endme` or TERMINATE.
609 while not endme and not TERMINATE:
619 keepalive_thread = threading.Thread(target=keepalive)
620 keepalive_thread.start()
# Up to 900 receive attempts.
621 for i in xrange(900):
623 raise OSError, "Killed"
# Heartbeats are read as datagrams of at most 10 bytes.
625 heartbeat = rsock.recv(10)
630 heartbeat = rsock.recv(10)
# Wait for the keepalive thread to finish before returning.
632 keepalive_thread.join()
# udp_establish(...): obtains a connected UDP socket from udp_connect and then
# runs udp_handshake on it.
# NOTE(review): the end of the udp_connect call (line 636) and the return
# statement are not in this listing.
634 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
635 rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
637 udp_handshake(TERMINATE, rsock)
# tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port): connects `rsock`
# to the peer (retrying with a delay), then sets TCP_NODELAY.
# Raises OSError("Killed") from the visible termination check.
# NOTE(review): the loop, try/except, timeout setup and return lines are not
# in this listing, so the role of `stop` is not visible here.
640 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
643 # The peer has a firewall that prevents a response to the connect, we
644 # will be forever blocked in the connect, so we put a reasonable timeout.
651 raise OSError, "Killed"
653 rsock.connect((peer_addr, peer_port))
657 # wait a while, retry
658 print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
# The pause is capped at 30 seconds; how retrydelay grows is not shown.
659 time.sleep(min(30.0,retrydelay))
# A second connect outside the retry body; the construct it belongs to is on
# a missing line.
662 rsock.connect((peer_addr, peer_port))
665 print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
668 print >>sys.stderr, "tcp_connect: disabling NAGLE"
# NOTE(review): the socket parameter of this function is `rsock`; `sock` is
# not assigned on any visible line. Unless a missing line defines it, this
# raises NameError (possibly swallowed by a handler that is not shown), and
# TCP_NODELAY is never set - confirm.
669 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# tcp_listen(TERMINATE, stop, lsock, local_addr, local_port): binds `lsock` to
# the local address (retrying with a delay), then waits with select() for an
# incoming connection and accepts it.
# Raises OSError("Killed") from the visible termination checks.
# NOTE(review): the loop headers, the listen() call, the handling of `stop`
# and the return statement are on lines missing from this listing.
672 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
675 # We try to bind to the local virtual interface.
676 # It might not exist yet so we wait in a loop.
681 raise OSError, "Killed"
683 lsock.bind((local_addr, local_port))
686 # wait a while, retry
687 print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
# The pause is capped at 30 seconds; how retrydelay grows is not shown.
688 time.sleep(min(30.0,retrydelay))
# A second bind outside the retry body; the construct it belongs to is on a
# missing line.
691 lsock.bind((local_addr, local_port))
693 print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
694 # Now we wait until the other side connects.
695 # The other side might not be ready yet, so we also wait in a loop for timeouts.
700 raise OSError, "Killed"
# Wait up to `timeout` for the listening socket to become readable, so the
# termination check above can run between waits.
701 rlist, wlist, xlist = select.select([lsock], [], [], timeout)
705 sock,raddr = lsock.accept()
706 print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
# tcp_handshake(rsock, listen, hand): exchanges a 4-byte "hand" value with the
# peer over `rsock` and compares the two to decide which side wins.
# NOTE(review): the send, the error handling, the tie case and the return
# statement are on lines missing from this listing.
711 def tcp_handshake(rsock, listen, hand):
712 # we are going to use a barrier algorithm to decide which side listen.
713 # each side will "roll a dice" and send the resulting value to the other
# 4 bytes matches the struct.pack("!L", ...) value built in tcp_establish.
719 peer_hand = rsock.recv(4)
721 print >>sys.stderr, "tcp_handshake: connection reset by peer"
724 print >>sys.stderr, "tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand)
# Both values are fixed-width big-endian byte strings, so comparing the
# strings orders them the same way as the packed integers.
728 elif hand > peer_hand:
# tcp_establish(...): tries to obtain a TCP connection to the peer by running
# a listening attempt and a connecting attempt in parallel threads, each
# followed by tcp_handshake, and then choosing between the results.
# Raises OSError("Killed") from the visible termination check, and OSError
# with an explanatory message when no connection could be established.
# NOTE(review): result selection, socket cleanup and the return statement are
# on lines missing from this listing.
735 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
# Thread body for the listening side; appends (win, rsock) to `lresult`.
736 def listen(stop, hand, lsock, lresult):
738 rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
740 win = tcp_handshake(rsock, True, hand)
742 lresult.append((win, rsock))
# Thread body for the connecting side; appends (win, rsock) to `rresult`.
744 def connect(stop, hand, rsock, rresult):
746 rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
748 win = tcp_handshake(rsock, False, hand)
750 rresult.append((win, rsock))
# Up to 50 rounds.
754 for i in xrange(0, 50):
758 raise OSError, "Killed"
# A fresh random value per round, packed as 4 big-endian bytes.
759 hand = struct.pack("!L", random.randint(0, 2**30))
763 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
764 rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
765 listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
766 connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
767 connect_thread.start()
768 listen_thread.start()
769 connect_thread.join()
# Each result list is expected to hold one (win, socket) tuple by now.
# NOTE(review): only connect_thread.join() is visible before lresult[0] is
# read; confirm the listen thread is also joined on the missing line 770.
771 (lwin, lrsock) = lresult[0]
772 (rwin, rrsock) = rresult[0]
773 if not lrsock or not rrsock:
779 # both socket are connected
789 raise OSError, "Error: tcp_establish could not establish connection."