Cross-connection fixes all over the place too.
Still couldn't get cross connections working, but that's specific to FileDescriptorNetDevice
import ConfigParser
import os
import collections
+import functools
-ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\]|).\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
+ATTRIBUTE_PATTERN_BASE = re.compile(r"\{#\[(?P<label>[-a-zA-Z0-9._]*)\](?P<expr>(?P<component>\.addr\[[0-9]+\]|\.route\[[0-9]+\]|\.trace\[[0-9]+\])?.\[(?P<attribute>[-a-zA-Z0-9._]*)\])#}")
ATTRIBUTE_PATTERN_GUID_SUB = r"{#[%(guid)s]%(expr)s#}"
COMPONENT_PATTERN = re.compile(r"(?P<kind>[a-z]*)\[(?P<index>.*)\]")
@staticmethod
def _parallel(callables):
- threads = [ threading.Thread(target=callable) for callable in callables ]
+ excs = []
+ def wrap(callable):
+ @functools.wraps(callable)
+ def wrapped(*p, **kw):
+ try:
+ callable(*p, **kw)
+ except Exception,e:
+ import traceback
+ traceback.print_exc(file=sys.stderr)
+ excs.append(e)
+ return wrapped
+ threads = [ threading.Thread(target=wrap(callable)) for callable in callables ]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
+ for exc in excs:
+ raise exc
def start(self):
parser = XmlExperimentParser()
# final netref step, fail if anything's left unresolved
self.do_netrefs(data, fail_if_undefined=True)
+ self._program_testbed_cross_connections(data)
+
# perform do_configure in parallel for al testbeds
# (it's internal configuration for each)
self._parallel([testbed.do_configure
ref_guid = int(label[5:])
if ref_guid:
expr = match.group("expr")
- component = match.group("component")[1:] # skip the dot
+ component = (match.group("component") or "")[1:] # skip the dot
attribute = match.group("attribute")
# split compound components into component kind and index
# eg: 'addr[0]' -> ('addr', '0')
component, component_index = self._netref_component_split(component)
-
+
# find object and resolve expression
for ref_testbed in self._testbeds.itervalues():
if component not in self._NETREF_COMPONENT_GETTERS:
def _program_testbed_controllers(self, element_guids, data):
for guid in element_guids:
(testbed_guid, factory_id) = data.get_box_data(guid)
- testbed = self._testbeds[testbed_guid]
- testbed.defer_create(guid, factory_id)
- for (name, value) in data.get_attribute_data(guid):
- testbed.defer_create_set(guid, name, value)
+ testbed = self._testbeds.get(testbed_guid)
+ if testbed:
+ testbed.defer_create(guid, factory_id)
+ for (name, value) in data.get_attribute_data(guid):
+ testbed.defer_create_set(guid, name, value)
for guid in element_guids:
(testbed_guid, factory_id) = data.get_box_data(guid)
- testbed = self._testbeds[testbed_guid]
- for (connector_type_name, cross_guid, cross_connector_type_name) \
- in data.get_connection_data(guid):
- (testbed_guid, factory_id) = data.get_box_data(guid)
- (cross_testbed_guid, cross_factory_id) = data.get_box_data(
- cross_guid)
- if testbed_guid == cross_testbed_guid:
- testbed.defer_connect(guid, connector_type_name,
- cross_guid, cross_connector_type_name)
- else:
- cross_testbed = self._testbeds[cross_testbed_guid]
- cross_testbed_id = cross_testbed.testbed_id
- testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
- cross_testbed_guid, cross_testbed_id, cross_factory_id,
- cross_connector_type_name)
- # save cross data for later
- self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
+ testbed = self._testbeds.get(testbed_guid)
+ if testbed:
+ for (connector_type_name, cross_guid, cross_connector_type_name) \
+ in data.get_connection_data(guid):
+ (testbed_guid, factory_id) = data.get_box_data(guid)
+ (cross_testbed_guid, cross_factory_id) = data.get_box_data(
cross_guid)
- for trace_id in data.get_trace_data(guid):
- testbed.defer_add_trace(guid, trace_id)
- for (autoconf, address, netprefix, broadcast) in \
- data.get_address_data(guid):
- if address != None:
- testbed.defer_add_address(guid, address, netprefix,
- broadcast)
- for (destination, netprefix, nexthop) in data.get_route_data(guid):
- testbed.defer_add_route(guid, destination, netprefix, nexthop)
+ if testbed_guid == cross_testbed_guid:
+ testbed.defer_connect(guid, connector_type_name,
+ cross_guid, cross_connector_type_name)
+ for trace_id in data.get_trace_data(guid):
+ testbed.defer_add_trace(guid, trace_id)
+ for (autoconf, address, netprefix, broadcast) in \
+ data.get_address_data(guid):
+ if address != None:
+ testbed.defer_add_address(guid, address, netprefix,
+ broadcast)
+ for (destination, netprefix, nexthop) in data.get_route_data(guid):
+ testbed.defer_add_route(guid, destination, netprefix, nexthop)
+
+ def _program_testbed_cross_connections(self, data):
+ data_guids = data.guids
+
+ for guid in data_guids:
+ if not data.is_testbed_data(guid):
+ (testbed_guid, factory_id) = data.get_box_data(guid)
+ testbed = self._testbeds.get(testbed_guid)
+ if testbed:
+ for (connector_type_name, cross_guid, cross_connector_type_name) \
+ in data.get_connection_data(guid):
+ (testbed_guid, factory_id) = data.get_box_data(guid)
+ (cross_testbed_guid, cross_factory_id) = data.get_box_data(
+ cross_guid)
+ if testbed_guid != cross_testbed_guid:
+ cross_testbed = self._testbeds[cross_testbed_guid]
+ cross_testbed_id = cross_testbed.testbed_id
+ testbed.defer_cross_connect(guid, connector_type_name, cross_guid,
+ cross_testbed_guid, cross_testbed_id, cross_factory_id,
+ cross_connector_type_name)
+ # save cross data for later
+ self._add_crossdata(testbed_guid, guid, cross_testbed_guid,
+ cross_guid)
def _add_crossdata(self, testbed_guid, guid, cross_testbed_guid, cross_guid):
if testbed_guid not in self._cross_data:
from nepi.core import metadata
from nepi.core.attributes import Attribute
from nepi.util import validation
+import os.path
### Connection functions ####
testbed_instance.set(fdnd_guid, "LinuxSocketAddress", address)
# Set tun standard contract attributes
- testbed_instance.set(fdnd_guid, "tun_addr", address)
+ testbed_instance.set(fdnd_guid, "tun_addr",
+ os.path.join( testbed_instance.root_directory, address ) )
testbed_instance.set(fdnd_guid, "tun_proto", "fd")
testbed_instance.set(fdnd_guid, "tun_port", 0)
- testbed_instance.set(fdnd_guid, "tun_key", "\xff"*32) # unimportant, fds aren't encrypted
+ testbed_instance.set(fdnd_guid, "tun_key", "\xfa"*32) # unimportant, fds aren't encrypted
### Connector information ###
"mv -f ${BUILD}/target/lib/python*/site-packages/pybindgen ${BUILD}/target/. && "
"rm -rf ${BUILD}/target/lib && "
"cd ../ns3-src && "
- "./waf configure --prefix=${BUILD}/target -d release --disable-examples && "
+ "./waf configure --prefix=${BUILD}/target -d release --disable-examples --high-precision-as-double && "
"./waf &&"
"./waf install && "
"./waf clean"
# These get initialized when the iface is connected to the internet
self.has_internet = False
-
- # Generate an initial random cryptographic key to use for tunnelling
- # Upon connection, both endpoints will agree on a common one based on
- # this one.
- self.tun_key = ( ''.join(map(chr, [
- r.getrandbits(8)
- for i in xrange(32)
- for r in (random.SystemRandom(),) ])
- ).encode("base64").strip() )
def __str__(self):
return "%s<ip:%s/%s up mac:%s>" % (
# They're part of the TUN standard attribute set
self.tun_port = None
self.tun_addr = None
- self.tun_key = None
# These get initialized when the iface is connected to its peer
self.peer_iface = None
# same as peer proto, but for execute-time standard attribute lookups
self.tun_proto = None
+
+
+ # Generate an initial random cryptographic key to use for tunnelling
+ # Upon connection, both endpoints will agree on a common one based on
+ # this one.
+ self.tun_key = ( ''.join(map(chr, [
+ r.getrandbits(8)
+ for i in xrange(32)
+ for r in (random.SystemRandom(),) ])
+ ).encode("base64").strip() )
+
def __str__(self):
return "%s<ip:%s/%s %s%s>" % (
def prepare(self, home_path, listening):
if not self.peer_iface and (self.peer_proto and (listening or (self.peer_addr and self.peer_port))):
# Ad-hoc peer_iface
- self.peer_iface = CrossIface(
+ self.peer_iface = _CrossIface(
self.peer_proto,
self.peer_addr,
self.peer_port)
element = testbed_instance._elements[guid]
# Set custom addresses if any
- if guid in testbed_instance._add_address:
+ if guid in testbed_instance._add_address and not (element.address or element.netmask or element.netprefix):
addresses = testbed_instance._add_address[guid]
for address in addresses:
(address, netprefix, broadcast) = address
"flags": Attribute.DesignOnly,
"validation_function": validation.is_string
}),
- ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP: dict({
- "name": ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,
- "help": "Commands to set up the environment needed to run NEPI testbeds",
- "type": Attribute.STRING,
- "flags": Attribute.Invisible | Attribute.ReadOnly,
- "validation_function": validation.is_string
- }),
"netpipe_mode": dict({
"name": "mode",
"help": "Requirement for package or application to be installed on some node",
"category": "applications",
"create_function": create_dependency,
- "configure_function": configure_dependency,
+ "preconfigure_function": configure_dependency,
"box_attributes": ["depends", "build-depends", "build", "install",
"sources" ],
"connector_types": ["node"],
"help": "Requirement for NEPI inside NEPI - required to run testbed instances inside a node",
"category": "applications",
"create_function": create_nepi_dependency,
- "configure_function": configure_dependency,
+ "preconfigure_function": configure_dependency,
"box_attributes": [ ],
"connector_types": ["node"],
"traces": ["buildlog"]
"help": "Requirement for NS3 inside NEPI - required to run NS3 testbed instances inside a node. It also needs NepiDependency.",
"category": "applications",
"create_function": create_ns3_dependency,
- "configure_function": configure_dependency,
+ "preconfigure_function": configure_dependency,
"box_attributes": [ ],
"connector_types": ["node"],
"traces": ["buildlog"]
@property
def _nepi_testbed_environment_setup(self):
command = cStringIO.StringIO()
- command.write('PYTHONPATH=$PYTHONPATH:%s' % (
+ command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
))
- command.write(' PATH=$PATH:%s' % (
+ command.write(' ; export PATH=$PATH:%s' % (
':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
))
- if self.node.env:
- for envkey, envvals in self.node.env.iteritems():
+ if self.env:
+ for envkey, envvals in self.env.iteritems():
for envval in envvals:
- command.write(' %s=%s' % (envkey, envval))
- command.write(self.command)
+ command.write(' ; export %s=%s' % (envkey, envval))
return command.getvalue()
def build_filters(self, target_filters, filter_map):
ns3_provider = FactoriesProvider(ns3_testbed_id, ns3_testbed_version)
ns3_desc = exp.add_testbed_description(ns3_provider)
- ns3_desc.set_attribute_value("homeDirectory", "tb-ns3")
+ ns3_desc.set_attribute_value("rootDirectory", "tb-ns3")
ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[node1iface].addr[0].[Address]#}")
ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER,
pl.get_attribute_value("slice"))
controller.stop()
controller.shutdown()
+ @test_util.skipUnless(test_util.pl_auth() is not None,
+ "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+ @test_util.skipUnless(os.environ.get('NEPI_FULL_TESTS','').lower() in ('1','yes','true','on'),
+ "Test is expensive, requires NEPI_FULL_TESTS=yes")
+ def test_ns3_in_pl_crossconnect(self):
+ ns3_testbed_id = "ns3"
+ ns3_testbed_version = "3_9_RC3"
+
+ pl, exp = self.make_experiment_desc()
+
+ # Create PL node, ifaces, assign addresses
+ node1 = pl.create("Node")
+ node1.set_attribute_value("hostname", "onelab11.pl.sophia.inria.fr")
+ node1.set_attribute_value("label", "node1")
+ node1.set_attribute_value("emulation", True) # require emulation
+ iface1 = pl.create("NodeInterface")
+ iface1.set_attribute_value("label", "node1iface")
+ tap1 = pl.create("TapInterface")
+ tap1.enable_trace("packets") # for error output
+ tap1.set_attribute_value("label", "node1tap")
+ inet = pl.create("Internet")
+ node1.connector("devs").connect(iface1.connector("node"))
+ node1.connector("devs").connect(tap1.connector("node"))
+ iface1.connector("inet").connect(inet.connector("devs"))
+
+ tap1ip = tap1.add_address()
+ tap1ip.set_attribute_value("Address", "192.168.2.2")
+ tap1ip.set_attribute_value("NetPrefix", 24)
+ tap1ip.set_attribute_value("Broadcast", False)
+
+ # Add NS3 support in node1
+ plnepi = pl.create("NepiDependency")
+ plns3 = pl.create("NS3Dependency")
+ plnepi.connector("node").connect(node1.connector("deps"))
+ plns3.connector("node").connect(node1.connector("deps"))
+
+ # Create NS3 testbed running in node1
+ ns3_provider = FactoriesProvider(ns3_testbed_id, ns3_testbed_version)
+ ns3_desc = exp.add_testbed_description(ns3_provider)
+ ns3_desc.set_attribute_value("rootDirectory", "tb-ns3")
+ ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[node1iface].addr[0].[Address]#}")
+ ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER,
+ pl.get_attribute_value("slice"))
+ ns3_desc.set_attribute_value(DC.DEPLOYMENT_KEY,
+ pl.get_attribute_value("sliceSSHKey"))
+ ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON)
+ ns3_desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH)
+ ns3_desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP,
+ "{#[node1].[%s]#}" % (ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,))
+
+
+ # Create NS3 node that is responsive to pings, connected
+ # to node1 through the Tap interface
+ ns1 = ns3_desc.create("ns3::Node")
+ ipv41 = ns3_desc.create("ns3::Ipv4L3Protocol")
+ arp1 = ns3_desc.create("ns3::ArpL3Protocol")
+ icmp1 = ns3_desc.create("ns3::Icmpv4L4Protocol")
+ ns1.connector("protos").connect(ipv41.connector("node"))
+ ns1.connector("protos").connect(arp1.connector("node"))
+ ns1.connector("protos").connect(icmp1.connector("node"))
+ ns1if = ns3_desc.create("ns3::FileDescriptorNetDevice")
+ ns1if.set_attribute_value("label", "ns1if")
+ ns1.connector("devs").connect(ns1if.connector("node"))
+ tap1.connector("fd->").connect(ns1if.connector("->fd"))
+ ip1 = ns1if.add_address()
+ ip1.set_attribute_value("Address", "192.168.2.3")
+ ip1.set_attribute_value("NetPrefix", 24)
+ ip1.set_attribute_value("Broadcast", False)
+
+ # Create PlanetLab ping application, pinging the NS3 node
+ ping = pl.create("Application")
+ ping.set_attribute_value("command", "ping -qc1 {#[GUID-8].addr[0].[Address]#}")
+ ping.enable_trace("stdout")
+ ping.enable_trace("stderr")
+ ping.connector("node").connect(node1.connector("apps"))
+
+ comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+
+--- .* ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
+"""
+
+ xml = exp.to_xml()
+
+ controller = ExperimentController(xml, self.root_dir)
+ controller.start()
+
+ while not controller.is_finished(ping.guid):
+ time.sleep(0.5)
+
+ ping_result = controller.trace(pl.guid, ping.guid, "stdout")
+ tap_trace = controller.trace(pl.guid, tap1.guid, "packets")
+
+ controller.stop()
+ controller.shutdown()
+
+ # asserts at the end, to make sure there's proper cleanup
+ self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+ "Unexpected trace:\n%s\nTap trace:\n%s\n" % (
+ ping_result,
+ tap_trace) )
+
if __name__ == '__main__':
unittest.main()