for i in xrange(0 if retry else 4):
try:
return func(*p, **kw)
- except select.error, args:
+ except (select.error, socket.error), args:
if args[0] == errno.EINTR:
continue
else:
while '\n' not in chunk:
try:
chunk = conn.recv(1024)
- except socket.timeout:
- continue
- except OSError, e:
- if e.errno != errno.EINTR:
+ except (OSError, socket.error), e:
+ if e[0] != errno.EINTR:
raise
else:
continue
def send_to_server(self, data):
try:
self._ctrl_sock.send(data)
- except IOError, e:
- if e.errno == errno.EPIPE:
+ except (IOError, socket.error), e:
+ if e[0] == errno.EPIPE:
self.connect()
self._ctrl_sock.send(data)
else:
while '\n' not in chunk:
try:
chunk = self._ctrl_sock.recv(1024)
- except OSError, e:
- if e.errno != errno.EINTR:
+ except (OSError, socket.error), e:
+ if e[0] != errno.EINTR:
raise
- if chunk == '':
- continue
+ continue
if chunk:
data.append(chunk)
else:
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
- def test_local_if(self):
+ def _test_if(self, daemonize_testbed, controller_access_configuration):
exp_desc = ExperimentDescription()
testbed_id = "netns"
user = getpass.getuser()
app.set_attribute_value("user", user)
app.connector("node").connect(node1.connector("apps"))
app.enable_trace("stdout")
+
+ if daemonize_testbed:
+ netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+ netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
xml = exp_desc.to_xml()
- controller = ExperimentController(xml, self.root_dir)
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
- ping_result = controller.trace(app.guid, "stdout")
- comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
+ if controller_access_configuration:
+ controller = proxy.create_experiment_controller(xml,
+ controller_access_configuration)
+ else:
+ controller = ExperimentController(xml, self.root_dir)
+
+ try:
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ ping_result = controller.trace(app.guid, "stdout")
+ comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
--- 10.0.0.2 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
"""
- self.assertTrue(ping_result.startswith(comp_result))
- controller.stop()
- controller.shutdown()
+ self.assertTrue(ping_result.startswith(comp_result))
+ finally:
+ controller.stop()
+ controller.shutdown()
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
- def test_all_daemonized_if(self):
- exp_desc = ExperimentDescription()
- testbed_id = "netns"
- user = getpass.getuser()
- netns_provider = FactoriesProvider(testbed_id)
- netns_desc = exp_desc.add_testbed_description(netns_provider)
- netns_desc.set_attribute_value("homeDirectory", self.root_dir)
- #netns_desc.set_attribute_value("enableDebug", True)
- node1 = netns_desc.create("Node")
- node2 = netns_desc.create("Node")
- iface1 = netns_desc.create("NodeInterface")
- iface1.set_attribute_value("up", True)
- node1.connector("devs").connect(iface1.connector("node"))
- ip1 = iface1.add_address()
- ip1.set_attribute_value("Address", "10.0.0.1")
- iface2 = netns_desc.create("NodeInterface")
- iface2.set_attribute_value("up", True)
- node2.connector("devs").connect(iface2.connector("node"))
- ip2 = iface2.add_address()
- ip2.set_attribute_value("Address", "10.0.0.2")
- switch = netns_desc.create("Switch")
- switch.set_attribute_value("up", True)
- iface1.connector("switch").connect(switch.connector("devs"))
- iface2.connector("switch").connect(switch.connector("devs"))
- app = netns_desc.create("Application")
- app.set_attribute_value("command", "ping -qc1 10.0.0.2")
- app.set_attribute_value("user", user)
- app.connector("node").connect(node1.connector("apps"))
- app.enable_trace("stdout")
-
- netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
- inst_root_dir = os.path.join(self.root_dir, "instance")
- os.mkdir(inst_root_dir)
- netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
-
- xml = exp_desc.to_xml()
+ def test_local_if(self):
+ self._test_if(
+ daemonize_testbed = False,
+ controller_access_configuration = None)
+ @test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
+ def test_all_daemonized_if(self):
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
- controller = proxy.create_experiment_controller(xml, access_config)
-
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
- ping_result = controller.trace(app.guid, "stdout")
- comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
-
---- 10.0.0.2 ping statistics ---
-1 packets transmitted, 1 received, 0% packet loss, time 0ms
-"""
- self.assertTrue(ping_result.startswith(comp_result))
- controller.stop()
- controller.shutdown()
+
+ self._test_if(
+ daemonize_testbed = True,
+ controller_access_configuration = access_config)
@test_util.skipUnless(os.getuid() == 0, "Test requires root privileges")
def test_all_ssh_daemonized_if(self):
- exp_desc = ExperimentDescription()
- testbed_id = "netns"
env = test_util.test_environment()
- user = getpass.getuser()
- netns_provider = FactoriesProvider(testbed_id)
- netns_desc = exp_desc.add_testbed_description(netns_provider)
- netns_desc.set_attribute_value("homeDirectory", self.root_dir)
- #netns_desc.set_attribute_value("enableDebug", True)
- node1 = netns_desc.create("Node")
- node2 = netns_desc.create("Node")
- iface1 = netns_desc.create("NodeInterface")
- iface1.set_attribute_value("up", True)
- node1.connector("devs").connect(iface1.connector("node"))
- ip1 = iface1.add_address()
- ip1.set_attribute_value("Address", "10.0.0.1")
- iface2 = netns_desc.create("NodeInterface")
- iface2.set_attribute_value("up", True)
- node2.connector("devs").connect(iface2.connector("node"))
- ip2 = iface2.add_address()
- ip2.set_attribute_value("Address", "10.0.0.2")
- switch = netns_desc.create("Switch")
- switch.set_attribute_value("up", True)
- iface1.connector("switch").connect(switch.connector("devs"))
- iface2.connector("switch").connect(switch.connector("devs"))
- app = netns_desc.create("Application")
- app.set_attribute_value("command", "ping -qc1 10.0.0.2")
- app.set_attribute_value("user", user)
- app.connector("node").connect(node1.connector("apps"))
- app.enable_trace("stdout")
-
- netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
- inst_root_dir = os.path.join(self.root_dir, "instance")
- os.mkdir(inst_root_dir)
- netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
-
- xml = exp_desc.to_xml()
-
+
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
access_config.set_attribute_value(DC.USE_AGENT, True)
- controller = proxy.create_experiment_controller(xml, access_config)
-
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
- ping_result = controller.trace(app.guid, "stdout")
- comp_result = """PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
-
---- 10.0.0.2 ping statistics ---
-1 packets transmitted, 1 received, 0% packet loss, time 0ms
-"""
- self.assertTrue(ping_result.startswith(comp_result))
- controller.stop()
- controller.shutdown()
+
+ self._test_if(
+ daemonize_testbed = True,
+ controller_access_configuration = access_config)
def tearDown(self):
try:
def setUp(self):
self.root_dir = tempfile.mkdtemp()
- @test_util.skipUnless(test_util.ns3_usable(),
- "Test requires working ns-3 bindings")
- def test_local_if(self):
+ def _test_if(self, daemonize_testbed, controller_access_configuration):
exp_desc = ExperimentDescription()
testbed_id = "ns3"
ns3_provider = FactoriesProvider(testbed_id)
app.set_attribute_value("StopTime", "20s")
app.connector("node").connect(node1.connector("apps"))
+ if daemonize_testbed:
+ ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ ns3_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+ ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
xml = exp_desc.to_xml()
- controller = ExperimentController(xml, self.root_dir)
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
- ping_result = controller.trace(iface2.guid, "P2PAsciiTrace")
- comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)"
- if ping_result.find(comp_result) == -1:
- self.fail("Unexpected trace: %s" % (ping_result,))
- controller.stop()
- controller.shutdown()
+
+ if controller_access_configuration:
+ controller = ExperimentController(xml, self.root_dir)
+ else:
+ controller = proxy.create_experiment_controller(xml,
+ controller_access_configuration)
+
+ try:
+ controller.start()
+ while not controller.is_finished(app.guid):
+ time.sleep(0.5)
+ ping_result = controller.trace(iface2.guid, "P2PAsciiTrace")
+ comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)"
+ if ping_result.find(comp_result) == -1:
+ self.fail("Unexpected trace: %s" % (ping_result,))
+ finally:
+ controller.stop()
+ controller.shutdown()
@test_util.skipUnless(test_util.ns3_usable(),
"Test requires working ns-3 bindings")
- def test_all_daemonized_if(self):
- exp_desc = ExperimentDescription()
- testbed_id = "ns3"
- ns3_provider = FactoriesProvider(testbed_id)
- ns3_desc = exp_desc.add_testbed_description(ns3_provider)
- ns3_desc.set_attribute_value("homeDirectory", self.root_dir)
-
- node1 = ns3_desc.create("ns3::Node")
- ipv41 = ns3_desc.create("ns3::Ipv4L3Protocol")
- arp1 = ns3_desc.create("ns3::ArpL3Protocol")
- icmp1 = ns3_desc.create("ns3::Icmpv4L4Protocol")
- node1.connector("protos").connect(ipv41.connector("node"))
- node1.connector("protos").connect(arp1.connector("node"))
- node1.connector("protos").connect(icmp1.connector("node"))
- iface1 = ns3_desc.create("ns3::PointToPointNetDevice")
- queue1 = ns3_desc.create("ns3::DropTailQueue")
- node1.connector("devs").connect(iface1.connector("node"))
- iface1.connector("queue").connect(queue1.connector("dev"))
- trace1 = iface1.enable_trace("P2PAsciiTrace")
- ip1 = iface1.add_address()
- ip1.set_attribute_value("Address", "10.0.0.1")
-
- node2 = ns3_desc.create("ns3::Node")
- ipv42 = ns3_desc.create("ns3::Ipv4L3Protocol")
- arp2 = ns3_desc.create("ns3::ArpL3Protocol")
- icmp2 = ns3_desc.create("ns3::Icmpv4L4Protocol")
- node2.connector("protos").connect(ipv42.connector("node"))
- node2.connector("protos").connect(arp2.connector("node"))
- node2.connector("protos").connect(icmp2.connector("node"))
- iface2 = ns3_desc.create("ns3::PointToPointNetDevice")
- queue2 = ns3_desc.create("ns3::DropTailQueue")
- node2.connector("devs").connect(iface2.connector("node"))
- iface2.connector("queue").connect(queue2.connector("dev"))
- trace2 = iface2.enable_trace("P2PAsciiTrace")
- ip2 = iface2.add_address()
- ip2.set_attribute_value("Address", "10.0.0.2")
+ def test_local_if(self):
+ self._test_if(
+ daemonize_testbed = False,
+ controller_access_configuration = None)
- chan = ns3_desc.create("ns3::PointToPointChannel")
- iface1.connector("chan").connect(chan.connector("dev2"))
- iface2.connector("chan").connect(chan.connector("dev2"))
+ @test_util.skipUnless(test_util.ns3_usable(),
+ "Test requires working ns-3 bindings")
+ def test_all_daemonized_if(self):
+ access_config = proxy.AccessConfiguration()
+ access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
+ access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
- app = ns3_desc.create("ns3::V4Ping")
- app.set_attribute_value("Remote", "10.0.0.2")
- app.set_attribute_value("StartTime", "0s")
- app.set_attribute_value("StopTime", "20s")
- app.connector("node").connect(node1.connector("apps"))
+ self._test_if(
+ daemonize_testbed = True,
+ controller_access_configuration = access_config)
- ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
- inst_root_dir = os.path.join(self.root_dir, "instance")
- os.mkdir(inst_root_dir)
- ns3_desc.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
- ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+ @test_util.skipUnless(test_util.ns3_usable(),
+ "Test requires working ns-3 bindings")
+ def test_all_ssh_daemonized_if(self):
+ env = test_util.test_environment()
- xml = exp_desc.to_xml()
access_config = proxy.AccessConfiguration()
access_config.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
access_config.set_attribute_value(DC.ROOT_DIRECTORY, self.root_dir)
access_config.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
- controller = proxy.create_experiment_controller(xml, access_config)
-
- controller.start()
- while not controller.is_finished(app.guid):
- time.sleep(0.5)
-
- ping_result = controller.trace(iface2.guid, "P2PAsciiTrace")
- comp_result = "- 19.021 /NodeList/1/DeviceList/0/$ns3::PointToPointNetDevice/TxQueue/Dequeue ns3::PppHeader (Point-to-Point Protocol: IP (0x0021)) ns3::Ipv4Header (tos 0x0 ttl 64 id 19 protocol 1 offset 0 flags [none] length: 84 10.0.0.2 > 10.0.0.1) ns3::Icmpv4Header (type=0, code=0) ns3::Icmpv4Echo (identifier=0, sequence=19)"
- if ping_result.find(comp_result) == -1:
- self.fail("Unexpected trace: %s" % (ping_result,))
- controller.stop()
- controller.shutdown()
+ access_config.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+ access_config.set_attribute_value(DC.DEPLOYMENT_PORT, env.port)
+ access_config.set_attribute_value(DC.USE_AGENT, True)
+
+ self._test_if(
+ daemonize_testbed = True,
+ controller_access_configuration = access_config)
def tearDown(self):
try:
from nepi.core.design import ExperimentDescription, FactoriesProvider
from nepi.core.execute import ExperimentController
from nepi.util import proxy
+from nepi.util.constants import DeploymentConfiguration as DC
import os
import shutil
import tempfile
pl_desc.set_attribute_value("plcHost", plchost)
return pl_desc, exp_desc
-
- @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
- def test_simple(self):
+
+ def _test_simple(self, daemonize_testbed, controller_access_configuration):
pl, exp = self.make_experiment_desc()
node1 = pl.create("Node")
app.enable_trace("stdout")
app.connector("node").connect(node1.connector("apps"))
- xml = exp.to_xml()
-
- controller = ExperimentController(xml, self.root_dir)
+ if daemonize_testbed:
+ pl.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ inst_root_dir = os.path.join(self.root_dir, "instance")
+ os.mkdir(inst_root_dir)
+ pl.set_attribute_value(DC.ROOT_DIRECTORY, inst_root_dir)
+ pl.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL)
+
+ if controller_access_configuration:
+ controller = proxy.create_experiment_controller(xml,
+ controller_access_configuration)
+ else:
+ controller = ExperimentController(xml, self.root_dir)
+
try:
controller.start()
while not controller.is_finished(app.guid):
controller.stop()
controller.shutdown()
-
@test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
def test_spanning_deployment(self):
pl, exp = self.make_experiment_desc()
finally:
controller.stop()
controller.shutdown()
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_simple(self):
+ self._test_simple(
+ daemonize_testbed = False,
+ controller_access_configuration = None)
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_simple_daemonized(self):
+ access_config = proxy.AccessConfiguration({
+ DC.DEPLOYMENT_MODE : DC.MODE_DAEMON,
+ DC.ROOT_DIRECTORY : self.root_dir,
+ })
+
+ self._test_simple(
+ daemonize_testbed = False,
+ controller_access_configuration = access_config)
+
+ @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ def test_simple_ssh(self):
+ env = test_util.test_environment()
+
+ access_config = proxy.AccessConfiguration({
+ DC.DEPLOYMENT_MODE : DC.MODE_DAEMON,
+ DC.ROOT_DIRECTORY : self.root_dir,
+ DC.LOG_LEVEL : DC.DEBUG_LEVEL,
+ DC.DEPLOYMENT_COMMUNICATION : DC.ACCESS_SSH,
+ DC.DEPLOYMENT_PORT : env.port,
+ DC.USE_AGENT : True,
+ })
+
+ self._test_simple(
+ daemonize_testbed = False,
+ controller_access_configuration = access_config)
if __name__ == '__main__':