self.assertTrue(netpipe_stats, "Unavailable netpipe stats")
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
- def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None, Filter1args=None, Filter2args=None, PLREX=None):
+ def _pingtest(self, TunClass, ConnectionProto, Cipher, Filter1=None, Filter2=None, Filter1args=None, Filter2args=None, PLREX=None, flood=False):
instance = self.make_instance()
instance.defer_create(2, "Node")
instance.defer_create_set(7, "tun_cipher", Cipher)
instance.defer_add_trace(7, "packets")
instance.defer_add_address(7, self.net_prefix+".2", 24, False)
+ if flood:
+ instance.defer_create_set(7, "bwlimit", 128)
instance.defer_connect(2, "devs", 7, "node")
instance.defer_create(8, TunClass)
instance.defer_create_set(8, "tun_cipher", Cipher)
instance.defer_add_address(8, self.net_prefix+".3", 24, False)
instance.defer_connect(3, "devs", 8, "node")
instance.defer_create(9, "Application")
- instance.defer_create_set(9, "command", "ping -qc10 {#[GUID-8].addr[0].[Address]#}")
+ if flood:
+ instance.defer_create_set(9, "command", "sudo -S ping -s 1000 -l 1000 -qfc1000 {#[GUID-8].addr[0].[Address]#} ; sleep 20 ; ping -qc10 {#[GUID-8].addr[0].[Address]#}")
+ else:
+ instance.defer_create_set(9, "command", "ping -qc10 {#[GUID-8].addr[0].[Address]#}")
instance.defer_add_trace(9, "stdout")
instance.defer_add_trace(9, "stderr")
instance.defer_connect(9, "node", 2, "apps")
--- .* ping statistics ---
10 packets transmitted, [0-9]+ received,.* %s%% packet loss, time \d*ms.*
""" % (plr,)
+ if flood:
+ comp_result = ".*" + comp_result
try:
instance.do_setup()
pass
# asserts at the end, to make sure there's proper cleanup
- self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+ self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE|re.DOTALL),
"Unexpected trace:\n%s\nPackets @ source:\n%s\nPackets @ target:\n%s" % (
ping_result,
packets1,
def test_tun_ping_gre(self):
self._pingtest("TunInterface", "gre", "PLAIN")
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_tun_ping_flood(self):
+ self._pingtest("TunInterface", "tcp", "PLAIN", flood = True)
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_tun_ping_flood_udp(self):
+ self._pingtest("TunInterface", "udp", "PLAIN", flood = True)
+
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_tap_ping(self):
self._pingtest("TapInterface", "tcp", "AES")