tcp_handshake works!
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 31 Aug 2011 17:38:38 +0000 (19:38 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 31 Aug 2011 17:38:38 +0000 (19:38 +0200)
src/nepi/util/tunchannel.py
test/testbeds/planetlab/integration_multi.py

index baf3927..cb076c6 100644 (file)
@@ -639,118 +639,80 @@ def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
         timeout += 5
     return sock
 
-def tcp_handshake(TERMINATE, rsock, listen, dice):
+def tcp_handshake(rsock, listen, hand):
     # we are going to use a barrier algorithm to decide wich side listen.
     # each side will "roll a dice" and send the resulting value to the other 
     # side. 
-    sock = None
-    rsock.settimeout(5)
-    for i in xrange(100):
-        if TERMINATE:
-            raise OSError, "Killed"
-        try:
-            hand = dice.read()
-            rsock.send(str(hand))
-            peer_hand = rsock.recv(1)
-            print >>sys.stderr, "tcp_handshake: hand %s, peer_hand %s" % (hand, peer_hand)
-            if hand < peer_hand:
-                if listen:
-                    sock = rsock
-                break   
-            elif hand > peer_hand:
-                if not listen:
-                    sock = rsock
-                break
-            else:
-                dice.release()
-                dice.throw()
-        except socket.error:
-            dice.release()
-            break
-    if sock:
-        sock.settimeout(0)
-    return sock
+    win = False
+    rsock.settimeout(10)
+    try:
+        rsock.send(hand)
+        peer_hand = rsock.recv(1)
+        print >>sys.stderr, "tcp_handshake: hand %s, peer_hand %s" % (hand, peer_hand)
+        if hand < peer_hand:
+            if listen:
+                win = True
+        elif hand > peer_hand:
+            if not listen:
+                win = True
+    except socket.timeout:
+        pass
+    rsock.settimeout(0)
+    return win
 
 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
-    def listen(stop, result, lsock, dice):
+    def listen(stop, hand, lsock, lresult):
+        win = False
         rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
         if rsock:
-            rsock = tcp_handshake(TERMINATE, rsock, True, dice)
-            if rsock:
-                stop.append(True)
-                result.append(rsock)
-                rsock.send("GATO")
-                rsock.settimeout(6)
-                try:
-                    perro = rsock.recv(5)
-                except:
-                    perro = "ERROR! TIMEOUT"
-                rsock.settimeout(0)
-                print >>sys.stderr, "tcp_establish: listen %s" % perro
+            win = tcp_handshake(rsock, True, hand)
+            stop.append(True)
+        lresult.append((win, rsock))
 
-    def connect(stop, result, rsock, dice):
+    def connect(stop, hand, rsock, rresult):
+        win = False
         rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
         if rsock:
-            rsock = tcp_handshake(TERMINATE, rsock, False, dice)
-            if rsock:
-                stop.append(True)
-                result.append(rsock)
-                rsock.send("PERRO")
-                rsock.settimeout(6)
-                try:
-                    gato = rsock.recv(4)
-                except:
-                    gato = "TIMEOUT!!"
-                rsock.settimeout(0)
-                print >>sys.stderr, "tcp_establish: connected %s" % gato
-    
-    dice = Dice()
-    dice.throw()
-    stop = []
-    result = []
-    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-    rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-    listen_thread = threading.Thread(target=listen, args=(stop, result, lsock, dice))
-    connect_thread = threading.Thread(target=connect, args=(stop, result, rsock, dice))
-    connect_thread.start()
-    listen_thread.start()
-    connect_thread.join()
-    listen_thread.join()
-    if not result:
+            win = tcp_handshake(rsock, False, hand)
+            stop.append(True)
+        rresult.append((win, rsock))
+  
+    end = False
+    sock = None
+    while not end:
+        if TERMINATE:
+            raise OSError, "Killed"
+        hand = str(random.randint(1, 6))
+        stop = []
+        lresult = []
+        rresult = []
+        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+        rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+        listen_thread = threading.Thread(target=listen, args=(stop, hand, lsock, lresult))
+        connect_thread = threading.Thread(target=connect, args=(stop, hand, rsock, rresult))
+        connect_thread.start()
+        listen_thread.start()
+        connect_thread.join()
+        listen_thread.join()
+        (lwin, lrsock) = lresult[0]
+        (rwin, rrsock) = rresult[0]
+        if not lrsock or not rrsock:
+            if not lrsock:
+                sock = rrsock
+            if not rrsock:
+                sock = lrsock
+            end = True
+        # both socket are connected
+        else:
+           if lwin:
+                sock = lrsock
+                end = True
+           elif rwin: 
+                sock = rrsock
+                end = True
+
+    if not sock:
         raise OSError, "Error: tcp_establish could not establish connection."
-    sock = result[0]
     return sock
 
-class Dice(object):
-    def __init__(self):
-        self._condition = threading.Condition(threading.Lock())
-        self._readers = 0
-        self._value = None
-
-    def read(self):
-        self._condition.acquire()
-        try:
-            self._readers += 1
-        finally:
-            self._condition.release()
-        return self._value
-
-    def release(self):
-        self._condition.acquire()
-        try:
-            if self._readers > 0:
-                self._readers -= 1
-            if self._readers == 0:
-                self._condition.notifyAll()
-        finally:
-            self._condition.release()
-
-    def throw(self):
-        self._condition.acquire()
-        try:
-            while self._readers > 0:
-                self._condition.wait()
-            self._value = random.randint(1, 6)
-        finally:
-            self._condition.release()
 
index 4be4f0e..6ed0aee 100755 (executable)
@@ -71,7 +71,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
         pl_desc.set_attribute_value("tapPortBase", self.port_base)
         pl_desc.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
         pl_desc.set_attribute_value("dedicatedSlice", True)
-        pl_desc.set_attribute_value("plLogLevel", "DEBUG")
+        #pl_desc.set_attribute_value("plLogLevel", "DEBUG")
 
         pl_desc2 = exp_desc.add_testbed_description(pl_provider)
         pl_desc2.set_attribute_value("homeDirectory", self.root_dir+"v2")
@@ -83,7 +83,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
         pl_desc2.set_attribute_value("tapPortBase", self.port_base+500)
         pl_desc2.set_attribute_value("p2pDeployment", False) # it's interactive, we don't want it in tests
         pl_desc2.set_attribute_value("dedicatedSlice", True)
-        pl_desc2.set_attribute_value("plLogLevel", "DEBUG")
+        #pl_desc2.set_attribute_value("plLogLevel", "DEBUG")
         
         return pl_desc, pl_desc2, exp_desc
     
@@ -184,7 +184,7 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
 
     @test_util.skipUnless(test_util.pl_auth() is not None, 
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
-    def ptest_plpl_crossconnect_udp(self):
+    def test_plpl_crossconnect_udp(self):
         self._test_plpl_crossconnect("udp")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, 
@@ -194,12 +194,12 @@ class PlanetLabMultiIntegrationTestCase(unittest.TestCase):
 
     @test_util.skipUnless(test_util.pl_auth() is not None, 
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
-    def ptest_plpl_crossconnect_gre(self):
+    def test_plpl_crossconnect_gre(self):
         self._test_plpl_crossconnect("gre")
 
     @test_util.skipUnless(test_util.pl_auth() is not None, 
         "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
-    def ptest_plpl_crossconnect_udp_recover(self):
+    def test_plpl_crossconnect_udp_recover(self):
         self._test_plpl_crossconnect("udp", 
             recover = True)