735d08cde3d8a75cb8118b4073ef1a59a0c36403
[nepi.git] / src / nepi / util / tunchannel.py
1 import select
2 import sys
3 import os
4 import struct
5 import socket
6 import threading
7 import traceback
8 import errno
9 import fcntl
10 import random
11 import traceback
12 import functools
13 import collections
14 import ctypes
15 import time
16
17 def ipfmt(ip):
18     ipbytes = map(ord,ip.decode("hex"))
19     return '.'.join(map(str,ipbytes))
20
21 tagtype = {
22     '0806' : 'arp',
23     '0800' : 'ipv4',
24     '8870' : 'jumbo',
25     '8863' : 'PPPoE discover',
26     '8864' : 'PPPoE',
27     '86dd' : 'ipv6',
28 }
29 def etherProto(packet, len=len):
30     if len(packet) > 14:
31         if packet[12] == "\x81" and packet[13] == "\x00":
32             # tagged
33             return packet[16:18]
34         else:
35             # untagged
36             return packet[12:14]
37     # default: ip
38     return "\x08\x00"
39 def formatPacket(packet, ether_mode):
40     if ether_mode:
41         stripped_packet = etherStrip(packet)
42         if not stripped_packet:
43             packet = packet.encode("hex")
44             if len(packet) < 28:
45                 return "malformed eth " + packet.encode("hex")
46             else:
47                 if packet[24:28] == "8100":
48                     # tagged
49                     ethertype = tagtype.get(packet[32:36], 'eth')
50                     return ethertype + " " + ( '-'.join( (
51                         packet[0:12], # MAC dest
52                         packet[12:24], # MAC src
53                         packet[24:32], # VLAN tag
54                         packet[32:36], # Ethertype/len
55                         packet[36:], # Payload
56                     ) ) )
57                 else:
58                     # untagged
59                     ethertype = tagtype.get(packet[24:28], 'eth')
60                     return ethertype + " " + ( '-'.join( (
61                         packet[0:12], # MAC dest
62                         packet[12:24], # MAC src
63                         packet[24:28], # Ethertype/len
64                         packet[28:], # Payload
65                     ) ) )
66         else:
67             packet = stripped_packet
68     packet = packet.encode("hex")
69     if len(packet) < 48:
70         return "malformed ip " + packet
71     else:
72         return "ip " + ( '-'.join( (
73             packet[0:1], #version
74             packet[1:2], #header length
75             packet[2:4], #diffserv/ECN
76             packet[4:8], #total length
77             packet[8:12], #ident
78             packet[12:16], #flags/fragment offs
79             packet[16:18], #ttl
80             packet[18:20], #ip-proto
81             packet[20:24], #checksum
82             ipfmt(packet[24:32]), # src-ip
83             ipfmt(packet[32:40]), # dst-ip
84             packet[40:48] if (int(packet[1],16) > 5) else "", # options
85             packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
86         ) ) )
87
88 def _packetReady(buf, ether_mode=False, len=len, str=str):
89     if not buf:
90         return False
91         
92     rv = False
93     while not rv:
94         if len(buf[0]) < 4:
95             rv = False
96         elif ether_mode:
97             rv = True
98         else:
99             _,totallen = struct.unpack('HH',buf[0][:4])
100             totallen = socket.htons(totallen)
101             rv = len(buf[0]) >= totallen
102         if not rv and len(buf) > 1:
103             # collapse only first two buffers
104             # as needed, to mantain len(buf) meaningful
105             p1 = buf.popleft()
106             buf[0] = p1+str(buf[0])
107         else:
108             return rv
109     return rv
110
111 def _pullPacket(buf, ether_mode=False, len=len, buffer=buffer):
112     if ether_mode:
113         return buf.popleft()
114     else:
115         _,totallen = struct.unpack('HH',buf[0][:4])
116         totallen = socket.htons(totallen)
117         if len(buf[0]) > totallen:
118             rv = buffer(buf[0],0,totallen)
119             buf[0] = buffer(buf[0],totallen)
120         else:
121             rv = buf.popleft()
122         return rv
123
124 def etherStrip(buf, buffer=buffer, len=len):
125     if len(buf) < 14:
126         return ""
127     if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
128         # tagged ethernet frame
129         return buffer(buf, 18)
130     elif buf[12:14] == '\x08\x00':
131         # untagged ethernet frame
132         return buffer(buf, 14)
133     else:
134         return ""
135
136 def etherWrap(packet):
137     return ''.join((
138         "\x00"*6*2 # bogus src and dst mac
139         +"\x08\x00", # IPv4
140         packet, # payload
141         "\x00"*4, # bogus crc
142     ))
143
144 def piStrip(buf, len=len):
145     if len(buf) < 4:
146         return buf
147     else:
148         return buffer(buf,4)
149     
150 def piWrap(buf, ether_mode, etherProto=etherProto):
151     if ether_mode:
152         proto = etherProto(buf)
153     else:
154         proto = "\x08\x00"
155     return ''.join((
156         "\x00\x00", # PI: 16 bits flags
157         proto, # 16 bits proto
158         buf,
159     ))
160
161 _padmap = [ chr(padding) * padding for padding in xrange(127) ]
162 del padding
163
164 def encrypt(packet, crypter, len=len, padmap=_padmap):
165     # pad
166     padding = crypter.block_size - len(packet) % crypter.block_size
167     packet += padmap[padding]
168     
169     # encrypt
170     return crypter.encrypt(packet)
171
172 def decrypt(packet, crypter, ord=ord):
173     if packet:
174         # decrypt
175         packet = crypter.decrypt(packet)
176         
177         # un-pad
178         padding = ord(packet[-1])
179         if not (0 < padding <= crypter.block_size):
180             # wrong padding
181             raise RuntimeError, "Truncated packet"
182         packet = packet[:-padding]
183     
184     return packet
185
186 def nonblock(fd):
187     try:
188         fl = fcntl.fcntl(fd, fcntl.F_GETFL)
189         fl |= os.O_NONBLOCK
190         fcntl.fcntl(fd, fcntl.F_SETFL, fl)
191         return True
192     except:
193         traceback.print_exc(file=sys.stderr)
194         # Just ignore
195         return False
196
197 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
198         cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, bwlimit=None,
199         len=len, max=max, min=min, buffer=buffer, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
200         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
201     crypto_mode = False
202     crypter = None
203
204     try:
205         if cipher_key and cipher:
206             import Crypto.Cipher
207             import hashlib
208             __import__('Crypto.Cipher.'+cipher)
209             
210             ciphername = cipher
211             cipher = getattr(Crypto.Cipher, cipher)
212             hashed_key = hashlib.sha256(cipher_key).digest()
213             if getattr(cipher, 'key_size'):
214                 hashed_key = hashed_key[:cipher.key_size]
215             elif ciphername == 'DES3':
216                 hashed_key = hashed_key[:24]
217             crypter = cipher.new(
218                 hashed_key, 
219                 cipher.MODE_ECB)
220             crypto_mode = True
221     except:
222         traceback.print_exc(file=sys.stderr)
223         crypto_mode = False
224         crypter = None
225
226     if stderr is not None:
227         if crypto_mode:
228             print >>stderr, "Packets are transmitted in CIPHER"
229         else:
230             print >>stderr, "Packets are transmitted in PLAINTEXT"
231     
232     if hasattr(remote, 'fileno'):
233         remote_fd = remote.fileno()
234         if rwrite is None:
235             def rwrite(remote, packet, os_write=os.write):
236                 return os_write(remote_fd, packet)
237         if rread is None:
238             def rread(remote, maxlen, os_read=os.read):
239                 return os_read(remote_fd, maxlen)
240  
241     rnonblock = nonblock(remote)
242     tnonblock = nonblock(tun)
243     
244     # Pick up TUN/TAP writing method
245     if with_pi:
246         try:
247             import iovec
248             
249             # We have iovec, so we can skip PI injection
250             # and use iovec which does it natively
251             if ether_mode:
252                 twrite = iovec.ethpiwrite
253                 tread = iovec.piread2
254             else:
255                 twrite = iovec.ippiwrite
256                 tread = iovec.piread2
257         except ImportError:
258             # We have to inject PI headers pythonically
259             def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
260                 return oswrite(fd, piWrap(packet, ether_mode))
261             
262             # For reading, we strip PI headers with buffer slicing and that's it
263             def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
264                 return piStrip(osread(fd, maxlen))
265     else:
266         # No need to inject PI headers
267         twrite = os.write
268         tread = os.read
269     
270     encrypt_ = encrypt
271     decrypt_ = decrypt
272     xrange_ = xrange
273
274     if accept_local is not None:
275         def tread(fd, maxlen, _tread=tread, accept=accept_local):
276             packet = _tread(fd, maxlen)
277             if accept(packet, 0):
278                 return packet
279             else:
280                 return None
281
282     if accept_remote is not None:
283         if crypto_mode:
284             def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
285                 packet = decrypt_(packet, crypter)
286                 if accept(packet, 1):
287                     return packet
288                 else:
289                     return None
290         else:
291             def rread(fd, maxlen, _rread=rread, accept=accept_remote):
292                 packet = _rread(fd, maxlen)
293                 if accept(packet, 1):
294                     return packet
295                 else:
296                     return None
297     
298     maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
299     tunhurry = max(0,maxbkbuf/2)
300     
301     if queueclass is None:
302         queueclass = collections.deque
303         maxbatch = 2000
304         maxtbatch = 50
305     else:
306         maxfwbuf = maxbkbuf = 2000000000
307         maxbatch = 50
308         maxtbatch = 30
309         tunhurry = 30
310     
311     fwbuf = queueclass()
312     bkbuf = queueclass()
313     nfwbuf = 0
314     nbkbuf = 0
315     
316     # backwards queue functions
317     # they may need packet inspection to 
318     # reconstruct packet boundaries
319     if ether_mode or udp:
320         packetReady = bool
321         pullPacket = queueclass.popleft
322         reschedule = queueclass.appendleft
323     else:
324         packetReady = _packetReady
325         pullPacket = _pullPacket
326         reschedule = queueclass.appendleft
327     
328     # forward queue functions
329     # no packet inspection needed
330     fpacketReady = bool
331     fpullPacket = queueclass.popleft
332     freschedule = queueclass.appendleft
333     
334     tunfd = tun.fileno()
335     os_read = os.read
336     os_write = os.write
337     
338     tget = time.time
339     maxbwfree = bwfree = 1500 * tunqueue
340     lastbwtime = tget()
341     
342     remoteok = True
343     
344     
345     while not TERMINATE:
346         wset = []
347         if packetReady(bkbuf):
348             wset.append(tun)
349         if remoteok and fpacketReady(fwbuf) and (not bwlimit or bwfree > 0):
350             wset.append(remote)
351         
352         rset = []
353         if len(fwbuf) < maxfwbuf:
354             rset.append(tun)
355         if remoteok and len(bkbuf) < maxbkbuf:
356             rset.append(remote)
357         
358         if remoteok:
359             eset = (tun,remote)
360         else:
361             eset = (tun,)
362         
363         try:
364             rdrdy, wrdy, errs = select(rset,wset,eset,1)
365         except selecterror, e:
366             if e.args[0] == errno.EINTR:
367                 # just retry
368                 continue
369             else:
370                 traceback.print_exc(file=sys.stderr)
371                 raise
372
373         # check for errors
374         if errs:
375             if reconnect is not None and remote in errs and tun not in errs:
376                 remote = reconnect()
377                 if hasattr(remote, 'fileno'):
378                     remote_fd = remote.fileno()
379             elif udp and remote in errs and tun not in errs:
380                 # In UDP mode, those are always transient errors
381                 # Usually, an error will imply a read-ready socket
382                 # that will raise an "Connection refused" error, so
383                 # disable read-readiness just for now, and retry
384                 # the select
385                 remoteok = False
386                 continue
387             else:
388                 break
389         else:
390             remoteok = True
391         
392         # check to see if we can write
393         #rr = wr = rt = wt = 0
394         if remote in wrdy:
395             sent = 0
396             try:
397                 try:
398                     for x in xrange(maxbatch):
399                         packet = pullPacket(fwbuf)
400
401                         if crypto_mode:
402                             packet = encrypt_(packet, crypter)
403                         
404                         sentnow = rwrite(remote, packet)
405                         sent += sentnow
406                         #wr += 1
407                         
408                         if not udp and 0 <= sentnow < len(packet):
409                             # packet partially sent
410                             # reschedule the remaining part
411                             # this doesn't happen ever in udp mode
412                             freschedule(fwbuf, buffer(packet,sentnow))
413                         
414                         if not rnonblock or not fpacketReady(fwbuf):
415                             break
416                 except OSError,e:
417                     # This except handles the entire While block on PURPOSE
418                     # as an optimization (setting a try/except block is expensive)
419                     # The only operation that can raise this exception is rwrite
420                     if e.errno in retrycodes:
421                         # re-schedule packet
422                         freschedule(fwbuf, packet)
423                     else:
424                         raise
425             except:
426                 if reconnect is not None:
427                     # in UDP mode, sometimes connected sockets can return a connection refused.
428                     # Give the caller a chance to reconnect
429                     remote = reconnect()
430                     if hasattr(remote, 'fileno'):
431                         remote_fd = remote.fileno()
432                 elif not udp:
433                     # in UDP mode, we ignore errors - packet loss man...
434                     raise
435                 #traceback.print_exc(file=sys.stderr)
436             
437             if bwlimit:
438                 bwfree -= sent
439         if tun in wrdy:
440             try:
441                 for x in xrange(maxtbatch):
442                     packet = pullPacket(bkbuf)
443                     twrite(tunfd, packet)
444                     #wt += 1
445                     
446                     # Do not inject packets into the TUN faster than they arrive, unless we're falling
447                     # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
448                     # don't block either (they're always ready to write), so if we flood the device 
449                     # we'll have high packet loss.
450                     if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
451                         break
452                 else:
453                     if slowlocal:
454                         # Give some time for the kernel to process the packets
455                         time.sleep(0)
456             except OSError,e:
457                 # This except handles the entire While block on PURPOSE
458                 # as an optimization (setting a try/except block is expensive)
459                 # The only operation that can raise this exception is os_write
460                 if e.errno in retrycodes:
461                     # re-schedule packet
462                     reschedule(bkbuf, packet)
463                 else:
464                     raise
465         
466         # check incoming data packets
467         if tun in rdrdy:
468             try:
469                 for x in xrange(maxbatch):
470                     packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
471                     if not packet:
472                         continue
473                     #rt += 1
474                     fwbuf.append(packet)
475                     
476                     if not tnonblock or len(fwbuf) >= maxfwbuf:
477                         break
478             except OSError,e:
479                 # This except handles the entire While block on PURPOSE
480                 # as an optimization (setting a try/except block is expensive)
481                 # The only operation that can raise this exception is os_read
482                 if e.errno not in retrycodes:
483                     raise
484         if remote in rdrdy:
485             try:
486                 try:
487                     for x in xrange(maxbatch):
488                         packet = rread(remote,2000)
489                         
490                         #rr += 1
491                         
492                         if crypto_mode:
493                             packet = decrypt_(packet, crypter)
494                             if not packet:
495                                 continue
496                         elif not packet:
497                             if not udp and packet == "":
498                                 # Connection broken, try to reconnect (or just die)
499                                 raise RuntimeError, "Connection broken"
500                             else:
501                                 continue
502
503                         bkbuf.append(packet)
504                         
505                         if not rnonblock or len(bkbuf) >= maxbkbuf:
506                             break
507                 except OSError,e:
508                     # This except handles the entire While block on PURPOSE
509                     # as an optimization (setting a try/except block is expensive)
510                     # The only operation that can raise this exception is rread
511                     if e.errno not in retrycodes:
512                         raise
513             except Exception, e:
514                 if reconnect is not None:
515                     # in UDP mode, sometimes connected sockets can return a connection refused
516                     # on read. Give the caller a chance to reconnect
517                     remote = reconnect()
518                     if hasattr(remote, 'fileno'):
519                         remote_fd = remote.fileno()
520                 elif not udp:
521                     # in UDP mode, we ignore errors - packet loss man...
522                     raise
523                 traceback.print_exc(file=sys.stderr)
524
525         if bwlimit:
526             tnow = tget()
527             delta = tnow - lastbwtime
528             if delta > 0.001:
529                 delta = int(bwlimit * delta)
530                 if delta > 0:
531                     bwfree = min(bwfree+delta, maxbwfree)
532                     lastbwtime = tnow
533         
534         #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
535
536 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
537     rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
538     retrydelay = 1.0
539     for i in xrange(30):
540         # TERMINATE is a array. An item can be added to TERMINATE, from
541         # outside this function to force termination of the loop
542         if TERMINATE:
543             raise OSError, "Killed"
544         try:
545             rsock.bind((local_addr, local_port))
546             break
547         except socket.error:
548             # wait a while, retry
549             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
550             time.sleep(min(30.0,retrydelay))
551             retrydelay *= 1.1
552     else:
553         rsock.bind((local_addr, local_port))
554     print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
555     print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
556     rsock.connect((peer_addr, peer_port))
557     return rsock
558
559 def udp_handshake(TERMINATE, rsock):
560     endme = False
561     def keepalive():
562         while not endme and not TERMINATE:
563             try:
564                 rsock.send('')
565             except:
566                 pass
567             time.sleep(1)
568         try:
569             rsock.send('')
570         except:
571             pass
572     keepalive_thread = threading.Thread(target=keepalive)
573     keepalive_thread.start()
574     for i in xrange(900):
575         if TERMINATE:
576             raise OSError, "Killed"
577         try:
578             heartbeat = rsock.recv(10)
579             break
580         except:
581             time.sleep(1)
582     else:
583         heartbeat = rsock.recv(10)
584     endme = True
585     keepalive_thread.join()
586
587 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
588     rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
589             peer_port)
590     udp_handshake(TERMINATE, rsock)
591     return rsock 
592
593 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
594     sock = None
595     retrydelay = 1.0
596     # The peer has a firewall that prevents a response to the connect, we 
597     # will be forever blocked in the connect, so we put a reasonable timeout.
598     rsock.settimeout(10) 
599     # We wait for 
600     for i in xrange(30):
601         if stop:
602             break
603         if TERMINATE:
604             raise OSError, "Killed"
605         try:
606             rsock.connect((peer_addr, peer_port))
607             sock = rsock
608             break
609         except socket.error:
610             # wait a while, retry
611             print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
612             time.sleep(min(30.0,retrydelay))
613             retrydelay *= 1.1
614     else:
615         rsock.connect((peer_addr, peer_port))
616         sock = rsock
617     if sock:
618         print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
619         sock.settimeout(0) 
620         
621         print >>sys.stderr, "tcp_connect: disabling NAGLE"
622         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
623     return sock
624
625 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
626     sock = None
627     retrydelay = 1.0
628     # We try to bind to the local virtual interface. 
629     # It might not exist yet so we wait in a loop.
630     for i in xrange(30):
631         if stop:
632             break
633         if TERMINATE:
634             raise OSError, "Killed"
635         try:
636             lsock.bind((local_addr, local_port))
637             break
638         except socket.error:
639             # wait a while, retry
640             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
641             time.sleep(min(30.0,retrydelay))
642             retrydelay *= 1.1
643     else:
644         lsock.bind((local_addr, local_port))
645
646     print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
647     # Now we wait until the other side connects. 
648     # The other side might not be ready yet, so we also wait in a loop for timeouts.
649     timeout = 1
650     lsock.listen(1)
651     for i in xrange(30):
652         if TERMINATE:
653             raise OSError, "Killed"
654         rlist, wlist, xlist = select.select([lsock], [], [], timeout)
655         if stop:
656             break
657         if lsock in rlist:
658             sock,raddr = lsock.accept()
659             print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
660             break
661         timeout += 5
662     return sock
663
664 def tcp_handshake(rsock, listen, hand):
665     # we are going to use a barrier algorithm to decide wich side listen.
666     # each side will "roll a dice" and send the resulting value to the other 
667     # side. 
668     win = False
669     rsock.settimeout(10)
670     try:
671         rsock.send(hand)
672         peer_hand = rsock.recv(4)
673         if not peer_hand:
674             print >>sys.stderr, "tcp_handshake: connection reset by peer"
675             return False
676         else:
677             print >>sys.stderr, "tcp_handshake: hand %r, peer_hand %r" % (hand, peer_hand)
678         if hand < peer_hand:
679             if listen:
680                 win = True
681         elif hand > peer_hand:
682             if not listen:
683                 win = True
684     finally:
685         rsock.settimeout(0)
686     return win
687
688 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
689     def listen(stop, hand, lsock, lresult):
690         win = False
691         rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
692         if rsock:
693             win = tcp_handshake(rsock, True, hand)
694             stop.append(True)
695         lresult.append((win, rsock))
696
697     def connect(stop, hand, rsock, rresult):
698         win = False
699         rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
700         if rsock:
701             win = tcp_handshake(rsock, False, hand)
702             stop.append(True)
703         rresult.append((win, rsock))
704   
705     end = False
706     sock = None
707     for i in xrange(0, 50):
708         if end:
709             break
710         if TERMINATE:
711             raise OSError, "Killed"
712         hand = struct.pack("!L", random.randint(0, 2**30))
713         stop = []
714         lresult = []
715         rresult = []
716         lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
717         rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
718         listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
719         connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
720         connect_thread.start()
721         listen_thread.start()
722         connect_thread.join()
723         listen_thread.join()
724         (lwin, lrsock) = lresult[0]
725         (rwin, rrsock) = rresult[0]
726         if not lrsock or not rrsock:
727             if not lrsock:
728                 sock = rrsock
729             if not rrsock:
730                 sock = lrsock
731             end = True
732         # both socket are connected
733         else:
734            if lwin:
735                 sock = lrsock
736                 end = True
737            elif rwin: 
738                 sock = rrsock
739                 end = True
740
741     if not sock:
742         raise OSError, "Error: tcp_establish could not establish connection."
743     return sock
744
745