From: Alina Quereilhac Date: Wed, 31 Aug 2011 17:38:38 +0000 (+0200) Subject: tcp_handshake works! X-Git-Tag: nepi-3.0.0~253^2 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=ad3318c3f6b0258c3d90b59cdbe6f42f77134b2d;p=nepi.git tcp_handshake works! --- diff --git a/src/nepi/util/tunchannel.py b/src/nepi/util/tunchannel.py index baf39276..cb076c60 100644 --- a/src/nepi/util/tunchannel.py +++ b/src/nepi/util/tunchannel.py @@ -639,118 +639,80 @@ def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port): timeout += 5 return sock -def tcp_handshake(TERMINATE, rsock, listen, dice): +def tcp_handshake(rsock, listen, hand): # we are going to use a barrier algorithm to decide wich side listen. # each side will "roll a dice" and send the resulting value to the other # side. - sock = None - rsock.settimeout(5) - for i in xrange(100): - if TERMINATE: - raise OSError, "Killed" - try: - hand = dice.read() - rsock.send(str(hand)) - peer_hand = rsock.recv(1) - print >>sys.stderr, "tcp_handshake: hand %s, peer_hand %s" % (hand, peer_hand) - if hand < peer_hand: - if listen: - sock = rsock - break - elif hand > peer_hand: - if not listen: - sock = rsock - break - else: - dice.release() - dice.throw() - except socket.error: - dice.release() - break - if sock: - sock.settimeout(0) - return sock + win = False + rsock.settimeout(10) + try: + rsock.send(hand) + peer_hand = rsock.recv(1) + print >>sys.stderr, "tcp_handshake: hand %s, peer_hand %s" % (hand, peer_hand) + if hand < peer_hand: + if listen: + win = True + elif hand > peer_hand: + if not listen: + win = True + except socket.timeout: + pass + rsock.settimeout(0) + return win def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port): - def listen(stop, result, lsock, dice): + def listen(stop, hand, lsock, lresult): + win = False rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port) if rsock: - rsock = tcp_handshake(TERMINATE, rsock, True, dice) - if rsock: - stop.append(True) - result.append(rsock) - rsock.send("GATO") - rsock.settimeout(6) - try: - perro = rsock.recv(5) - except: - perro = "ERROR! TIMEOUT" - rsock.settimeout(0) - print >>sys.stderr, "tcp_establish: listen %s" % perro + win = tcp_handshake(rsock, True, hand) + stop.append(True) + lresult.append((win, rsock)) - def connect(stop, result, rsock, dice): + def connect(stop, hand, rsock, rresult): + win = False rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port) if rsock: - rsock = tcp_handshake(TERMINATE, rsock, False, dice) - if rsock: - stop.append(True) - result.append(rsock) - rsock.send("PERRO") - rsock.settimeout(6) - try: - gato = rsock.recv(4) - except: - gato = "TIMEOUT!!" - rsock.settimeout(0) - print >>sys.stderr, "tcp_establish: connected %s" % gato - - dice = Dice() - dice.throw() - stop = [] - result = [] - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - listen_thread = threading.Thread(target=listen, args=(stop, result, lsock, dice)) - connect_thread = threading.Thread(target=connect, args=(stop, result, rsock, dice)) - connect_thread.start() - listen_thread.start() - connect_thread.join() - listen_thread.join() - if not result: + win = tcp_handshake(rsock, False, hand) + stop.append(True) + rresult.append((win, rsock)) + + end = False + sock = None + while not end: + if TERMINATE: + raise OSError, "Killed" + hand = str(random.randint(1, 6)) + stop = [] + lresult = [] + rresult = [] + lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult)) + connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult)) + connect_thread.start() + listen_thread.start() + connect_thread.join() + listen_thread.join() + (lwin, lrsock) = lresult[0] + (rwin, rrsock) = rresult[0] + if not lrsock or not rrsock: + if not lrsock: + sock = rrsock + if not rrsock: + sock = lrsock + end = True + # both socket are connected + else: + if lwin: + sock = lrsock + end = True + elif rwin: + sock = rrsock + end = True + + if not sock: raise OSError, "Error: tcp_establish could not establish connection." - sock = result[0] return sock -class Dice(object): - def __init__(self): - self._condition = threading.Condition(threading.Lock()) - self._readers = 0 - self._value = None - - def read(self): - self._condition.acquire() - try: - self._readers += 1 - finally: - self._condition.release() - return self._value - - def release(self): - self._condition.acquire() - try: - if self._readers > 0: - self._readers -= 1 - if self._readers == 0: - self._condition.notifyAll() - finally: - self._condition.release() - - def throw(self): - self._condition.acquire() - try: - while self._readers > 0: - self._condition.wait() - self._value = random.randint(1, 6) - finally: - self._condition.release() diff --git a/test/testbeds/planetlab/integration_multi.py b/test/testbeds/planetlab/integration_multi.py index 4be4f0ed..6ed0aeee 100755 --- a/test/testbeds/planetlab/integration_multi.py +++ b/test/testbeds/planetlab/integration_multi.py @@ -71,7 +71,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase): pl_desc.set_attribute_value("tapPortBase", self.port_base) pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests pl_desc.set_attribute_value("dedicatedSlice", True) - pl_desc.set_attribute_value("plLogLevel", "DEBUG") + #pl_desc.set_attribute_value("plLogLevel", "DEBUG") pl_desc2 = exp_desc.add_testbed_description(pl_provider) pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2") @@ -83,7 +83,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase): pl_desc2.set_attribute_value("tapPortBase", self.port_base+500) pl_desc2.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests pl_desc2.set_attribute_value("dedicatedSlice", True) - pl_desc2.set_attribute_value("plLogLevel", "DEBUG") + #pl_desc2.set_attribute_value("plLogLevel", "DEBUG") return pl_desc, pl_desc2, exp_desc @@ -184,7 +184,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase): @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") - def ptest_plpl_crossconnect_udp(self): + def test_plpl_crossconnect_udp(self): self._test_plpl_crossconnect("udp") @test_util.skipUnless(test_util.pl_auth() is not None, @@ -194,12 +194,12 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase): @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") - def ptest_plpl_crossconnect_gre(self): + def test_plpl_crossconnect_gre(self): self._test_plpl_crossconnect("gre") @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)") - def ptest_plpl_crossconnect_udp_recover(self): + def test_plpl_crossconnect_udp_recover(self): self._test_plpl_crossconnect("udp", recover = True)