From 203b596db5be0a3bde9ca5e1ce7728c83e7b2382 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Sun, 2 Oct 2011 17:17:06 +0200 Subject: [PATCH] continuing overlay testing --- examples/multicast_overlay.py | 449 ++++++++++++++++++++++++++++++++++ examples/wireless_overlay.py | 4 +- src/nepi/core/design.py | 11 + 3 files changed, 462 insertions(+), 2 deletions(-) create mode 100644 examples/multicast_overlay.py diff --git a/examples/multicast_overlay.py b/examples/multicast_overlay.py new file mode 100644 index 00000000..7372f92a --- /dev/null +++ b/examples/multicast_overlay.py @@ -0,0 +1,449 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import getpass +from nepi.core.design import ExperimentDescription, FactoriesProvider +from nepi.core.execute import ExperimentController +from nepi.util import proxy +from nepi.util.constants import DeploymentConfiguration as DC, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP +from nepi.testbeds.planetlab import util as plutil +from optparse import OptionParser +import os +import sys +import shutil +import tempfile +import time +import struct +import socket +import operator +import ipaddr +import gzip +import random + +class PlanetLabMulticastOverlay: + testbed_id = "planetlab" + slicename = "inria_nepi" + plchost = "www.planet-lab.eu" + plkey = os.environ.get( + "PL_SSH_KEY", + "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) ) + pluser = os.environ.get("PL_USER") + plpass = os.environ.get("PL_PASS") + vnet = "192.168.3.0" + + port_base = 2000 + (os.getpid() % 1000) * 13 + + def setUp(self): + self.root_dir = tempfile.mkdtemp() + self.__class__.port_base = self.__class__.port_base + 100 + + def tearDown(self): + try: + shutil.rmtree(self.root_dir) + except: + # retry + time.sleep(0.1) + shutil.rmtree(self.root_dir) + + def make_experiment_desc(self): + testbed_id = self.testbed_id + slicename = self.slicename + plchost = self.plchost + pl_ssh_key = self.plkey + pl_user = self.pluser + pl_pwd = self.plpass + + exp_desc = ExperimentDescription() + pl_provider = FactoriesProvider(testbed_id) + pl_desc = exp_desc.add_testbed_description(pl_provider) + pl_desc.set_attribute_value("homeDirectory", self.root_dir) + pl_desc.set_attribute_value("slice", slicename) + pl_desc.set_attribute_value("sliceSSHKey", pl_ssh_key) + pl_desc.set_attribute_value("authUser", pl_user) + pl_desc.set_attribute_value("authPass", pl_pwd) + pl_desc.set_attribute_value("plcHost", plchost) + pl_desc.set_attribute_value("tapPortBase", self.port_base) + pl_desc.set_attribute_value("p2pDeployment", True) + pl_desc.set_attribute_value("dedicatedSlice", True) + pl_desc.set_attribute_value("plLogLevel", "INFO") + + netns_provider = FactoriesProvider("netns") + netns_desc = exp_desc.add_testbed_description(netns_provider) + netns_desc.set_attribute_value("homeDirectory", self.root_dir) + netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + netns_root_dir = os.path.join(self.root_dir, "netns") + os.mkdir(netns_root_dir) + netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, netns_root_dir) + netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + netns_desc.set_attribute_value(DC.USE_SUDO, True) + + return pl_desc, netns_desc, exp_desc + + + def make_pl_tapnode(self, pl, ip, inet = None, label = None, hostname = None, routes = None, mcast = False, mcastrouter = False): + if not isinstance(ip, list): + ips = [ip] + else: + ips = ip + node1 = pl.create("Node") + if label: + node1.set_attribute_value("label", label) + if hostname: + node1.set_attribute_value("hostname", hostname) + iface1 = pl.create("NodeInterface") + if label: + iface1.set_attribute_value("label", label+"iface") + tap1 = [] + tap1ip = [] + for i,ip in enumerate(ips): + _tap1 = pl.create("TapInterface") + _tap1.set_attribute_value("multicast", True) + _tap1.enable_trace("pcap") # for error output + if label: + _tap1.set_attribute_value("label", label+"tap"+(str(i+1) if i else "")) + + _tap1ip = _tap1.add_address() + _tap1ip.set_attribute_value("Address", ip) + _tap1ip.set_attribute_value("NetPrefix", 32) + _tap1ip.set_attribute_value("Broadcast", False) + + node1.connector("devs").connect(_tap1.connector("node")) + + tap1.append(_tap1) + tap1ip.append(_tap1ip) + + inet = inet or pl.create("Internet") + node1.connector("devs").connect(iface1.connector("node")) + iface1.connector("inet").connect(inet.connector("devs")) + + for destip, destprefix, nexthop in routes: + r1 = node1.add_route() + r1.set_attribute_value("Destination", destip) + r1.set_attribute_value("NetPrefix", destprefix) + r1.set_attribute_value("NextHop", nexthop) + + if mcast: + fwd = pl.create("MulticastForwarder") + fwd.enable_trace("stderr") + fwd.connector("node").connect(node1.connector("apps")) + if mcastrouter: + mrt = pl.create("MulticastRouter") + mrt.connector("fwd").connect(fwd.connector("router")) + mrt.enable_trace("stderr") + + return node1, iface1, tap1, tap1ip, inet + + def add_vlc_base(self, pl, node): + app = pl.create("Application") + app.set_attribute_value("rpmFusion", True) + app.set_attribute_value("depends", "vlc") + app.set_attribute_value("command", "vlc --version") + app.enable_trace("stdout") + app.enable_trace("stderr") + node.connector("apps").connect(app.connector("node")) + return app + + def add_vlc_restreamer(self, pl, node): + hostname = node.get_attribute_value("hostname") + app = self.add_vlc_base(pl, node) + app.set_attribute_value("label","vlc_restreamer_%d" % (node.guid,)) + app.set_attribute_value("command", + "vlc -vvv -I dummy" + " udp://@239.255.12.42" + " --sout '#rtp{port=6060,sdp=rtsp://"+hostname+":8080/test.sdp}'") + return app + + def add_vlc_dumper(self, pl, node): + app = self.add_vlc_base(pl, node) + app.set_attribute_value("label","vlc_dumper_%d" % (node.guid,)) + app.set_attribute_value("command", + "vlc -vvv -I dummy" + " udp://@239.255.12.42" + " --sout output") + app.enable_trace("output") + return app + + def add_vlc_source(self, netns, node, iflabel): + app = netns_desc.create("Application") + app.set_attribute_value("user", os.getlogin()) + app.set_attribute_value("label","vlc_source_%d" % (node.guid,)) + app.set_attribute_value("command", + "vlc -vvv -I dummy " + +os.path.basename(self.movie_source) + +"--miface-addr {#[%s].addr[0].[Address]#} " % (iflabel,) + +"--sout '#udp{dst=239.255.12.42,ttl=64}'") + app.connector("node").connect(node.connector("apps")) + return app + + def add_net_monitor(self, pl, node): + app = pl.create("Application") + app.set_attribute_value("label","network_monitor_%d" % (node.guid,)) + app.set_attribute_value("command", + r"""head -n 2 /proc/net/dev ; while true ; do cat /proc/net/dev | sed -r 's/.*/'"$(date -R)"': \0/' | grep eth0 ; sleep 1 ; done""") + app.enable_trace("stdout") + node.connector("apps").connect(app.connector("node")) + return app + + def make_ns_in_pl(self, pl, exp, node1, iface1, root): + ns3_testbed_id = "ns3" + + # Add NS3 support in node1 + plnepi = pl.create("NepiDependency") + plns3 = pl.create("NS3Dependency") + plnepi.connector("node").connect(node1.connector("deps")) + plns3.connector("node").connect(node1.connector("deps")) + + # Create NS3 testbed running in node1 + ns3_provider = FactoriesProvider(ns3_testbed_id) + ns3_desc = exp.add_testbed_description(ns3_provider) + ns3_desc.set_attribute_value("rootDirectory", root) + ns3_desc.set_attribute_value("SimulatorImplementationType", "ns3::RealtimeSimulatorImpl") + ns3_desc.set_attribute_value("ChecksumEnabled", True) + ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[%s].addr[0].[Address]#}" % ( + iface1.get_attribute_value("label"),)) + ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER, + pl.get_attribute_value("slice")) + ns3_desc.set_attribute_value(DC.DEPLOYMENT_KEY, + pl.get_attribute_value("sliceSSHKey")) + ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) + ns3_desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) + ns3_desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, + "{#[%s].[%s]#}" % ( + node1.get_attribute_value("label"), + ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,)) + ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) + + return ns3_desc + + def make_netns_node(self, netns_desc): + node = netns_desc.create("Node") + node.set_attribute_value("forward_X11", True) + return node + + def make_pl_netns_connection(self, pl_desc, pl_node, netns_desc, + netns_node, netns_iface_label, vnet): + base = struct.unpack('!L',socket.inet_aton(vnet))[0] + netns_addr = socket.inet_ntoa(struct.pack('!L',(base | 1))) + pl_addr = socket.inet_ntoa(struct.pack('!L',(base | 2))) + + pl_tap = pl_desc.create("TunInterface") + pl_tap.set_attribute_value("multicast", True) + #pl_tap.set_attribute_value("tun_cipher", "PLAIN") + #pl_tap.enable_trace("pcap") + #pl_tap.enable_trace("packets") + addr = pl_tap.add_address() + adrr.set_attribute_value("Address", pl_addr) + addr.set_attribute_value("NetPrefix", 32) + addr.set_attribute_value("Broadcast", False) + pl_node.connector("devs").connect(pl_tap.connector("node")) + + netns_tap = netns_desc.create("TunNodeInterface") + netns_tap.set_attribute_value("label", netns_iface_label) + netns_tap.set_attribute_value("up", True) + netns_tap.set_attribute_value("mtu", 1448) + addr = netns_tap.add_address() + adrr.set_attribute_value("Address", netns_addr) + addr.set_attribute_value("NetPrefix", 32) + addr.set_attribute_value("Broadcast", False) + route = netns_node.add_route() + route.set_attribute_value("Destination", vnet) + r1.set_attribute_value("NetPrefix", 24) + r1.set_attribute_value("NextHop", pl_addr) + netns_node.connector("devs").connect(netns_tap.connector("node")) + + netns_tunchannel = netns_desc.create("TunChannel") + #netns_tunchannel.set_attribute_value("tun_cipher", "PLAIN") + netns_tunchannel.connector("->fd").connect(netns_tap.connector("fd->")) + pl_tap.connector("tcp").connect(netns_tunchannel.connector("tcp")) + + def make_pl_overlay(self, numnodes, num_wifi): + ns3_testbed_id = "ns3" + + pl, netns, exp = self.make_experiment_desc() + # We'll make a distribution spanning tree using prefix matching as a distance + api = plutil.getAPI(self.pluser, self.plpass) + nodes = plutil.getNodes(api, numnodes, operatingSystem = 'f12') + root = min(nodes, key=operator.attrgetter('hostname')) + links = list(plutil.getSpanningTree(nodes, root=root)) + + for node in nodes: + node.vif_ips = set() + node.children = [] + node.childips = set() + + # Build an explicit tree + for slave, master in links: + master.children.append(slave) + + # We have to assign IPs and routes. + # The IP will be assigned sequentially, depth-first. + # This will result in rather compact routing rules + nextip = [128-numnodes] + def traverse(traverse, node, parent=None, base=struct.unpack('!L',socket.inet_aton(self.vnet))[0]): + if nextip[0] >= 254: + raise RuntimeError, "Too many IPs to assign!" + + node.vif_addr = base | (nextip[0]) + nips = 1+len(node.children) # one vif per child, plus one for the parent + nextip[0] += nips + + for i in xrange(nips): + node.vif_ips.add(node.vif_addr+i) + + if parent: + parent.childips.update(node.vif_ips) + + for i,child in enumerate(node.children): + traverse(traverse, child, node, base) + + if parent: + parent.childips.update(node.childips) + + traverse(traverse, root) + + def printtree(printtree, node, indent=''): + print indent, '-', socket.inet_ntoa(struct.pack('!L',node.vif_addr)), node.country, node.city, node.site + for child in node.children: + childips = map(ipaddr.IPAddress, child.childips) + childnets = ipaddr.collapse_address_list(childips) + cip = ipaddr.IPAddress(child.vif_addr) + for cnet in childnets: + print indent, '|- R', cnet, '->', cip + printtree(printtree, child, indent+' | ') + printtree(printtree, root) + + inet = pl.create("Internet") + + def maketree(maketree, node, parent=None, parentIp=None): + routes = [] + ctaps = [] + for i,child in enumerate(node.children): + childips = map(ipaddr.IPAddress, child.childips) + childnets = ipaddr.collapse_address_list(childips) + cip = ipaddr.IPAddress(child.vif_addr) + pip = ipaddr.IPAddress(node.vif_addr+1+i) + for cnet in childnets: + routes.append((cnet.ip.exploded, cnet.prefixlen, cip.exploded)) + ctaps.append( maketree(maketree, child, node, pip) ) + if parentIp: + routes.append((self.vnet,24,parentIp)) + + if not parent: + label = "root" + else: + label = None + ips = [ ipaddr.IPAddress(node.vif_addr+i) for i in xrange(1+len(node.children)) ] + node1, iface1, tap1, tap1ip, _ = self.make_pl_tapnode(pl, ips, inet, + hostname = node.hostname, + routes = routes, + mcastrouter = bool(node.children), + mcast = True, + label = label ) + + for tap, ctap in zip(tap1[1:], ctaps): + tap.connector("udp").connect(ctap.connector("udp")) + + self.add_net_monitor(pl, node1) + self.add_vlc_restreamer(pl, node1) + if random.random() < 0.1 and parent: + self.add_vlc_dumper(pl, node1) + + return tap1[0] + maketree(maketree, root) + + # create a netns node and connect it to the root pl node + pl_root = exp_desc.get_element_by_label("root") + netns_source = self.make_netns_node(netns) + iflabel = "source-iface" + self.make_pl_netns_connection(pl_desc, pl_root, netns_desc, + netns_source, iflabel, self.vnet) + self.add_vlc_source(netns, netns_n, iflabel) + + xml = exp.to_xml() + test_dir = "./results" + + try: + controller = ExperimentController(xml, self.root_dir) + controller.start() + + print >>sys.stderr, "Press CTRL-C to shut down" + try: + while True: + time.sleep(10) + except KeyboardInterrupt: + pass + + # download results + for testbed_guid, guids in controller.traces_info().iteritems(): + for guid, traces in guids.iteritems(): + for name, data in traces.iteritems(): + path = data["filepath"] + + if not path: + continue + + print >>sys.stderr, "Downloading trace", path + + filepath = os.path.join(test_dir, path) + + try: + trace = controller.trace(guid, name) + except: + traceback.print_exc(file=sys.stderr) + continue + try: + if not os.path.exists(os.path.dirname(filepath)): + os.makedirs(os.path.dirname(filepath)) + except: + traceback.print_exc(file=sys.stderr) + + try: + if len(trace) >= 2**20: + # Bigger than 1M, compress + tracefile = gzip.GzipFile(filepath+".gz", "wb") + else: + tracefile = open(filepath,"wb") + try: + tracefile.write(trace) + finally: + tracefile.close() + except: + traceback.print_exc(file=sys.stderr) + finally: + try: + controller.stop() + except: + import traceback + traceback.print_exc() + try: + controller.shutdown() + except: + import traceback + traceback.print_exc() + + +if __name__ == '__main__': + usage = "usage: %prog -n number_sta -m movie -u user" + parser = OptionParser(usage=usage) + parser.add_option("-u", "--user", dest="user", help="Valid linux system user (not root).", type="str") + parser.add_option("-m", "--movie", dest="movie", help="Path to movie file to play", type="str") + parser.add_option("-n", "--nsta", dest="nsta", help="Number of wifi stations", type="int") + parser.add_option("-N", "--nodes", dest="nsta", help="Number of overlay nodes", type="int") + parser.add_option("-a", "--base_addr", dest="base_addr", help="Base address segment for the experiment", type="str") + parser.add_option("-s", "--slicename", dest="slicename", help="PlanetLab slice", type="str") + (options, args) = parser.parse_args() + if not options.movie: + parser.error("Missing 'movie' option.") + if options.user == 'root': + parser.error("Missing or invalid 'user' option.") + if options.nsta and options.nsta > 8: + parser.error("Try a number of stations under 9.") + + exp = PlanetLabMulticastOverlay() + exp.movie_source = options.movie + try: + exp.setUp() + exp.make_pl_overlay(50, 8) + finally: + exp.tearDown() + diff --git a/examples/wireless_overlay.py b/examples/wireless_overlay.py index 1fcb6a67..d735f560 100644 --- a/examples/wireless_overlay.py +++ b/examples/wireless_overlay.py @@ -268,7 +268,7 @@ class WirelessOverlay(object): def add_pl_netns_connection(self, pl_desc, pl_node, pl_addr, netns_desc, netns_node, netns_addr): pl_tap = pl_desc.create("TunInterface") - pl_tap.set_attribute_value("tun_cipher", "PLAIN") + #pl_tap.set_attribute_value("tun_cipher", "PLAIN") pl_tap.set_attribute_value("multicast", True) #pl_tap.enable_trace("pcap") #pl_tap.enable_trace("packets") @@ -280,7 +280,7 @@ class WirelessOverlay(object): self.add_ip_address(netns_tap, netns_addr, 30) netns_node.connector("devs").connect(netns_tap.connector("node")) netns_tunchannel = netns_desc.create("TunChannel") - netns_tunchannel.set_attribute_value("tun_cipher", "PLAIN") + #netns_tunchannel.set_attribute_value("tun_cipher", "PLAIN") netns_tunchannel.connector("->fd").connect(netns_tap.connector("fd->")) pl_tap.connector("tcp").connect(netns_tunchannel.connector("tcp")) diff --git a/src/nepi/core/design.py b/src/nepi/core/design.py index 2aad756e..f40b54b8 100644 --- a/src/nepi/core/design.py +++ b/src/nepi/core/design.py @@ -363,6 +363,17 @@ class ExperimentDescription(object): if box: return box return None + def get_element_by_label(self, label): + for tbd_desc in self._testbed_descriptions.values(): + l = tbd_desc.get_attribute_value("label") + if label == l: + return tbd_desc + for box in tbd_desc.boxes: + l = box.get_attribute_value("label") + if label == l: + return tbd_desc + return None + def add_testbed_description(self, provider, guid = None): testbed_description = TestbedDescription(self._guid_generator, provider, guid) -- 2.45.2