applied the except and raise fixers to the master branch to close the gap with py3
[nepi.git] / src / nepi / resources / linux / scripts / tunchannel.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Claudio Freire <claudio-daniel.freire@inria.fr>
19 #
20
21 from __future__ import print_function
22
23 import select
24 import sys
25 import os
26 import struct
27 import socket
28 import threading
29 import traceback
30 import errno
31 import fcntl
32 import random
33 import traceback
34 import functools
35 import collections
36 import ctypes
37 import time
38
39 def ipfmt(ip):
40     ipbytes = map(ord,ip.decode("hex"))
41     return '.'.join(map(str,ipbytes))
42
43 tagtype = {
44     '0806' : 'arp',
45     '0800' : 'ipv4',
46     '8870' : 'jumbo',
47     '8863' : 'PPPoE discover',
48     '8864' : 'PPPoE',
49     '86dd' : 'ipv6',
50 }
51
52 def etherProto(packet, len=len):
53     if len(packet) > 14:
54         if packet[12] == "\x81" and packet[13] == "\x00":
55             # tagged
56             return packet[16:18]
57         else:
58             # untagged
59             return packet[12:14]
60     # default: ip
61     return "\x08\x00"
62
63 def formatPacket(packet, ether_mode):
64     if ether_mode:
65         stripped_packet = etherStrip(packet)
66         if not stripped_packet:
67             packet = packet.encode("hex")
68             if len(packet) < 28:
69                 return "malformed eth " + packet.encode("hex")
70             else:
71                 if packet[24:28] == "8100":
72                     # tagged
73                     ethertype = tagtype.get(packet[32:36], 'eth')
74                     return ethertype + " " + ( '-'.join( (
75                         packet[0:12], # MAC dest
76                         packet[12:24], # MAC src
77                         packet[24:32], # VLAN tag
78                         packet[32:36], # Ethertype/len
79                         packet[36:], # Payload
80                     ) ) )
81                 else:
82                     # untagged
83                     ethertype = tagtype.get(packet[24:28], 'eth')
84                     return ethertype + " " + ( '-'.join( (
85                         packet[0:12], # MAC dest
86                         packet[12:24], # MAC src
87                         packet[24:28], # Ethertype/len
88                         packet[28:], # Payload
89                     ) ) )
90         else:
91             packet = stripped_packet
92     packet = packet.encode("hex")
93     if len(packet) < 48:
94         return "malformed ip " + packet
95     else:
96         return "ip " + ( '-'.join( (
97             packet[0:1], #version
98             packet[1:2], #header length
99             packet[2:4], #diffserv/ECN
100             packet[4:8], #total length
101             packet[8:12], #ident
102             packet[12:16], #flags/fragment offs
103             packet[16:18], #ttl
104             packet[18:20], #ip-proto
105             packet[20:24], #checksum
106             ipfmt(packet[24:32]), # src-ip
107             ipfmt(packet[32:40]), # dst-ip
108             packet[40:48] if (int(packet[1],16) > 5) else "", # options
109             packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
110         ) ) )
111
112 def _packetReady(buf, ether_mode=False, len=len, str=str):
113     if not buf:
114         return False
115         
116     rv = False
117     while not rv:
118         if len(buf[0]) < 4:
119             rv = False
120         elif ether_mode:
121             rv = True
122         else:
123             _,totallen = struct.unpack('HH',buf[0][:4])
124             totallen = socket.htons(totallen)
125             rv = len(buf[0]) >= totallen
126         if not rv and len(buf) > 1:
127             # collapse only first two buffers
128             # as needed, to mantain len(buf) meaningful
129             p1 = buf.popleft()
130             buf[0] = p1+str(buf[0])
131         else:
132             return rv
133     return rv
134
135 def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer):
136     if ether_mode:
137         return buf.popleft()
138     else:
139         _,totallen = struct.unpack('HH',buf[0][:4])
140         totallen = socket.htons(totallen)
141         if len(buf[0]) > totallen:
142             rv = buffer(buf[0],0,totallen)
143             buf[0] = buffer(buf[0],totallen)
144         else:
145             rv = buf.popleft()
146         return rv
147
148 def etherStrip(buf, buffer=buffer, len=len):
149     if len(buf) < 14:
150         return ""
151     if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
152         # tagged ethernet frame
153         return buffer(buf, 18)
154     elif buf[12:14] == '\x08\x00':
155         # untagged ethernet frame
156         return buffer(buf, 14)
157     else:
158         return ""
159
160 def etherWrap(packet):
161     return ''.join((
162         "\x00"*6*2 # bogus src and dst mac
163         +"\x08\x00", # IPv4
164         packet, # payload
165         "\x00"*4, # bogus crc
166     ))
167
168 def piStrip(buf, len=len):
169     if len(buf) < 4:
170         return buf
171     else:
172         return buffer(buf,4)
173     
174 def piWrap(buf, ether_mode, etherProto=etherProto):
175     if ether_mode:
176         proto = etherProto(buf)
177     else:
178         proto = "\x08\x00"
179     return ''.join((
180         "\x00\x00", # PI: 16 bits flags
181         proto, # 16 bits proto
182         buf,
183     ))
184
185 _padmap = [ chr(padding) * padding for padding in xrange(127) ]
186 del padding
187
188 def encrypt(packet, crypter, len=len, padmap=_padmap):
189     # pad
190     padding = crypter.block_size - len(packet) % crypter.block_size
191     packet += padmap[padding]
192     
193     # encrypt
194     return crypter.encrypt(packet)
195
196 def decrypt(packet, crypter, ord=ord):
197     if packet:
198         # decrypt
199         packet = crypter.decrypt(packet)
200         
201         # un-pad
202         padding = ord(packet[-1])
203         if not (0 < padding <= crypter.block_size):
204             # wrong padding
205             raise RuntimeError("Truncated packet %s")
206         packet = packet[:-padding]
207     
208     return packet
209
210 def nonblock(fd):
211     try:
212         fl = fcntl.fcntl(fd, fcntl.F_GETFL)
213         fl |= os.O_NONBLOCK
214         fcntl.fcntl(fd, fcntl.F_SETFL, fl)
215         return True
216     except:
217         traceback.print_exc(file=sys.stderr)
218         # Just ignore
219         return False
220
221 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND,
222         stderr = sys.stderr, reconnect = None, rwrite = None, rread = None,
223         tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None, 
224         accept_remote = None, slowlocal = True, queueclass = None, 
225         bwlimit = None, len = len, max = max, min = min, buffer = buffer,
226         OSError = OSError, select = select.select, selecterror = select.error, 
227         os = os, socket = socket,
228         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
229     crypto_mode = False
230     crypter = None
231
232     try:
233         if cipher_key and cipher:
234             import Crypto.Cipher
235             import hashlib
236             __import__('Crypto.Cipher.'+cipher)
237             
238             ciphername = cipher
239             cipher = getattr(Crypto.Cipher, cipher)
240             hashed_key = hashlib.sha256(cipher_key).digest()
241
242             if ciphername == 'AES':
243                 hashed_key = hashed_key[:16]
244             elif ciphername == 'Blowfish':
245                 hashed_key = hashed_key[:24]
246             elif ciphername == 'DES':
247                 hashed_key = hashed_key[:8]
248             elif ciphername == 'DES3':
249                 hashed_key = hashed_key[:24]
250
251             crypter = cipher.new(
252                 hashed_key, 
253                 cipher.MODE_ECB)
254             crypto_mode = True
255     except:
256         # We don't want decription to work only on one side,
257         # This could break things really bad
258         #crypto_mode = False
259         #crypter = None
260         traceback.print_exc(file=sys.stderr)
261         raise
262
263     if stderr is not None:
264         if crypto_mode:
265             print("Packets are transmitted in CIPHER", file=stderr)
266         else:
267             print("Packets are transmitted in PLAINTEXT", file=stderr)
268     
269     if hasattr(remote, 'fileno'):
270         remote_fd = remote.fileno()
271         if rwrite is None:
272             def rwrite(remote, packet, os_write=os.write):
273                 return os_write(remote_fd, packet)
274         if rread is None:
275             def rread(remote, maxlen, os_read=os.read):
276                 return os_read(remote_fd, maxlen)
277  
278     rnonblock = nonblock(remote)
279     tnonblock = nonblock(tun)
280     
281     # Pick up TUN/TAP writing method
282     if with_pi:
283         try:
284             import iovec
285             
286             # We have iovec, so we can skip PI injection
287             # and use iovec which does it natively
288             if ether_mode:
289                 twrite = iovec.ethpiwrite
290                 tread = iovec.piread2
291             else:
292                 twrite = iovec.ippiwrite
293                 tread = iovec.piread2
294         except ImportError:
295             # We have to inject PI headers pythonically
296             def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
297                 return oswrite(fd, piWrap(packet, ether_mode))
298             
299             # For reading, we strip PI headers with buffer slicing and that's it
300             def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
301                 return piStrip(osread(fd, maxlen))
302     else:
303         # No need to inject PI headers
304         twrite = os.write
305         tread = os.read
306     
307     encrypt_ = encrypt
308     decrypt_ = decrypt
309     xrange_ = xrange
310
311     if accept_local is not None:
312         def tread(fd, maxlen, _tread=tread, accept=accept_local):
313             packet = _tread(fd, maxlen)
314             if accept(packet, 0):
315                 return packet
316             else:
317                 return None
318
319     if accept_remote is not None:
320         if crypto_mode:
321             def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
322                 packet = decrypt_(packet, crypter)
323                 if accept(packet, 1):
324                     return packet
325                 else:
326                     return None
327         else:
328             def rread(fd, maxlen, _rread=rread, accept=accept_remote):
329                 packet = _rread(fd, maxlen)
330                 if accept(packet, 1):
331                     return packet
332                 else:
333                     return None
334     
335     maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
336     tunhurry = max(0,maxbkbuf/2)
337     
338     if queueclass is None:
339         queueclass = collections.deque
340         maxbatch = 2000
341         maxtbatch = 50
342     else:
343         maxfwbuf = maxbkbuf = 2000000000
344         maxbatch = 50
345         maxtbatch = 30
346         tunhurry = 30
347     
348     fwbuf = queueclass()
349     bkbuf = queueclass()
350     nfwbuf = 0
351     nbkbuf = 0
352     
353     # backwards queue functions
354     # they may need packet inspection to 
355     # reconstruct packet boundaries
356     if ether_mode or udp:
357         packetReady = bool
358         pullPacket = queueclass.popleft
359         reschedule = queueclass.appendleft
360     else:
361         packetReady = _packetReady
362         pullPacket = _pullPacket
363         reschedule = queueclass.appendleft
364     
365     # forward queue functions
366     # no packet inspection needed
367     fpacketReady = bool
368     fpullPacket = queueclass.popleft
369     freschedule = queueclass.appendleft
370     
371     tunfd = tun.fileno()
372     os_read = os.read
373     os_write = os.write
374     
375     tget = time.time
376     maxbwfree = bwfree = 1500 * tunqueue
377     lastbwtime = tget()
378     
379     remoteok = True
380     
381     
382     while not TERMINATE:
383         # The SUSPEND flag has been set. This means we need to wait on
384         # the SUSPEND condition until it is released.
385         while SUSPEND and not TERMINATE:
386             time.sleep(0.5)
387
388         wset = []
389         if packetReady(bkbuf):
390             wset.append(tun)
391         if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0):
392             wset.append(remote)
393         
394         rset = []
395         if len(fwbuf) < maxfwbuf:
396             rset.append(tun)
397         if remoteok and len(bkbuf) < maxbkbuf:
398             rset.append(remote)
399         
400         if remoteok:
401             eset = (tun,remote)
402         else:
403             eset = (tun,)
404         
405         try:
406             rdrdy, wrdy, errs = select(rset,wset,eset,1)
407         except selecterror as e:
408             if e.args[0] == errno.EINTR:
409                 # just retry
410                 continue
411             else:
412                 traceback.print_exc(file=sys.stderr)
413                 # If the SUSPEND flag has been set, then the TUN will be in a bad
414                 # state and the select error should be ignores.
415                 if SUSPEND:
416                     continue
417                 else:
418                     raise
419
420         # check for errors
421         if errs:
422             if reconnect is not None and remote in errs and tun not in errs:
423                 remote = reconnect()
424                 if hasattr(remote, 'fileno'):
425                     remote_fd = remote.fileno()
426             elif udp and remote in errs and tun not in errs:
427                 # In UDP mode, those are always transient errors
428                 # Usually, an error will imply a read-ready socket
429                 # that will raise an "Connection refused" error, so
430                 # disable read-readiness just for now, and retry
431                 # the select
432                 remoteok = False
433                 continue
434             else:
435                 break
436         else:
437             remoteok = True
438         
439         # check to see if we can write
440         #rr = wr = rt = wt = 0
441         if remote in wrdy:
442             sent = 0
443             try:
444                 try:
445                     for x in xrange(maxbatch):
446                         packet = pullPacket(fwbuf)
447
448                         if crypto_mode:
449                             packet = encrypt_(packet, crypter)
450                         
451                         sentnow = rwrite(remote, packet)
452                         sent += sentnow
453                         #wr += 1
454                         
455                         if not udp and 0 <= sentnow < len(packet):
456                             # packet partially sent
457                             # reschedule the remaining part
458                             # this doesn't happen ever in udp mode
459                             freschedule(fwbuf, buffer(packet,sentnow))
460                         
461                         if not rnonblock or not fpacketReady(fwbuf):
462                             break
463                 except OSError as e:
464                     # This except handles the entire While block on PURPOSE
465                     # as an optimization (setting a try/except block is expensive)
466                     # The only operation that can raise this exception is rwrite
467                     if e.errno in retrycodes:
468                         # re-schedule packet
469                         freschedule(fwbuf, packet)
470                     else:
471                         raise
472             except:
473                 if reconnect is not None:
474                     # in UDP mode, sometimes connected sockets can return a connection refused.
475                     # Give the caller a chance to reconnect
476                     remote = reconnect()
477                     if hasattr(remote, 'fileno'):
478                         remote_fd = remote.fileno()
479                 elif not udp:
480                     # in UDP mode, we ignore errors - packet loss man...
481                     raise
482                 #traceback.print_exc(file=sys.stderr)
483             
484             if bwlimit:
485                 bwfree -= sent
486         if tun in wrdy:
487             try:
488                 for x in xrange(maxtbatch):
489                     packet = pullPacket(bkbuf)
490                     twrite(tunfd, packet)
491                     #wt += 1
492                     
493                     # Do not inject packets into the TUN faster than they arrive, unless we're falling
494                     # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
495                     # don't block either (they're always ready to write), so if we flood the device 
496                     # we'll have high packet loss.
497                     if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
498                         break
499                 else:
500                     if slowlocal:
501                         # Give some time for the kernel to process the packets
502                         time.sleep(0)
503             except OSError as e:
504                 # This except handles the entire While block on PURPOSE
505                 # as an optimization (setting a try/except block is expensive)
506                 # The only operation that can raise this exception is os_write
507                 if e.errno in retrycodes:
508                     # re-schedule packet
509                     reschedule(bkbuf, packet)
510                 else:
511                     raise
512         
513         # check incoming data packets
514         if tun in rdrdy:
515             try:
516                 for x in xrange(maxbatch):
517                     packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
518                     if not packet:
519                         continue
520                     #rt += 1
521                     fwbuf.append(packet)
522                     
523                     if not tnonblock or len(fwbuf) >= maxfwbuf:
524                         break
525             except OSError as e:
526                 # This except handles the entire While block on PURPOSE
527                 # as an optimization (setting a try/except block is expensive)
528                 # The only operation that can raise this exception is os_read
529                 if e.errno not in retrycodes:
530                     raise
531         if remote in rdrdy:
532             try:
533                 try:
534                     for x in xrange(maxbatch):
535                         packet = rread(remote,2000)
536                         
537                         #rr += 1
538                         
539                         if crypto_mode:
540                             packet = decrypt_(packet, crypter)
541                             if not packet:
542                                 continue
543                         elif not packet:
544                             if not udp and packet == "":
545                                 # Connection broken, try to reconnect (or just die)
546                                 raise RuntimeError("Connection broken")
547                             else:
548                                 continue
549
550                         bkbuf.append(packet)
551                         
552                         if not rnonblock or len(bkbuf) >= maxbkbuf:
553                             break
554                 except OSError as e:
555                     # This except handles the entire While block on PURPOSE
556                     # as an optimization (setting a try/except block is expensive)
557                     # The only operation that can raise this exception is rread
558                     if e.errno not in retrycodes:
559                         raise
560             except Exception as e:
561                 if reconnect is not None:
562                     # in UDP mode, sometimes connected sockets can return a connection refused
563                     # on read. Give the caller a chance to reconnect
564                     remote = reconnect()
565                     if hasattr(remote, 'fileno'):
566                         remote_fd = remote.fileno()
567                 elif not udp:
568                     # in UDP mode, we ignore errors - packet loss man...
569                     raise
570                 traceback.print_exc(file=sys.stderr)
571
572         if bwlimit:
573             tnow = tget()
574             delta = tnow - lastbwtime
575             if delta > 0.001:
576                 delta = int(bwlimit * delta)
577                 if delta > 0:
578                     bwfree = min(bwfree+delta, maxbwfree)
579                     lastbwtime = tnow
580         
581         #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
582
583 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
584     rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
585     retrydelay = 1.0
586     for i in xrange(30):
587         # TERMINATE is a array. An item can be added to TERMINATE, from
588         # outside this function to force termination of the loop
589         if TERMINATE:
590             raise OSError("Killed")
591         try:
592             rsock.bind((local_addr, local_port))
593             break
594         except socket.error:
595             # wait a while, retry
596             print("%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),), file=sys.stderr)
597             time.sleep(min(30.0,retrydelay))
598             retrydelay *= 1.1
599     else:
600         rsock.bind((local_addr, local_port))
601     print("Listening UDP at: %s:%d" % (local_addr, local_port), file=sys.stderr)
602     print("Connecting UDP to: %s:%d" % (peer_addr, peer_port), file=sys.stderr)
603     rsock.connect((peer_addr, peer_port))
604     return rsock
605
606 def udp_handshake(TERMINATE, rsock):
607     endme = False
608     def keepalive():
609         while not endme and not TERMINATE:
610             try:
611                 rsock.send('')
612             except:
613                 pass
614             time.sleep(1)
615         try:
616             rsock.send('')
617         except:
618             pass
619     keepalive_thread = threading.Thread(target=keepalive)
620     keepalive_thread.start()
621     for i in xrange(900):
622         if TERMINATE:
623             raise OSError("Killed")
624         try:
625             heartbeat = rsock.recv(10)
626             break
627         except:
628             time.sleep(1)
629     else:
630         heartbeat = rsock.recv(10)
631     endme = True
632     keepalive_thread.join()
633
634 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
635     rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
636             peer_port)
637     udp_handshake(TERMINATE, rsock)
638     return rsock 
639
640 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
641     sock = None
642     retrydelay = 1.0
643     # The peer has a firewall that prevents a response to the connect, we 
644     # will be forever blocked in the connect, so we put a reasonable timeout.
645     rsock.settimeout(10) 
646     # We wait for 
647     for i in xrange(30):
648         if stop:
649             break
650         if TERMINATE:
651             raise OSError("Killed")
652         try:
653             rsock.connect((peer_addr, peer_port))
654             sock = rsock
655             break
656         except socket.error:
657             # wait a while, retry
658             print("%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),), file=sys.stderr)
659             time.sleep(min(30.0,retrydelay))
660             retrydelay *= 1.1
661     else:
662         rsock.connect((peer_addr, peer_port))
663         sock = rsock
664     if sock:
665         print("tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port), file=sys.stderr)
666         sock.settimeout(0) 
667         
668         print("tcp_connect: disabling NAGLE", file=sys.stderr)
669         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
670     return sock
671
672 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
673     sock = None
674     retrydelay = 1.0
675     # We try to bind to the local virtual interface. 
676     # It might not exist yet so we wait in a loop.
677     for i in xrange(30):
678         if stop:
679             break
680         if TERMINATE:
681             raise OSError("Killed")
682         try:
683             lsock.bind((local_addr, local_port))
684             break
685         except socket.error:
686             # wait a while, retry
687             print("%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),), file=sys.stderr)
688             time.sleep(min(30.0,retrydelay))
689             retrydelay *= 1.1
690     else:
691         lsock.bind((local_addr, local_port))
692
693     print("tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port), file=sys.stderr)
694     # Now we wait until the other side connects. 
695     # The other side might not be ready yet, so we also wait in a loop for timeouts.
696     timeout = 1
697     lsock.listen(1)
698     for i in xrange(30):
699         if TERMINATE:
700             raise OSError("Killed")
701         rlist, wlist, xlist = select.select([lsock], [], [], timeout)
702         if stop:
703             break
704         if lsock in rlist:
705             sock,raddr = lsock.accept()
706             print("tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port), file=sys.stderr)
707             break
708         timeout += 5
709     return sock
710
711 def tcp_handshake(rsock, listen, hand):
712     # we are going to use a barrier algorithm to decide wich side listen.
713     # each side will "roll a dice" and send the resulting value to the other 
714     # side. 
715     win = False
716     rsock.settimeout(10)
717     try:
718         rsock.send(hand)
719         peer_hand = rsock.recv(4)
720         if not peer_hand:
721             print("tcp_handshake: connection reset by peer", file=sys.stderr)
722             return False
723         else:
724             print("tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand), file=sys.stderr)
725         if hand < peer_hand:
726             if listen:
727                 win = True
728         elif hand > peer_hand:
729             if not listen:
730                 win = True
731     finally:
732         rsock.settimeout(0)
733     return win
734
735 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
736     def listen(stop, hand, lsock, lresult):
737         win = False
738         rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
739         if rsock:
740             win = tcp_handshake(rsock, True, hand)
741             stop.append(True)
742         lresult.append((win, rsock))
743
744     def connect(stop, hand, rsock, rresult):
745         win = False
746         rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
747         if rsock:
748             win = tcp_handshake(rsock, False, hand)
749             stop.append(True)
750         rresult.append((win, rsock))
751   
752     end = False
753     sock = None
754     for i in xrange(0, 50):
755         if end:
756             break
757         if TERMINATE:
758             raise OSError("Killed")
759         hand = struct.pack("!L", random.randint(0, 2**30))
760         stop = []
761         lresult = []
762         rresult = []
763         lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
764         rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
765         listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
766         connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
767         connect_thread.start()
768         listen_thread.start()
769         connect_thread.join()
770         listen_thread.join()
771         (lwin, lrsock) = lresult[0]
772         (rwin, rrsock) = rresult[0]
773         if not lrsock or not rrsock:
774             if not lrsock:
775                 sock = rrsock
776             if not rrsock:
777                 sock = lrsock
778             end = True
779         # both socket are connected
780         else:
781            if lwin:
782                 sock = lrsock
783                 end = True
784            elif rwin: 
785                 sock = rrsock
786                 end = True
787
788     if not sock:
789         raise OSError("Error: tcp_establish could not establish connection.")
790     return sock