3 from nepi.util import tunchannel
9 class TunnChannelTestCase(unittest.TestCase):
10 def test_send_suspend_terminate(self):
11 def tun_fwd(local, remote, TERMINATE, SUSPEND, STOPPED):
12 tunchannel.tun_fwd(local, remote, True, True, None, True,
13 TERMINATE, SUSPEND, None)
20 s1, s2 = socket.socketpair()
21 s3, s4 = socket.socketpair()
24 t = threading.Thread(target=tun_fwd, args=[s2, s3, TERMINATE, SUSPEND, STOPPED])
29 rtxt = s4.recv(len(txt))
31 self.assertTrue(rtxt == txt[4:])
33 # Let's try to suspend execution now
34 cond = threading.Condition()
35 SUSPEND.insert(0, cond)
37 txt = "0000|suspended"
42 rtxt = s4.recv(len(txt))
43 except socket.timeout:
46 self.assertTrue(rtxt == "timeout")
48 # Let's see if we can resume and receive the message
55 rtxt = s4.recv(len(txt))
56 self.assertTrue(rtxt == txt[4:])
59 TERMINATE.append(None)
61 txt = "0000|never received"
66 rtxt = s4.recv(len(txt))
67 except socket.timeout:
70 self.assertTrue(rtxt == "timeout")
71 self.assertTrue(STOPPED)
73 if __name__ == '__main__':