Avoid mismtaching cryptography configuration in both extremes of a tunnel.
[nepi.git] / src / nepi / util / tunchannel.py
1 import select
2 import sys
3 import os
4 import struct
5 import socket
6 import threading
7 import traceback
8 import errno
9 import fcntl
10 import random
11 import traceback
12 import functools
13 import collections
14 import ctypes
15 import time
16
17 def ipfmt(ip):
18     ipbytes = map(ord,ip.decode("hex"))
19     return '.'.join(map(str,ipbytes))
20
21 tagtype = {
22     '0806' : 'arp',
23     '0800' : 'ipv4',
24     '8870' : 'jumbo',
25     '8863' : 'PPPoE discover',
26     '8864' : 'PPPoE',
27     '86dd' : 'ipv6',
28 }
29
30 def etherProto(packet, len=len):
31     if len(packet) > 14:
32         if packet[12] == "\x81" and packet[13] == "\x00":
33             # tagged
34             return packet[16:18]
35         else:
36             # untagged
37             return packet[12:14]
38     # default: ip
39     return "\x08\x00"
40
41 def formatPacket(packet, ether_mode):
42     if ether_mode:
43         stripped_packet = etherStrip(packet)
44         if not stripped_packet:
45             packet = packet.encode("hex")
46             if len(packet) < 28:
47                 return "malformed eth " + packet.encode("hex")
48             else:
49                 if packet[24:28] == "8100":
50                     # tagged
51                     ethertype = tagtype.get(packet[32:36], 'eth')
52                     return ethertype + " " + ( '-'.join( (
53                         packet[0:12], # MAC dest
54                         packet[12:24], # MAC src
55                         packet[24:32], # VLAN tag
56                         packet[32:36], # Ethertype/len
57                         packet[36:], # Payload
58                     ) ) )
59                 else:
60                     # untagged
61                     ethertype = tagtype.get(packet[24:28], 'eth')
62                     return ethertype + " " + ( '-'.join( (
63                         packet[0:12], # MAC dest
64                         packet[12:24], # MAC src
65                         packet[24:28], # Ethertype/len
66                         packet[28:], # Payload
67                     ) ) )
68         else:
69             packet = stripped_packet
70     packet = packet.encode("hex")
71     if len(packet) < 48:
72         return "malformed ip " + packet
73     else:
74         return "ip " + ( '-'.join( (
75             packet[0:1], #version
76             packet[1:2], #header length
77             packet[2:4], #diffserv/ECN
78             packet[4:8], #total length
79             packet[8:12], #ident
80             packet[12:16], #flags/fragment offs
81             packet[16:18], #ttl
82             packet[18:20], #ip-proto
83             packet[20:24], #checksum
84             ipfmt(packet[24:32]), # src-ip
85             ipfmt(packet[32:40]), # dst-ip
86             packet[40:48] if (int(packet[1],16) > 5) else "", # options
87             packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
88         ) ) )
89
90 def _packetReady(buf, ether_mode=False, len=len, str=str):
91     if not buf:
92         return False
93         
94     rv = False
95     while not rv:
96         if len(buf[0]) < 4:
97             rv = False
98         elif ether_mode:
99             rv = True
100         else:
101             _,totallen = struct.unpack('HH',buf[0][:4])
102             totallen = socket.htons(totallen)
103             rv = len(buf[0]) >= totallen
104         if not rv and len(buf) > 1:
105             # collapse only first two buffers
106             # as needed, to mantain len(buf) meaningful
107             p1 = buf.popleft()
108             buf[0] = p1+str(buf[0])
109         else:
110             return rv
111     return rv
112
113 def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer):
114     if ether_mode:
115         return buf.popleft()
116     else:
117         _,totallen = struct.unpack('HH',buf[0][:4])
118         totallen = socket.htons(totallen)
119         if len(buf[0]) > totallen:
120             rv = buffer(buf[0],0,totallen)
121             buf[0] = buffer(buf[0],totallen)
122         else:
123             rv = buf.popleft()
124         return rv
125
126 def etherStrip(buf, buffer=buffer, len=len):
127     if len(buf) < 14:
128         return ""
129     if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
130         # tagged ethernet frame
131         return buffer(buf, 18)
132     elif buf[12:14] == '\x08\x00':
133         # untagged ethernet frame
134         return buffer(buf, 14)
135     else:
136         return ""
137
138 def etherWrap(packet):
139     return ''.join((
140         "\x00"*6*2 # bogus src and dst mac
141         +"\x08\x00", # IPv4
142         packet, # payload
143         "\x00"*4, # bogus crc
144     ))
145
146 def piStrip(buf, len=len):
147     if len(buf) < 4:
148         return buf
149     else:
150         return buffer(buf,4)
151     
152 def piWrap(buf, ether_mode, etherProto=etherProto):
153     if ether_mode:
154         proto = etherProto(buf)
155     else:
156         proto = "\x08\x00"
157     return ''.join((
158         "\x00\x00", # PI: 16 bits flags
159         proto, # 16 bits proto
160         buf,
161     ))
162
163 _padmap = [ chr(padding) * padding for padding in xrange(127) ]
164 del padding
165
166 def encrypt(packet, crypter, len=len, padmap=_padmap):
167     # pad
168     padding = crypter.block_size - len(packet) % crypter.block_size
169     packet += padmap[padding]
170     
171     # encrypt
172     return crypter.encrypt(packet)
173
174 def decrypt(packet, crypter, ord=ord):
175     if packet:
176         # decrypt
177         packet = crypter.decrypt(packet)
178         
179         # un-pad
180         padding = ord(packet[-1])
181         if not (0 < padding <= crypter.block_size):
182             # wrong padding
183             raise RuntimeError, "Truncated packet %s"
184         packet = packet[:-padding]
185     
186     return packet
187
188 def nonblock(fd):
189     try:
190         fl = fcntl.fcntl(fd, fcntl.F_GETFL)
191         fl |= os.O_NONBLOCK
192         fcntl.fcntl(fd, fcntl.F_SETFL, fl)
193         return True
194     except:
195         traceback.print_exc(file=sys.stderr)
196         # Just ignore
197         return False
198
199 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, SUSPEND,
200         stderr = sys.stderr, reconnect = None, rwrite = None, rread = None,
201         tunqueue = 1000, tunkqueue = 1000, cipher = 'AES', accept_local = None, 
202         accept_remote = None, slowlocal = True, queueclass = None, 
203         bwlimit = None, len = len, max = max, min = min, buffer = buffer,
204         OSError = OSError, select = select.select, selecterror = select.error, 
205         os = os, socket = socket,
206         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
207     crypto_mode = False
208     crypter = None
209
210     try:
211         if cipher_key and cipher:
212             import Crypto.Cipher
213             import hashlib
214             __import__('Crypto.Cipher.'+cipher)
215             
216             ciphername = cipher
217             cipher = getattr(Crypto.Cipher, cipher)
218             hashed_key = hashlib.sha256(cipher_key).digest()
219
220             if ciphername == 'AES':
221                 hashed_key = hashed_key[:16]
222             elif ciphername == 'Blowfish':
223                 hashed_key = hashed_key[:24]
224             elif ciphername == 'DES':
225                 hashed_key = hashed_key[:8]
226             elif ciphername == 'DES3':
227                 hashed_key = hashed_key[:24]
228
229             crypter = cipher.new(
230                 hashed_key, 
231                 cipher.MODE_ECB)
232             crypto_mode = True
233     except:
234         # We don't want decription to work only on one side,
235         # This could break things really bad
236         #crypto_mode = False
237         #crypter = None
238         traceback.print_exc(file=sys.stderr)
239         raise
240
241     if stderr is not None:
242         if crypto_mode:
243             print >>stderr, "Packets are transmitted in CIPHER"
244         else:
245             print >>stderr, "Packets are transmitted in PLAINTEXT"
246     
247     if hasattr(remote, 'fileno'):
248         remote_fd = remote.fileno()
249         if rwrite is None:
250             def rwrite(remote, packet, os_write=os.write):
251                 return os_write(remote_fd, packet)
252         if rread is None:
253             def rread(remote, maxlen, os_read=os.read):
254                 return os_read(remote_fd, maxlen)
255  
256     rnonblock = nonblock(remote)
257     tnonblock = nonblock(tun)
258     
259     # Pick up TUN/TAP writing method
260     if with_pi:
261         try:
262             import iovec
263             
264             # We have iovec, so we can skip PI injection
265             # and use iovec which does it natively
266             if ether_mode:
267                 twrite = iovec.ethpiwrite
268                 tread = iovec.piread2
269             else:
270                 twrite = iovec.ippiwrite
271                 tread = iovec.piread2
272         except ImportError:
273             # We have to inject PI headers pythonically
274             def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
275                 return oswrite(fd, piWrap(packet, ether_mode))
276             
277             # For reading, we strip PI headers with buffer slicing and that's it
278             def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
279                 return piStrip(osread(fd, maxlen))
280     else:
281         # No need to inject PI headers
282         twrite = os.write
283         tread = os.read
284     
285     encrypt_ = encrypt
286     decrypt_ = decrypt
287     xrange_ = xrange
288
289     if accept_local is not None:
290         def tread(fd, maxlen, _tread=tread, accept=accept_local):
291             packet = _tread(fd, maxlen)
292             if accept(packet, 0):
293                 return packet
294             else:
295                 return None
296
297     if accept_remote is not None:
298         if crypto_mode:
299             def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
300                 packet = decrypt_(packet, crypter)
301                 if accept(packet, 1):
302                     return packet
303                 else:
304                     return None
305         else:
306             def rread(fd, maxlen, _rread=rread, accept=accept_remote):
307                 packet = _rread(fd, maxlen)
308                 if accept(packet, 1):
309                     return packet
310                 else:
311                     return None
312     
313     maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
314     tunhurry = max(0,maxbkbuf/2)
315     
316     if queueclass is None:
317         queueclass = collections.deque
318         maxbatch = 2000
319         maxtbatch = 50
320     else:
321         maxfwbuf = maxbkbuf = 2000000000
322         maxbatch = 50
323         maxtbatch = 30
324         tunhurry = 30
325     
326     fwbuf = queueclass()
327     bkbuf = queueclass()
328     nfwbuf = 0
329     nbkbuf = 0
330     
331     # backwards queue functions
332     # they may need packet inspection to 
333     # reconstruct packet boundaries
334     if ether_mode or udp:
335         packetReady = bool
336         pullPacket = queueclass.popleft
337         reschedule = queueclass.appendleft
338     else:
339         packetReady = _packetReady
340         pullPacket = _pullPacket
341         reschedule = queueclass.appendleft
342     
343     # forward queue functions
344     # no packet inspection needed
345     fpacketReady = bool
346     fpullPacket = queueclass.popleft
347     freschedule = queueclass.appendleft
348     
349     tunfd = tun.fileno()
350     os_read = os.read
351     os_write = os.write
352     
353     tget = time.time
354     maxbwfree = bwfree = 1500 * tunqueue
355     lastbwtime = tget()
356     
357     remoteok = True
358     
359     
360     while not TERMINATE:
361         # The SUSPEND flag has been set. This means we need to wait on
362         # the SUSPEND condition until it is released.
363         while SUSPEND and not TERMINATE:
364             time.sleep(0.5)
365
366         wset = []
367         if packetReady(bkbuf):
368             wset.append(tun)
369         if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0):
370             wset.append(remote)
371         
372         rset = []
373         if len(fwbuf) < maxfwbuf:
374             rset.append(tun)
375         if remoteok and len(bkbuf) < maxbkbuf:
376             rset.append(remote)
377         
378         if remoteok:
379             eset = (tun,remote)
380         else:
381             eset = (tun,)
382         
383         try:
384             rdrdy, wrdy, errs = select(rset,wset,eset,1)
385         except selecterror, e:
386             if e.args[0] == errno.EINTR:
387                 # just retry
388                 continue
389             else:
390                 traceback.print_exc(file=sys.stderr)
391                 # If the SUSPEND flag has been set, then the TUN will be in a bad
392                 # state and the select error should be ignores.
393                 if SUSPEND:
394                     continue
395                 else:
396                     raise
397
398         # check for errors
399         if errs:
400             if reconnect is not None and remote in errs and tun not in errs:
401                 remote = reconnect()
402                 if hasattr(remote, 'fileno'):
403                     remote_fd = remote.fileno()
404             elif udp and remote in errs and tun not in errs:
405                 # In UDP mode, those are always transient errors
406                 # Usually, an error will imply a read-ready socket
407                 # that will raise an "Connection refused" error, so
408                 # disable read-readiness just for now, and retry
409                 # the select
410                 remoteok = False
411                 continue
412             else:
413                 break
414         else:
415             remoteok = True
416         
417         # check to see if we can write
418         #rr = wr = rt = wt = 0
419         if remote in wrdy:
420             sent = 0
421             try:
422                 try:
423                     for x in xrange(maxbatch):
424                         packet = pullPacket(fwbuf)
425
426                         if crypto_mode:
427                             packet = encrypt_(packet, crypter)
428                         
429                         sentnow = rwrite(remote, packet)
430                         sent += sentnow
431                         #wr += 1
432                         
433                         if not udp and 0 <= sentnow < len(packet):
434                             # packet partially sent
435                             # reschedule the remaining part
436                             # this doesn't happen ever in udp mode
437                             freschedule(fwbuf, buffer(packet,sentnow))
438                         
439                         if not rnonblock or not fpacketReady(fwbuf):
440                             break
441                 except OSError,e:
442                     # This except handles the entire While block on PURPOSE
443                     # as an optimization (setting a try/except block is expensive)
444                     # The only operation that can raise this exception is rwrite
445                     if e.errno in retrycodes:
446                         # re-schedule packet
447                         freschedule(fwbuf, packet)
448                     else:
449                         raise
450             except:
451                 if reconnect is not None:
452                     # in UDP mode, sometimes connected sockets can return a connection refused.
453                     # Give the caller a chance to reconnect
454                     remote = reconnect()
455                     if hasattr(remote, 'fileno'):
456                         remote_fd = remote.fileno()
457                 elif not udp:
458                     # in UDP mode, we ignore errors - packet loss man...
459                     raise
460                 #traceback.print_exc(file=sys.stderr)
461             
462             if bwlimit:
463                 bwfree -= sent
464         if tun in wrdy:
465             try:
466                 for x in xrange(maxtbatch):
467                     packet = pullPacket(bkbuf)
468                     twrite(tunfd, packet)
469                     #wt += 1
470                     
471                     # Do not inject packets into the TUN faster than they arrive, unless we're falling
472                     # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
473                     # don't block either (they're always ready to write), so if we flood the device 
474                     # we'll have high packet loss.
475                     if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
476                         break
477                 else:
478                     if slowlocal:
479                         # Give some time for the kernel to process the packets
480                         time.sleep(0)
481             except OSError,e:
482                 # This except handles the entire While block on PURPOSE
483                 # as an optimization (setting a try/except block is expensive)
484                 # The only operation that can raise this exception is os_write
485                 if e.errno in retrycodes:
486                     # re-schedule packet
487                     reschedule(bkbuf, packet)
488                 else:
489                     raise
490         
491         # check incoming data packets
492         if tun in rdrdy:
493             try:
494                 for x in xrange(maxbatch):
495                     packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
496                     if not packet:
497                         continue
498                     #rt += 1
499                     fwbuf.append(packet)
500                     
501                     if not tnonblock or len(fwbuf) >= maxfwbuf:
502                         break
503             except OSError,e:
504                 # This except handles the entire While block on PURPOSE
505                 # as an optimization (setting a try/except block is expensive)
506                 # The only operation that can raise this exception is os_read
507                 if e.errno not in retrycodes:
508                     raise
509         if remote in rdrdy:
510             try:
511                 try:
512                     for x in xrange(maxbatch):
513                         packet = rread(remote,2000)
514                         
515                         #rr += 1
516                         
517                         if crypto_mode:
518                             packet = decrypt_(packet, crypter)
519                             if not packet:
520                                 continue
521                         elif not packet:
522                             if not udp and packet == "":
523                                 # Connection broken, try to reconnect (or just die)
524                                 raise RuntimeError, "Connection broken"
525                             else:
526                                 continue
527
528                         bkbuf.append(packet)
529                         
530                         if not rnonblock or len(bkbuf) >= maxbkbuf:
531                             break
532                 except OSError,e:
533                     # This except handles the entire While block on PURPOSE
534                     # as an optimization (setting a try/except block is expensive)
535                     # The only operation that can raise this exception is rread
536                     if e.errno not in retrycodes:
537                         raise
538             except Exception, e:
539                 if reconnect is not None:
540                     # in UDP mode, sometimes connected sockets can return a connection refused
541                     # on read. Give the caller a chance to reconnect
542                     remote = reconnect()
543                     if hasattr(remote, 'fileno'):
544                         remote_fd = remote.fileno()
545                 elif not udp:
546                     # in UDP mode, we ignore errors - packet loss man...
547                     raise
548                 traceback.print_exc(file=sys.stderr)
549
550         if bwlimit:
551             tnow = tget()
552             delta = tnow - lastbwtime
553             if delta > 0.001:
554                 delta = int(bwlimit * delta)
555                 if delta > 0:
556                     bwfree = min(bwfree+delta, maxbwfree)
557                     lastbwtime = tnow
558         
559         #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
560
561 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
562     rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
563     retrydelay = 1.0
564     for i in xrange(30):
565         # TERMINATE is a array. An item can be added to TERMINATE, from
566         # outside this function to force termination of the loop
567         if TERMINATE:
568             raise OSError, "Killed"
569         try:
570             rsock.bind((local_addr, local_port))
571             break
572         except socket.error:
573             # wait a while, retry
574             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
575             time.sleep(min(30.0,retrydelay))
576             retrydelay *= 1.1
577     else:
578         rsock.bind((local_addr, local_port))
579     print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
580     print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
581     rsock.connect((peer_addr, peer_port))
582     return rsock
583
584 def udp_handshake(TERMINATE, rsock):
585     endme = False
586     def keepalive():
587         while not endme and not TERMINATE:
588             try:
589                 rsock.send('')
590             except:
591                 pass
592             time.sleep(1)
593         try:
594             rsock.send('')
595         except:
596             pass
597     keepalive_thread = threading.Thread(target=keepalive)
598     keepalive_thread.start()
599     for i in xrange(900):
600         if TERMINATE:
601             raise OSError, "Killed"
602         try:
603             heartbeat = rsock.recv(10)
604             break
605         except:
606             time.sleep(1)
607     else:
608         heartbeat = rsock.recv(10)
609     endme = True
610     keepalive_thread.join()
611
612 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
613     rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
614             peer_port)
615     udp_handshake(TERMINATE, rsock)
616     return rsock 
617
618 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
619     sock = None
620     retrydelay = 1.0
621     # The peer has a firewall that prevents a response to the connect, we 
622     # will be forever blocked in the connect, so we put a reasonable timeout.
623     rsock.settimeout(10) 
624     # We wait for 
625     for i in xrange(30):
626         if stop:
627             break
628         if TERMINATE:
629             raise OSError, "Killed"
630         try:
631             rsock.connect((peer_addr, peer_port))
632             sock = rsock
633             break
634         except socket.error:
635             # wait a while, retry
636             print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
637             time.sleep(min(30.0,retrydelay))
638             retrydelay *= 1.1
639     else:
640         rsock.connect((peer_addr, peer_port))
641         sock = rsock
642     if sock:
643         print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
644         sock.settimeout(0) 
645         
646         print >>sys.stderr, "tcp_connect: disabling NAGLE"
647         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
648     return sock
649
650 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
651     sock = None
652     retrydelay = 1.0
653     # We try to bind to the local virtual interface. 
654     # It might not exist yet so we wait in a loop.
655     for i in xrange(30):
656         if stop:
657             break
658         if TERMINATE:
659             raise OSError, "Killed"
660         try:
661             lsock.bind((local_addr, local_port))
662             break
663         except socket.error:
664             # wait a while, retry
665             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
666             time.sleep(min(30.0,retrydelay))
667             retrydelay *= 1.1
668     else:
669         lsock.bind((local_addr, local_port))
670
671     print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
672     # Now we wait until the other side connects. 
673     # The other side might not be ready yet, so we also wait in a loop for timeouts.
674     timeout = 1
675     lsock.listen(1)
676     for i in xrange(30):
677         if TERMINATE:
678             raise OSError, "Killed"
679         rlist, wlist, xlist = select.select([lsock], [], [], timeout)
680         if stop:
681             break
682         if lsock in rlist:
683             sock,raddr = lsock.accept()
684             print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
685             break
686         timeout += 5
687     return sock
688
689 def tcp_handshake(rsock, listen, hand):
690     # we are going to use a barrier algorithm to decide wich side listen.
691     # each side will "roll a dice" and send the resulting value to the other 
692     # side. 
693     win = False
694     rsock.settimeout(10)
695     try:
696         rsock.send(hand)
697         peer_hand = rsock.recv(4)
698         if not peer_hand:
699             print >>sys.stderr, "tcp_handshake: connection reset by peer"
700             return False
701         else:
702             print >>sys.stderr, "tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand)
703         if hand < peer_hand:
704             if listen:
705                 win = True
706         elif hand > peer_hand:
707             if not listen:
708                 win = True
709     finally:
710         rsock.settimeout(0)
711     return win
712
713 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
714     def listen(stop, hand, lsock, lresult):
715         win = False
716         rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
717         if rsock:
718             win = tcp_handshake(rsock, True, hand)
719             stop.append(True)
720         lresult.append((win, rsock))
721
722     def connect(stop, hand, rsock, rresult):
723         win = False
724         rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
725         if rsock:
726             win = tcp_handshake(rsock, False, hand)
727             stop.append(True)
728         rresult.append((win, rsock))
729   
730     end = False
731     sock = None
732     for i in xrange(0, 50):
733         if end:
734             break
735         if TERMINATE:
736             raise OSError, "Killed"
737         hand = struct.pack("!L", random.randint(0, 2**30))
738         stop = []
739         lresult = []
740         rresult = []
741         lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
742         rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
743         listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
744         connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
745         connect_thread.start()
746         listen_thread.start()
747         connect_thread.join()
748         listen_thread.join()
749         (lwin, lrsock) = lresult[0]
750         (rwin, rrsock) = rresult[0]
751         if not lrsock or not rrsock:
752             if not lrsock:
753                 sock = rrsock
754             if not rrsock:
755                 sock = lrsock
756             end = True
757         # both socket are connected
758         else:
759            if lwin:
760                 sock = lrsock
761                 end = True
762            elif rwin: 
763                 sock = rrsock
764                 end = True
765
766     if not sock:
767         raise OSError, "Error: tcp_establish could not establish connection."
768     return sock
769
770