- tun_connect retries on binding error (happens regularly during tests and may happen in Real Life (tm) too)
- re-enabled wrongfully disabled ns3-in-pl tests
- added prestart global synchronization step, which solves many cross-testbed synchronization issues
def __init__(self, factory_id, create_function, start_function,
stop_function, status_function,
configure_function, preconfigure_function,
+ prestart_function,
allow_addresses = False, has_addresses = False,
allow_routes = False, has_routes = False):
super(Factory, self).__init__()
self._status_function = status_function
self._configure_function = configure_function
self._preconfigure_function = preconfigure_function
+ self._prestart_function = prestart_function
self._connector_types = dict()
self._traces = list()
self._box_attributes = AttributesMap()
def create_function(self):
return self._create_function
+ @property
+ def prestart_function(self):
+ return self._prestart_function
+
@property
def start_function(self):
return self._start_function
"""After do_configure elements are configured"""
raise NotImplementedError
+ def do_prestart(self):
+ """Before do_start elements are prestart-configured"""
+ raise NotImplementedError
+
def do_cross_connect_init(self, cross_data):
"""
After do_cross_connect_init initiation of all external connections
cross_data = self._get_cross_data(guid)
testbed.do_cross_connect_compl(cross_data)
+ # Last chance to configure (parallel on all testbeds)
+ self._parallel([testbed.do_prestart
+ for testbed in self._testbeds.itervalues()])
+
# start experiment (parallel start on all testbeds)
self._parallel([testbed.start
for testbed in self._testbeds.itervalues()])
"""
return self.configure_order
+ @property
+ def prestart_order(self):
+ """ list of factory ids that indicates the order in which the elements
+ should be prestart-configured.
+
+ Default: same as configure_order
+ """
+ return self.configure_order
+
@property
def start_order(self):
""" list of factory ids that indicates the order in which the elements
(just after connections are made,
just before netrefs are resolved)
"configure_function": function for element configuration,
+ "prestart_function": function for pre-start
+ element configuration (just before starting applications),
+ useful for synchronization of background setup tasks or
+ lazy instantiation or configuration of attributes
+ that require connection/cross-connection state before
+ being created.
+ After this point, all applications should be able to run.
"factory_attributes": list of references to attribute_ids,
"box_attributes": list of regerences to attribute_ids,
"traces": list of references to trace_id
def preconfigure_order(self):
return self._metadata.preconfigure_order
+ @property
+ def prestart_order(self):
+ return self._metadata.prestart_order
+
@property
def start_order(self):
return self._metadata.start_order
status_function = info.get("status_function")
configure_function = info.get("configure_function")
preconfigure_function = info.get("preconfigure_function")
+ prestart_function = info.get("prestart_function")
allow_addresses = info.get("allow_addresses", False)
allow_routes = info.get("allow_routes", False)
has_addresses = info.get("has_addresses", False)
factory = Factory(factory_id, create_function, start_function,
stop_function, status_function,
configure_function, preconfigure_function,
+ prestart_function,
allow_addresses, has_addresses,
allow_routes, has_routes)
self._status = TESTBED_STATUS_SETUP
def do_create(self):
- guids = dict()
- # order guids (elements) according to factory_id
- for guid, factory_id in self._create.iteritems():
- if not factory_id in guids:
- guids[factory_id] = list()
- guids[factory_id].append(guid)
- # create elements following the factory_id order
- for factory_id in self._metadata.create_order:
- # omit the factories that have no element to create
- if factory_id not in guids:
- continue
- factory = self._factories[factory_id]
- for guid in guids[factory_id]:
- factory.create_function(self, guid)
- parameters = self._get_parameters(guid)
- for name, value in parameters.iteritems():
- self.set(guid, name, value)
+ def set_params(self, guid):
+ parameters = self._get_parameters(guid)
+ for name, value in parameters.iteritems():
+ self.set(guid, name, value)
+
+ self._do_in_factory_order(
+ 'create_function',
+ self._metadata.create_order,
+ postaction = set_params )
self._status = TESTBED_STATUS_CREATED
def _do_connect(self, init = True):
self._do_connect(init = False)
self._status = TESTBED_STATUS_CONNECTED
- def do_preconfigure(self):
- guids = dict()
+ def _do_in_factory_order(self, action, order, postaction = None):
+ guids = collections.defaultdict(list)
# order guids (elements) according to factory_id
for guid, factory_id in self._create.iteritems():
- if not factory_id in guids:
- guids[factory_id] = list()
guids[factory_id].append(guid)
# configure elements following the factory_id order
- for factory_id in self._metadata.preconfigure_order:
+ for factory_id in order:
# omit the factories that have no element to create
if factory_id not in guids:
continue
factory = self._factories[factory_id]
- if not factory.preconfigure_function:
+ if not getattr(factory, action):
continue
for guid in guids[factory_id]:
- factory.preconfigure_function(self, guid)
+ getattr(factory, action)(self, guid)
+ if postaction:
+ postaction(self, guid)
+
+ def do_preconfigure(self):
+ self._do_in_factory_order(
+ 'preconfigure_function',
+ self._metadata.preconfigure_order )
def do_configure(self):
- guids = dict()
- # order guids (elements) according to factory_id
- for guid, factory_id in self._create.iteritems():
- if not factory_id in guids:
- guids[factory_id] = list()
- guids[factory_id].append(guid)
- # configure elements following the factory_id order
- for factory_id in self._metadata.configure_order:
- # omit the factories that have no element to create
- if factory_id not in guids:
- continue
- factory = self._factories[factory_id]
- if not factory.configure_function:
- continue
- for guid in guids[factory_id]:
- factory.configure_function(self, guid)
+ self._do_in_factory_order(
+ 'configure_function',
+ self._metadata.configure_order )
self._status = TESTBED_STATUS_CONFIGURED
+ def do_prestart(self):
+ self._do_in_factory_order(
+ 'prestart_function',
+ self._metadata.prestart_order )
+
def _do_cross_connect(self, cross_data, init = True):
for guid, cross_connections in self._cross_connect.iteritems():
factory = self._get_factory(guid)
return factory.box_attributes.attributes_list
def start(self, time = TIME_NOW):
- # Plan everything
- # - group by factory_id
- # - enqueue task callables
- plan = collections.defaultdict(list)
-
- for guid, factory_id in self._create.iteritems():
- factory = self._factories[factory_id]
- start_function = factory.start_function
- if start_function:
- plan[factory_id].append((start_function, guid))
-
- # Execute plan, following the factory_id order
- for factory_id in self._metadata.start_order:
- if factory_id in plan:
- for start_function, guid in plan[factory_id]:
- start_function(self, guid)
-
+ self._do_in_factory_order(
+ 'start_function',
+ self._metadata.start_order )
self._status = TESTBED_STATUS_STARTED
#action: NotImplementedError
def stop(self, time = TIME_NOW):
- for guid, factory_id in self._create.iteritems():
- factory = self._factories[factory_id]
- stop_function = factory.stop_function
- if stop_function:
- stop_function(self, guid)
+ self._do_in_factory_order(
+ 'stop_function',
+ reversed(self._metadata.start_order) )
self._status = TESTBED_STATUS_STOPPED
def status(self, guid = None):
configure_order = [ INTERNET, NODE, NODEIFACE, TAPIFACE, TUNIFACE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
-# Start node after ifaces, because the node needs the ifaces in order to set up routes
+# Start (and prestart) node after ifaces, because the node needs the ifaces in order to set up routes
start_order = [ INTERNET, NODEIFACE, TAPIFACE, TUNIFACE, NODE, NETPIPE, NEPIDEPENDENCY, NS3DEPENDENCY, DEPENDENCY, APPLICATION ]
factories_info = dict({
"category": "topology",
"create_function": create_node,
"preconfigure_function": configure_node,
- "start_function": configure_node_routes,
+ "prestart_function": configure_node_routes,
"box_attributes": [
"forward_X11",
"hostname",
"create_function": create_tuniface,
"preconfigure_function": preconfigure_tuniface,
"configure_function": postconfigure_tuniface,
- "start_function": wait_tuniface,
+ "prestart_function": wait_tuniface,
"box_attributes": [
"up", "device_name", "mtu", "snat", "pointopoint",
"txqueuelen",
"create_function": create_tapiface,
"preconfigure_function": preconfigure_tuniface,
"configure_function": postconfigure_tuniface,
- "start_function": wait_tuniface,
+ "prestart_function": wait_tuniface,
"box_attributes": [
"up", "device_name", "mtu", "snat", "pointopoint",
"txqueuelen",
def configure_order(self):
return configure_order
+ @property
+ def prestart_order(self):
+ return start_order
+
@property
def start_order(self):
return start_order
help =
"Specify a symmetric encryption key with which to protect packets across "
"the tunnel. python-crypto must be installed on the system." )
+parser.add_option(
+ "-N", "--no-capture", dest="no_capture",
+ action = "store_true",
+ default = False,
+ help = "If specified, packets won't be logged to standard error "
+ "(default is to log them to standard error). " )
(options, remaining_args) = parser.parse_args(sys.argv[1:])
ether_mode = tun_name.startswith('tap'),
cipher_key = options.cipher_key,
udp = options.udp,
- TERMINATE = TERMINATE)
+ TERMINATE = TERMINATE,
+ stderr = open("/dev/null","w") if options.no_capture
+ else sys.stderr
+ )
print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.udp)
print >>sys.stderr, "Connecting to: %s:%d" % (remaining_args[0],options.port)
rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
- rsock.bind((hostaddr,options.udp))
+ for i in xrange(30):
+ try:
+ rsock.bind((hostaddr,options.udp))
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "Could not bind. Retrying in a sec..."
+ time.sleep(1)
+ else:
+ rsock.bind((hostaddr,options.udp))
rsock.connect((remaining_args[0],options.port))
else:
print >>sys.stderr, "Error: need a remote endpoint in UDP mode"
else:
print >>sys.stderr, "Listening at: %s:%d" % (hostaddr,options.port)
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- lsock.bind((hostaddr,options.port))
+ for i in xrange(30):
+ try:
+ lsock.bind((hostaddr,options.port))
+ break
+ except socket.error:
+ # wait a while, retry
+ print >>sys.stderr, "Could not bind. Retrying in a sec..."
+ time.sleep(1)
+ else:
+ lsock.bind((hostaddr,options.port))
lsock.listen(1)
rsock,raddr = lsock.accept()
remote = os.fdopen(rsock.fileno(), 'r+b', 0)
args.extend(("-P",str(local_p2p)))
if local_txq:
args.extend(("-Q",str(local_txq)))
+ if not local_cap:
+ args.append("-N")
if extra_args:
args.extend(map(str,extra_args))
if not listen and check_proto != 'fd':
pidfile = './pid',
home = self.home_path,
stdin = '/dev/null',
- stdout = 'capture' if local_cap else '/dev/null',
+ stdout = 'capture',
stderr = rspawn.STDOUT,
sudo = True,
time.sleep(1.0)
# Wait for the connection to be established
- if local.capture:
- for spin in xrange(30):
- if self.status() != rspawn.RUNNING:
- break
-
- (out,err),proc = server.popen_ssh_command(
- "cd %(home)s ; grep -c Connected capture" % dict(
- home = server.shell_escape(self.home_path)),
- host = local.node.hostname,
- port = None,
- user = local.node.slicename,
- agent = None,
- ident_key = local.node.ident_path,
- server_key = local.node.server_key
- )
-
- if proc.wait():
- break
-
- if out.strip() != '0':
- break
-
- time.sleep(1.0)
+ for spin in xrange(30):
+ if self.status() != rspawn.RUNNING:
+ break
+
+ (out,err),proc = server.popen_ssh_command(
+ "cd %(home)s ; grep -c Connected capture" % dict(
+ home = server.shell_escape(self.home_path)),
+ host = local.node.hostname,
+ port = None,
+ user = local.node.slicename,
+ agent = None,
+ ident_key = local.node.ident_path,
+ server_key = local.node.server_key
+ )
+
+ if proc.wait():
+ break
+
+ if out.strip() != '0':
+ break
+
+ time.sleep(1.0)
@property
def if_name(self):
if not self._if_name:
# Inspect the trace to check the assigned iface
local = self.local()
- if local and local.capture:
+ if local:
for spin in xrange(30):
(out,err),proc = server.popen_ssh_command(
"cd %(home)s ; grep 'Using tun:' capture | head -1" % dict(
TESTBED_VERSION = 36
EXPERIMENT_SET = 37
EXPERIMENT_GET = 38
+DO_PRESTART = 39
instruction_text = dict({
OK: "OK",
def do_preconfigure(self):
self._testbed.do_preconfigure()
+ @Marshalling.handles(DO_PRESTART)
+ @Marshalling.args()
+ @Marshalling.retvoid
+ def do_prestart(self):
+ self._testbed.do_prestart()
+
@Marshalling.handles(DO_CROSS_CONNECT_INIT)
@Marshalling.args( Marshalling.Decoders.pickled_data )
@Marshalling.retvoid
import socket
import select
import weakref
+import time
from tunchannel import tun_fwd
if udp:
# listen on udp port
rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
- rsock.bind((local_addr,local_port))
+ for i in xrange(30):
+ try:
+ rsock.bind((local_addr,local_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ time.sleep(1)
+ else:
+ rsock.bind((local_addr,local_port))
rsock.connect((peer_addr,peer_port))
remote = os.fdopen(rsock.fileno(), 'r+b', 0)
elif listen:
# accept tcp connections
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- lsock.bind((local_addr,local_port))
+ for i in xrange(30):
+ try:
+ lsock.bind((local_addr,local_port))
+ break
+ except socket.error:
+ # wait a while, retry
+ time.sleep(1)
+ else:
+ lsock.bind((local_addr,local_port))
lsock.listen(1)
rsock,raddr = lsock.accept()
remote = os.fdopen(rsock.fileno(), 'r+b', 0)
instance.do_connect_compl()
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(7) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_connect_compl()
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(6) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_connect_compl()
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(11) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_connect_compl()
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(17) != STATUS_FINISHED:
time.sleep(0.1)
instance.do_connect_compl()
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(27) != STATUS_FINISHED:
time.sleep(0.1)
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(7) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(5) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(10) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(6) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(8) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(9) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(12) != STATUS_FINISHED:
time.sleep(0.5)
instance.do_preconfigure()
instance.do_configure()
+ instance.do_prestart()
instance.start()
while instance.status(12) != STATUS_FINISHED:
time.sleep(0.5)
"Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
@test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
"Test is expensive, requires NEPI_FULL_TESTS=yes")
- def _test_ns3_in_pl(self):
+ def test_ns3_in_pl(self):
ns3_testbed_id = "ns3"
ns3_testbed_version = "3_9_RC3"
"Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
@test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
"Test is expensive, requires NEPI_FULL_TESTS=yes")
- def _test_ns3_in_pl_crossconnect(self):
+ def test_ns3_in_pl_crossconnect(self):
pl, exp = self.make_experiment_desc()
# Create PL node, ifaces, assign addresses
"Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
@test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
"Test is expensive, requires NEPI_FULL_TESTS=yes")
- def _test_ns3_in_pl_snat(self):
+ def test_ns3_in_pl_snat(self):
pl, exp = self.make_experiment_desc()
# Create PL node, ifaces, assign addresses