439649e6ce91076d450545e9ffa1974b998f133b
[nepi.git] / src / nepi / resources / linux / scripts / tunchannel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19 #
20
21
22 import select
23 import sys
24 import os
25 import struct
26 import socket
27 import threading
28 import traceback
29 import errno
30 import fcntl
31 import random
32 import traceback
33 import functools
34 import collections
35 import ctypes
36 import time
37
38 def ipfmt(ip):
39     ipbytes = map(ord,ip.decode("hex"))
40     return '.'.join(map(str,ipbytes))
41
42 tagtype = {
43     '0806' : 'arp',
44     '0800' : 'ipv4',
45     '8870' : 'jumbo',
46     '8863' : 'PPPoE discover',
47     '8864' : 'PPPoE',
48     '86dd' : 'ipv6',
49 }
50
51 def etherProto(packet, len=len):
52     if len(packet) > 14:
53         if packet[12] == "\x81" and packet[13] == "\x00":
54             # tagged
55             return packet[16:18]
56         else:
57             # untagged
58             return packet[12:14]
59     # default: ip
60     return "\x08\x00"
61
62 def formatPacket(packet, ether_mode):
63     if ether_mode:
64         stripped_packet = etherStrip(packet)
65         if not stripped_packet:
66             packet = packet.encode("hex")
67             if len(packet) < 28:
68                 return "malformed eth " + packet.encode("hex")
69             else:
70                 if packet[24:28] == "8100":
71                     # tagged
72                     ethertype = tagtype.get(packet[32:36], 'eth')
73                     return ethertype + " " + ( '-'.join( (
74                         packet[0:12], # MAC dest
75                         packet[12:24], # MAC src
76                         packet[24:32], # VLAN tag
77                         packet[32:36], # Ethertype/len
78                         packet[36:], # Payload
79                     ) ) )
80                 else:
81                     # untagged
82                     ethertype = tagtype.get(packet[24:28], 'eth')
83                     return ethertype + " " + ( '-'.join( (
84                         packet[0:12], # MAC dest
85                         packet[12:24], # MAC src
86                         packet[24:28], # Ethertype/len
87                         packet[28:], # Payload
88                     ) ) )
89         else:
90             packet = stripped_packet
91     packet = packet.encode("hex")
92     if len(packet) < 48:
93         return "malformed ip " + packet
94     else:
95         return "ip " + ( '-'.join( (
96             packet[0:1], #version
97             packet[1:2], #header length
98             packet[2:4], #diffserv/ECN
99             packet[4:8], #total length
100             packet[8:12], #ident
101             packet[12:16], #flags/fragment offs
102             packet[16:18], #ttl
103             packet[18:20], #ip-proto
104             packet[20:24], #checksum
105             ipfmt(packet[24:32]), # src-ip
106             ipfmt(packet[32:40]), # dst-ip
107             packet[40:48] if (int(packet[1],16) > 5) else "", # options
108             packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
109         ) ) )
110
111 def _packetReady(buf, ether_mode=False, len=len, str=str):
112     if not buf:
113         return False
114         
115     rv = False
116     while not rv:
117         if len(buf[0]) < 4:
118             rv = False
119         elif ether_mode:
120             rv = True
121         else:
122             _,totallen = struct.unpack('HH',buf[0][:4])
123             totallen = socket.htons(totallen)
124             rv = len(buf[0]) >= totallen
125         if not rv and len(buf) > 1:
126             # collapse only first two buffers
127             # as needed, to mantain len(buf) meaningful
128             p1 = buf.popleft()
129             buf[0] = p1+str(buf[0])
130         else:
131             return rv
132     return rv
133
134 def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer):
135     if ether_mode:
136         return buf.popleft()
137     else:
138         _,totallen = struct.unpack('HH',buf[0][:4])
139         totallen = socket.htons(totallen)
140         if len(buf[0]) > totallen:
141             rv = buffer(buf[0],0,totallen)
142             buf[0] = buffer(buf[0],totallen)
143         else:
144             rv = buf.popleft()
145         return rv
146
147 def etherStrip(buf, buffer=buffer, len=len):
148     if len(buf) < 14:
149         return ""
150     if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
151         # tagged ethernet frame
152         return buffer(buf, 18)
153     elif buf[12:14] == '\x08\x00':
154         # untagged ethernet frame
155         return buffer(buf, 14)
156     else:
157         return ""
158
159 def etherWrap(packet):
160     return ''.join((
161         "\x00"*6*2 # bogus src and dst mac
162         +"\x08\x00", # IPv4
163         packet, # payload
164         "\x00"*4, # bogus crc
165     ))
166
167 def piStrip(buf, len=len):
168     if len(buf) < 4:
169         return buf
170     else:
171         return buffer(buf,4)
172     
173 def piWrap(buf, ether_mode, etherProto=etherProto):
174     if ether_mode:
175         proto = etherProto(buf)
176     else:
177         proto = "\x08\x00"
178     return ''.join((
179         "\x00\x00", # PI: 16 bits flags
180         proto, # 16 bits proto
181         buf,
182     ))
183
184 _padmap = [ chr(padding) * padding for padding in xrange(127) ]
185 del padding
186
187 def encrypt(packet, crypter, len=len, padmap=_padmap):
188     # pad
189     padding = crypter.block_size - len(packet) % crypter.block_size
190     packet += padmap[padding]
191     
192     # encrypt
193     return crypter.encrypt(packet)
194
195 def decrypt(packet, crypter, ord=ord):
196     if packet:
197         # decrypt
198         packet = crypter.decrypt(packet)
199         
200         # un-pad
201         padding = ord(packet[-1])
202         if not (0 < padding <= crypter.block_size):
203             # wrong padding
204             raise RuntimeError, "Truncated packet %s"
205         packet = packet[:-padding]
206     
207     return packet
208
209 def nonblock(fd):
210     try:
211         fl = fcntl.fcntl(fd, fcntl.F_GETFL)
212         fl |= os.O_NONBLOCK
213         fcntl.fcntl(fd, fcntl.F_SETFL, fl)
214         return True
215     except:
216         traceback.print_exc(file=sys.stderr)
217         # Just ignore
218         return False
219
220 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND,
221         stderr = sys.stderr, reconnect = None, rwrite = None, rread = None,
222         tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None, 
223         accept_remote = None, slowlocal = True, queueclass = None, 
224         bwlimit = None, len = len, max = max, min = min, buffer = buffer,
225         OSError = OSError, select = select.select, selecterror = select.error, 
226         os = os, socket = socket,
227         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
228     crypto_mode = False
229     crypter = None
230
231     try:
232         if cipher_key and cipher:
233             import Crypto.Cipher
234             import hashlib
235             __import__('Crypto.Cipher.'+cipher)
236             
237             ciphername = cipher
238             cipher = getattr(Crypto.Cipher, cipher)
239             hashed_key = hashlib.sha256(cipher_key).digest()
240
241             if ciphername == 'AES':
242                 hashed_key = hashed_key[:16]
243             elif ciphername == 'Blowfish':
244                 hashed_key = hashed_key[:24]
245             elif ciphername == 'DES':
246                 hashed_key = hashed_key[:8]
247             elif ciphername == 'DES3':
248                 hashed_key = hashed_key[:24]
249
250             crypter = cipher.new(
251                 hashed_key, 
252                 cipher.MODE_ECB)
253             crypto_mode = True
254     except:
255         # We don't want decription to work only on one side,
256         # This could break things really bad
257         #crypto_mode = False
258         #crypter = None
259         traceback.print_exc(file=sys.stderr)
260         raise
261
262     if stderr is not None:
263         if crypto_mode:
264             print >>stderr, "Packets are transmitted in CIPHER"
265         else:
266             print >>stderr, "Packets are transmitted in PLAINTEXT"
267     
268     if hasattr(remote, 'fileno'):
269         remote_fd = remote.fileno()
270         if rwrite is None:
271             def rwrite(remote, packet, os_write=os.write):
272                 return os_write(remote_fd, packet)
273         if rread is None:
274             def rread(remote, maxlen, os_read=os.read):
275                 return os_read(remote_fd, maxlen)
276  
277     rnonblock = nonblock(remote)
278     tnonblock = nonblock(tun)
279     
280     # Pick up TUN/TAP writing method
281     if with_pi:
282         try:
283             import iovec
284             
285             # We have iovec, so we can skip PI injection
286             # and use iovec which does it natively
287             if ether_mode:
288                 twrite = iovec.ethpiwrite
289                 tread = iovec.piread2
290             else:
291                 twrite = iovec.ippiwrite
292                 tread = iovec.piread2
293         except ImportError:
294             # We have to inject PI headers pythonically
295             def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
296                 return oswrite(fd, piWrap(packet, ether_mode))
297             
298             # For reading, we strip PI headers with buffer slicing and that's it
299             def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
300                 return piStrip(osread(fd, maxlen))
301     else:
302         # No need to inject PI headers
303         twrite = os.write
304         tread = os.read
305     
306     encrypt_ = encrypt
307     decrypt_ = decrypt
308     xrange_ = xrange
309
310     if accept_local is not None:
311         def tread(fd, maxlen, _tread=tread, accept=accept_local):
312             packet = _tread(fd, maxlen)
313             if accept(packet, 0):
314                 return packet
315             else:
316                 return None
317
318     if accept_remote is not None:
319         if crypto_mode:
320             def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
321                 packet = decrypt_(packet, crypter)
322                 if accept(packet, 1):
323                     return packet
324                 else:
325                     return None
326         else:
327             def rread(fd, maxlen, _rread=rread, accept=accept_remote):
328                 packet = _rread(fd, maxlen)
329                 if accept(packet, 1):
330                     return packet
331                 else:
332                     return None
333     
334     maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
335     tunhurry = max(0,maxbkbuf/2)
336     
337     if queueclass is None:
338         queueclass = collections.deque
339         maxbatch = 2000
340         maxtbatch = 50
341     else:
342         maxfwbuf = maxbkbuf = 2000000000
343         maxbatch = 50
344         maxtbatch = 30
345         tunhurry = 30
346     
347     fwbuf = queueclass()
348     bkbuf = queueclass()
349     nfwbuf = 0
350     nbkbuf = 0
351     
352     # backwards queue functions
353     # they may need packet inspection to 
354     # reconstruct packet boundaries
355     if ether_mode or udp:
356         packetReady = bool
357         pullPacket = queueclass.popleft
358         reschedule = queueclass.appendleft
359     else:
360         packetReady = _packetReady
361         pullPacket = _pullPacket
362         reschedule = queueclass.appendleft
363     
364     # forward queue functions
365     # no packet inspection needed
366     fpacketReady = bool
367     fpullPacket = queueclass.popleft
368     freschedule = queueclass.appendleft
369     
370     tunfd = tun.fileno()
371     os_read = os.read
372     os_write = os.write
373     
374     tget = time.time
375     maxbwfree = bwfree = 1500 * tunqueue
376     lastbwtime = tget()
377     
378     remoteok = True
379     
380     
381     while not TERMINATE:
382         # The SUSPEND flag has been set. This means we need to wait on
383         # the SUSPEND condition until it is released.
384         while SUSPEND and not TERMINATE:
385             time.sleep(0.5)
386
387         wset = []
388         if packetReady(bkbuf):
389             wset.append(tun)
390         if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0):
391             wset.append(remote)
392         
393         rset = []
394         if len(fwbuf) < maxfwbuf:
395             rset.append(tun)
396         if remoteok and len(bkbuf) < maxbkbuf:
397             rset.append(remote)
398         
399         if remoteok:
400             eset = (tun,remote)
401         else:
402             eset = (tun,)
403         
404         try:
405             rdrdy, wrdy, errs = select(rset,wset,eset,1)
406         except selecterror, e:
407             if e.args[0] == errno.EINTR:
408                 # just retry
409                 continue
410             else:
411                 traceback.print_exc(file=sys.stderr)
412                 # If the SUSPEND flag has been set, then the TUN will be in a bad
413                 # state and the select error should be ignores.
414                 if SUSPEND:
415                     continue
416                 else:
417                     raise
418
419         # check for errors
420         if errs:
421             if reconnect is not None and remote in errs and tun not in errs:
422                 remote = reconnect()
423                 if hasattr(remote, 'fileno'):
424                     remote_fd = remote.fileno()
425             elif udp and remote in errs and tun not in errs:
426                 # In UDP mode, those are always transient errors
427                 # Usually, an error will imply a read-ready socket
428                 # that will raise an "Connection refused" error, so
429                 # disable read-readiness just for now, and retry
430                 # the select
431                 remoteok = False
432                 continue
433             else:
434                 break
435         else:
436             remoteok = True
437         
438         # check to see if we can write
439         #rr = wr = rt = wt = 0
440         if remote in wrdy:
441             sent = 0
442             try:
443                 try:
444                     for x in xrange(maxbatch):
445                         packet = pullPacket(fwbuf)
446
447                         if crypto_mode:
448                             packet = encrypt_(packet, crypter)
449                         
450                         sentnow = rwrite(remote, packet)
451                         sent += sentnow
452                         #wr += 1
453                         
454                         if not udp and 0 <= sentnow < len(packet):
455                             # packet partially sent
456                             # reschedule the remaining part
457                             # this doesn't happen ever in udp mode
458                             freschedule(fwbuf, buffer(packet,sentnow))
459                         
460                         if not rnonblock or not fpacketReady(fwbuf):
461                             break
462                 except OSError,e:
463                     # This except handles the entire While block on PURPOSE
464                     # as an optimization (setting a try/except block is expensive)
465                     # The only operation that can raise this exception is rwrite
466                     if e.errno in retrycodes:
467                         # re-schedule packet
468                         freschedule(fwbuf, packet)
469                     else:
470                         raise
471             except:
472                 if reconnect is not None:
473                     # in UDP mode, sometimes connected sockets can return a connection refused.
474                     # Give the caller a chance to reconnect
475                     remote = reconnect()
476                     if hasattr(remote, 'fileno'):
477                         remote_fd = remote.fileno()
478                 elif not udp:
479                     # in UDP mode, we ignore errors - packet loss man...
480                     raise
481                 #traceback.print_exc(file=sys.stderr)
482             
483             if bwlimit:
484                 bwfree -= sent
485         if tun in wrdy:
486             try:
487                 for x in xrange(maxtbatch):
488                     packet = pullPacket(bkbuf)
489                     twrite(tunfd, packet)
490                     #wt += 1
491                     
492                     # Do not inject packets into the TUN faster than they arrive, unless we're falling
493                     # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
494                     # don't block either (they're always ready to write), so if we flood the device 
495                     # we'll have high packet loss.
496                     if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
497                         break
498                 else:
499                     if slowlocal:
500                         # Give some time for the kernel to process the packets
501                         time.sleep(0)
502             except OSError,e:
503                 # This except handles the entire While block on PURPOSE
504                 # as an optimization (setting a try/except block is expensive)
505                 # The only operation that can raise this exception is os_write
506                 if e.errno in retrycodes:
507                     # re-schedule packet
508                     reschedule(bkbuf, packet)
509                 else:
510                     raise
511         
512         # check incoming data packets
513         if tun in rdrdy:
514             try:
515                 for x in xrange(maxbatch):
516                     packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
517                     if not packet:
518                         continue
519                     #rt += 1
520                     fwbuf.append(packet)
521                     
522                     if not tnonblock or len(fwbuf) >= maxfwbuf:
523                         break
524             except OSError,e:
525                 # This except handles the entire While block on PURPOSE
526                 # as an optimization (setting a try/except block is expensive)
527                 # The only operation that can raise this exception is os_read
528                 if e.errno not in retrycodes:
529                     raise
530         if remote in rdrdy:
531             try:
532                 try:
533                     for x in xrange(maxbatch):
534                         packet = rread(remote,2000)
535                         
536                         #rr += 1
537                         
538                         if crypto_mode:
539                             packet = decrypt_(packet, crypter)
540                             if not packet:
541                                 continue
542                         elif not packet:
543                             if not udp and packet == "":
544                                 # Connection broken, try to reconnect (or just die)
545                                 raise RuntimeError, "Connection broken"
546                             else:
547                                 continue
548
549                         bkbuf.append(packet)
550                         
551                         if not rnonblock or len(bkbuf) >= maxbkbuf:
552                             break
553                 except OSError,e:
554                     # This except handles the entire While block on PURPOSE
555                     # as an optimization (setting a try/except block is expensive)
556                     # The only operation that can raise this exception is rread
557                     if e.errno not in retrycodes:
558                         raise
559             except Exception, e:
560                 if reconnect is not None:
561                     # in UDP mode, sometimes connected sockets can return a connection refused
562                     # on read. Give the caller a chance to reconnect
563                     remote = reconnect()
564                     if hasattr(remote, 'fileno'):
565                         remote_fd = remote.fileno()
566                 elif not udp:
567                     # in UDP mode, we ignore errors - packet loss man...
568                     raise
569                 traceback.print_exc(file=sys.stderr)
570
571         if bwlimit:
572             tnow = tget()
573             delta = tnow - lastbwtime
574             if delta > 0.001:
575                 delta = int(bwlimit * delta)
576                 if delta > 0:
577                     bwfree = min(bwfree+delta, maxbwfree)
578                     lastbwtime = tnow
579         
580         #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
581
582 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
583     rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
584     retrydelay = 1.0
585     for i in xrange(30):
586         # TERMINATE is a array. An item can be added to TERMINATE, from
587         # outside this function to force termination of the loop
588         if TERMINATE:
589             raise OSError, "Killed"
590         try:
591             rsock.bind((local_addr, local_port))
592             break
593         except socket.error:
594             # wait a while, retry
595             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
596             time.sleep(min(30.0,retrydelay))
597             retrydelay *= 1.1
598     else:
599         rsock.bind((local_addr, local_port))
600     print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
601     print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
602     rsock.connect((peer_addr, peer_port))
603     return rsock
604
605 def udp_handshake(TERMINATE, rsock):
606     endme = False
607     def keepalive():
608         while not endme and not TERMINATE:
609             try:
610                 rsock.send('')
611             except:
612                 pass
613             time.sleep(1)
614         try:
615             rsock.send('')
616         except:
617             pass
618     keepalive_thread = threading.Thread(target=keepalive)
619     keepalive_thread.start()
620     for i in xrange(900):
621         if TERMINATE:
622             raise OSError, "Killed"
623         try:
624             heartbeat = rsock.recv(10)
625             break
626         except:
627             time.sleep(1)
628     else:
629         heartbeat = rsock.recv(10)
630     endme = True
631     keepalive_thread.join()
632
633 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
634     rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
635             peer_port)
636     udp_handshake(TERMINATE, rsock)
637     return rsock 
638
639 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
640     sock = None
641     retrydelay = 1.0
642     # The peer has a firewall that prevents a response to the connect, we 
643     # will be forever blocked in the connect, so we put a reasonable timeout.
644     rsock.settimeout(10) 
645     # We wait for 
646     for i in xrange(30):
647         if stop:
648             break
649         if TERMINATE:
650             raise OSError, "Killed"
651         try:
652             rsock.connect((peer_addr, peer_port))
653             sock = rsock
654             break
655         except socket.error:
656             # wait a while, retry
657             print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
658             time.sleep(min(30.0,retrydelay))
659             retrydelay *= 1.1
660     else:
661         rsock.connect((peer_addr, peer_port))
662         sock = rsock
663     if sock:
664         print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
665         sock.settimeout(0) 
666         
667         print >>sys.stderr, "tcp_connect: disabling NAGLE"
668         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
669     return sock
670
671 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
672     sock = None
673     retrydelay = 1.0
674     # We try to bind to the local virtual interface. 
675     # It might not exist yet so we wait in a loop.
676     for i in xrange(30):
677         if stop:
678             break
679         if TERMINATE:
680             raise OSError, "Killed"
681         try:
682             lsock.bind((local_addr, local_port))
683             break
684         except socket.error:
685             # wait a while, retry
686             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
687             time.sleep(min(30.0,retrydelay))
688             retrydelay *= 1.1
689     else:
690         lsock.bind((local_addr, local_port))
691
692     print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
693     # Now we wait until the other side connects. 
694     # The other side might not be ready yet, so we also wait in a loop for timeouts.
695     timeout = 1
696     lsock.listen(1)
697     for i in xrange(30):
698         if TERMINATE:
699             raise OSError, "Killed"
700         rlist, wlist, xlist = select.select([lsock], [], [], timeout)
701         if stop:
702             break
703         if lsock in rlist:
704             sock,raddr = lsock.accept()
705             print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
706             break
707         timeout += 5
708     return sock
709
710 def tcp_handshake(rsock, listen, hand):
711     # we are going to use a barrier algorithm to decide wich side listen.
712     # each side will "roll a dice" and send the resulting value to the other 
713     # side. 
714     win = False
715     rsock.settimeout(10)
716     try:
717         rsock.send(hand)
718         peer_hand = rsock.recv(4)
719         if not peer_hand:
720             print >>sys.stderr, "tcp_handshake: connection reset by peer"
721             return False
722         else:
723             print >>sys.stderr, "tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand)
724         if hand < peer_hand:
725             if listen:
726                 win = True
727         elif hand > peer_hand:
728             if not listen:
729                 win = True
730     finally:
731         rsock.settimeout(0)
732     return win
733
734 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
735     def listen(stop, hand, lsock, lresult):
736         win = False
737         rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
738         if rsock:
739             win = tcp_handshake(rsock, True, hand)
740             stop.append(True)
741         lresult.append((win, rsock))
742
743     def connect(stop, hand, rsock, rresult):
744         win = False
745         rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
746         if rsock:
747             win = tcp_handshake(rsock, False, hand)
748             stop.append(True)
749         rresult.append((win, rsock))
750   
751     end = False
752     sock = None
753     for i in xrange(0, 50):
754         if end:
755             break
756         if TERMINATE:
757             raise OSError, "Killed"
758         hand = struct.pack("!L", random.randint(0, 2**30))
759         stop = []
760         lresult = []
761         rresult = []
762         lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
763         rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
764         listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
765         connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
766         connect_thread.start()
767         listen_thread.start()
768         connect_thread.join()
769         listen_thread.join()
770         (lwin, lrsock) = lresult[0]
771         (rwin, rrsock) = rresult[0]
772         if not lrsock or not rrsock:
773             if not lrsock:
774                 sock = rrsock
775             if not rrsock:
776                 sock = lrsock
777             end = True
778         # both socket are connected
779         else:
780            if lwin:
781                 sock = lrsock
782                 end = True
783            elif rwin: 
784                 sock = rrsock
785                 end = True
786
787     if not sock:
788         raise OSError, "Error: tcp_establish could not establish connection."
789     return sock
790
791