+def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
+ peer_port)
+ udp_handshake(TERMINATE, rsock)
+ return rsock
+
+def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
+ sock = None
+ retrydelay = 1.0
+ # The peer has a firewall that prevents a response to the connect, we
+ # will be forever blocked in the connect, so we put a reasonable timeout.
+ sock.settimeout(10)
+ # We wait for
+ for i in xrange(30):
+ if stop:
+ break
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ rsock.connect((peer_addr, peer_port))
+ sock = rsock
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ rsock.connect((peer_addr, peer_port))
+ sock = rsock
+ if sock:
+ sock.settimeout(0)
+ return sock
+
+def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
+ sock = None
+ retrydelay = 1.0
+ # We try to bind to the local virtual interface.
+ # It might not exist yet so we wait in a loop.
+ for i in xrange(30):
+ if stop:
+ break
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ lsock.bind((local_addr, local_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
+ time.sleep(min(30.0,retrydelay))
+ retrydelay *= 1.1
+ else:
+ lsock.bind((local_addr, local_port))
+
+ # Now we wait until the other side connects.
+ # The other side might not be ready yet, so we also wait in a loop for timeouts.
+ timeout = 1
+ lsock.listen(1)
+ for i in xrange(30):
+ if TERMINATE:
+ raise OSError, "Killed"
+ rlist, wlist, xlist = select.select([lsock], [], [], timeout)
+ if stop:
+ break
+ if lsock in rlist:
+ sock,raddr = lsock.accept()
+ break
+ timeout += 5
+ return sock
+
+def tcp_handshake(TERMINATE, sock, listen, dice):
+ # we are going to use a barrier algorithm to decide wich side listen.
+ # each side will "roll a dice" and send the resulting value to the other
+ # side.
+ result = None
+ sock.settimeout(5)
+ for i in xrange(100):
+ if TERMINATE:
+ raise OSError, "Killed"
+ try:
+ hand = dice.read()
+ sock.send(hand)
+ peer_hand = sock.recv(1)
+ if hand < peer_hand:
+ if listen:
+ result = sock
+ break
+ elif hand > peer_hand:
+ if not listen:
+ result = sock
+ break
+ else:
+ dice.release()
+ dice.throw()
+ except socket.error:
+ dice.release()
+ break
+ if result:
+ sock.settimeout(0)
+ return result
+
+def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
+ def listen(stop, result, lsock, dice):
+ lsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
+ if lsock:
+ lsock = tcp_handshake(TERMINATE, lsock, True, dice)
+ if lsock:
+ stop[0] = True
+ result[0] = lsock
+
+ def connect(stop, result, rsock, dice):
+ rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
+ if rsock:
+ rsock = tcp_handshake(TERMINATE, rsock, False, dice)
+ if sock:
+ stop[0] = True
+ result[0] = rsock
+
+ dice = Dice()
+ dice.throw()
+ stop = []
+ result = []
+ lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ connect_thread = threading.Thread(target=connect, args=(stop, result, rsock, dice))
+ listen_thread = threading.Thread(target=listen, args=(stop, result, lsock, dice))
+ connect_thread.start()
+ listen_thread.start()
+ connect_thread.join()
+ listen_thread.join()
+ if not result:
+ raise OSError, "Error: tcp_establish could not establish connection."
+ sock = result[0]
+ if sock == lsock:
+ rsock.close()
+ else:
+ lsock.close()
+ return sock
+
+class Dice(object):
+ def __init__(self):
+ self._condition = threading.Condition(threading.Lock())
+ self._readers = 0
+ self._value = None
+
+ def read(self):
+ self._condition.acquire()
+ try:
+ self._readers += 1
+ finally:
+ self._condition.release()
+ return self._value
+
+ def release(self):
+ self._condition.acquire()
+ try:
+ if self._readers > 0:
+ self._readers -= 1
+ if self._readers == 0:
+ self._condition.notifyAll()
+ finally:
+ self._condition.release()
+
+ def throw(self):
+ self._condition.acquire()
+ try:
+ while self._readers > 0:
+ self._condition.wait()
+ self._value = random.randint(1, 6)
+ finally:
+ self._condition.release()
+