#!/usr/bin/env python # -*- coding: utf-8 -*- import getpass from nepi.core.design import ExperimentDescription, FactoriesProvider from nepi.core.execute import ExperimentController from nepi.util import proxy from nepi.util.constants import DeploymentConfiguration as DC, ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP from nepi.testbeds.planetlab import util as plutil from optparse import OptionParser import os import sys import shutil import tempfile import time import struct import socket import operator import ipaddr import gzip import random class PlanetLabMulticastOverlay: testbed_id = "planetlab" slicename = "inria_nepi" plchost = "www.planet-lab.eu" plkey = os.environ.get( "PL_SSH_KEY", "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME'],) ) pluser = os.environ.get("PL_USER") plpass = os.environ.get("PL_PASS") vnet = "192.168.3.0" port_base = 2000 + (os.getpid() % 1000) * 13 def setUp(self): self.root_dir = tempfile.mkdtemp() self.__class__.port_base = self.__class__.port_base + 100 def tearDown(self): try: shutil.rmtree(self.root_dir) except: # retry time.sleep(0.1) shutil.rmtree(self.root_dir) def make_experiment_desc(self): testbed_id = self.testbed_id slicename = self.slicename plchost = self.plchost pl_ssh_key = self.plkey pl_user = self.pluser pl_pwd = self.plpass exp_desc = ExperimentDescription() pl_provider = FactoriesProvider(testbed_id) pl_desc = exp_desc.add_testbed_description(pl_provider) pl_desc.set_attribute_value("homeDirectory", self.root_dir) pl_desc.set_attribute_value("slice", slicename) pl_desc.set_attribute_value("sliceSSHKey", pl_ssh_key) pl_desc.set_attribute_value("authUser", pl_user) pl_desc.set_attribute_value("authPass", pl_pwd) pl_desc.set_attribute_value("plcHost", plchost) pl_desc.set_attribute_value("tapPortBase", self.port_base) pl_desc.set_attribute_value("p2pDeployment", True) pl_desc.set_attribute_value("dedicatedSlice", True) pl_desc.set_attribute_value("plLogLevel", "INFO") netns_provider = FactoriesProvider("netns") netns_desc = exp_desc.add_testbed_description(netns_provider) netns_desc.set_attribute_value("homeDirectory", self.root_dir) netns_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) netns_root_dir = os.path.join(self.root_dir, "netns") os.mkdir(netns_root_dir) netns_desc.set_attribute_value(DC.ROOT_DIRECTORY, netns_root_dir) netns_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) netns_desc.set_attribute_value(DC.USE_SUDO, True) return pl_desc, netns_desc, exp_desc def make_pl_tapnode(self, pl, ip, inet = None, label = None, hostname = None, routes = None, mcast = False, mcastrouter = False): if not isinstance(ip, list): ips = [ip] else: ips = ip node1 = pl.create("Node") if label: node1.set_attribute_value("label", label) if hostname: node1.set_attribute_value("hostname", hostname) iface1 = pl.create("NodeInterface") if label: iface1.set_attribute_value("label", label+"iface") tap1 = [] tap1ip = [] for i,ip in enumerate(ips): _tap1 = pl.create("TapInterface") _tap1.set_attribute_value("multicast", True) _tap1.enable_trace("pcap") # for error output if label: _tap1.set_attribute_value("label", label+"tap"+(str(i+1) if i else "")) _tap1ip = _tap1.add_address() _tap1ip.set_attribute_value("Address", ip) _tap1ip.set_attribute_value("NetPrefix", 32) _tap1ip.set_attribute_value("Broadcast", False) node1.connector("devs").connect(_tap1.connector("node")) tap1.append(_tap1) tap1ip.append(_tap1ip) inet = inet or pl.create("Internet") node1.connector("devs").connect(iface1.connector("node")) iface1.connector("inet").connect(inet.connector("devs")) for destip, destprefix, nexthop in routes: r1 = node1.add_route() r1.set_attribute_value("Destination", destip) r1.set_attribute_value("NetPrefix", destprefix) r1.set_attribute_value("NextHop", nexthop) if mcast: fwd = pl.create("MulticastForwarder") fwd.enable_trace("stderr") fwd.connector("node").connect(node1.connector("apps")) if mcastrouter: mrt = pl.create("MulticastRouter") mrt.connector("fwd").connect(fwd.connector("router")) mrt.enable_trace("stderr") return node1, iface1, tap1, tap1ip, inet def add_vlc_base(self, pl, node): app = pl.create("Application") app.set_attribute_value("rpmFusion", True) app.set_attribute_value("depends", "vlc") app.set_attribute_value("command", "vlc --version") app.enable_trace("stdout") app.enable_trace("stderr") node.connector("apps").connect(app.connector("node")) return app def add_vlc_restreamer(self, pl, node): hostname = node.get_attribute_value("hostname") app = self.add_vlc_base(pl, node) app.set_attribute_value("label","vlc_restreamer_%d" % (node.guid,)) app.set_attribute_value("command", "vlc -vvv -I dummy" " udp://@239.255.12.42" " --sout '#rtp{port=6060,sdp=rtsp://"+hostname+":8080/test.sdp}'") return app def add_vlc_dumper(self, pl, node): app = self.add_vlc_base(pl, node) app.set_attribute_value("label","vlc_dumper_%d" % (node.guid,)) app.set_attribute_value("command", "vlc -vvv -I dummy" " udp://@239.255.12.42" " --sout output") app.enable_trace("output") return app def add_vlc_source(self, netns, node, iflabel): app = netns_desc.create("Application") app.set_attribute_value("user", os.getlogin()) app.set_attribute_value("label","vlc_source_%d" % (node.guid,)) app.set_attribute_value("command", "vlc -vvv -I dummy " +os.path.basename(self.movie_source) +"--miface-addr {#[%s].addr[0].[Address]#} " % (iflabel,) +"--sout '#udp{dst=239.255.12.42,ttl=64}'") app.connector("node").connect(node.connector("apps")) return app def add_net_monitor(self, pl, node): app = pl.create("Application") app.set_attribute_value("label","network_monitor_%d" % (node.guid,)) app.set_attribute_value("command", r"""head -n 2 /proc/net/dev ; while true ; do cat /proc/net/dev | sed -r 's/.*/'"$(date -R)"': \0/' | grep eth0 ; sleep 1 ; done""") app.enable_trace("stdout") node.connector("apps").connect(app.connector("node")) return app def make_ns_in_pl(self, pl, exp, node1, iface1, root): ns3_testbed_id = "ns3" # Add NS3 support in node1 plnepi = pl.create("NepiDependency") plns3 = pl.create("NS3Dependency") plnepi.connector("node").connect(node1.connector("deps")) plns3.connector("node").connect(node1.connector("deps")) # Create NS3 testbed running in node1 ns3_provider = FactoriesProvider(ns3_testbed_id) ns3_desc = exp.add_testbed_description(ns3_provider) ns3_desc.set_attribute_value("rootDirectory", root) ns3_desc.set_attribute_value("SimulatorImplementationType", "ns3::RealtimeSimulatorImpl") ns3_desc.set_attribute_value("ChecksumEnabled", True) ns3_desc.set_attribute_value(DC.DEPLOYMENT_HOST, "{#[%s].addr[0].[Address]#}" % ( iface1.get_attribute_value("label"),)) ns3_desc.set_attribute_value(DC.DEPLOYMENT_USER, pl.get_attribute_value("slice")) ns3_desc.set_attribute_value(DC.DEPLOYMENT_KEY, pl.get_attribute_value("sliceSSHKey")) ns3_desc.set_attribute_value(DC.DEPLOYMENT_MODE, DC.MODE_DAEMON) ns3_desc.set_attribute_value(DC.DEPLOYMENT_COMMUNICATION, DC.ACCESS_SSH) ns3_desc.set_attribute_value(DC.DEPLOYMENT_ENVIRONMENT_SETUP, "{#[%s].[%s]#}" % ( node1.get_attribute_value("label"), ATTR_NEPI_TESTBED_ENVIRONMENT_SETUP,)) ns3_desc.set_attribute_value(DC.LOG_LEVEL, DC.DEBUG_LEVEL) return ns3_desc def make_netns_node(self, netns_desc): node = netns_desc.create("Node") node.set_attribute_value("forward_X11", True) return node def make_pl_netns_connection(self, pl_desc, pl_node, netns_desc, netns_node, netns_iface_label, vnet): base = struct.unpack('!L',socket.inet_aton(vnet))[0] netns_addr = socket.inet_ntoa(struct.pack('!L',(base | 1))) pl_addr = socket.inet_ntoa(struct.pack('!L',(base | 2))) pl_tap = pl_desc.create("TunInterface") pl_tap.set_attribute_value("multicast", True) #pl_tap.set_attribute_value("tun_cipher", "PLAIN") #pl_tap.enable_trace("pcap") #pl_tap.enable_trace("packets") addr = pl_tap.add_address() adrr.set_attribute_value("Address", pl_addr) addr.set_attribute_value("NetPrefix", 32) addr.set_attribute_value("Broadcast", False) pl_node.connector("devs").connect(pl_tap.connector("node")) netns_tap = netns_desc.create("TunNodeInterface") netns_tap.set_attribute_value("label", netns_iface_label) netns_tap.set_attribute_value("up", True) netns_tap.set_attribute_value("mtu", 1448) addr = netns_tap.add_address() adrr.set_attribute_value("Address", netns_addr) addr.set_attribute_value("NetPrefix", 32) addr.set_attribute_value("Broadcast", False) route = netns_node.add_route() route.set_attribute_value("Destination", vnet) r1.set_attribute_value("NetPrefix", 24) r1.set_attribute_value("NextHop", pl_addr) netns_node.connector("devs").connect(netns_tap.connector("node")) netns_tunchannel = netns_desc.create("TunChannel") #netns_tunchannel.set_attribute_value("tun_cipher", "PLAIN") netns_tunchannel.connector("->fd").connect(netns_tap.connector("fd->")) pl_tap.connector("tcp").connect(netns_tunchannel.connector("tcp")) def make_pl_overlay(self, numnodes, num_wifi): ns3_testbed_id = "ns3" pl, netns, exp = self.make_experiment_desc() # We'll make a distribution spanning tree using prefix matching as a distance api = plutil.getAPI(self.pluser, self.plpass) nodes = plutil.getNodes(api, numnodes, operatingSystem = 'f12') root = min(nodes, key=operator.attrgetter('hostname')) links = list(plutil.getSpanningTree(nodes, root=root)) for node in nodes: node.vif_ips = set() node.children = [] node.childips = set() # Build an explicit tree for slave, master in links: master.children.append(slave) # We have to assign IPs and routes. # The IP will be assigned sequentially, depth-first. # This will result in rather compact routing rules nextip = [128-numnodes] def traverse(traverse, node, parent=None, base=struct.unpack('!L',socket.inet_aton(self.vnet))[0]): if nextip[0] >= 254: raise RuntimeError, "Too many IPs to assign!" node.vif_addr = base | (nextip[0]) nips = 1+len(node.children) # one vif per child, plus one for the parent nextip[0] += nips for i in xrange(nips): node.vif_ips.add(node.vif_addr+i) if parent: parent.childips.update(node.vif_ips) for i,child in enumerate(node.children): traverse(traverse, child, node, base) if parent: parent.childips.update(node.childips) traverse(traverse, root) def printtree(printtree, node, indent=''): print indent, '-', socket.inet_ntoa(struct.pack('!L',node.vif_addr)), node.country, node.city, node.site for child in node.children: childips = map(ipaddr.IPAddress, child.childips) childnets = ipaddr.collapse_address_list(childips) cip = ipaddr.IPAddress(child.vif_addr) for cnet in childnets: print indent, '|- R', cnet, '->', cip printtree(printtree, child, indent+' | ') printtree(printtree, root) inet = pl.create("Internet") def maketree(maketree, node, parent=None, parentIp=None): routes = [] ctaps = [] for i,child in enumerate(node.children): childips = map(ipaddr.IPAddress, child.childips) childnets = ipaddr.collapse_address_list(childips) cip = ipaddr.IPAddress(child.vif_addr) pip = ipaddr.IPAddress(node.vif_addr+1+i) for cnet in childnets: routes.append((cnet.ip.exploded, cnet.prefixlen, cip.exploded)) ctaps.append( maketree(maketree, child, node, pip) ) if parentIp: routes.append((self.vnet,24,parentIp)) if not parent: label = "root" else: label = None ips = [ ipaddr.IPAddress(node.vif_addr+i) for i in xrange(1+len(node.children)) ] node1, iface1, tap1, tap1ip, _ = self.make_pl_tapnode(pl, ips, inet, hostname = node.hostname, routes = routes, mcastrouter = bool(node.children), mcast = True, label = label ) for tap, ctap in zip(tap1[1:], ctaps): tap.connector("udp").connect(ctap.connector("udp")) self.add_net_monitor(pl, node1) self.add_vlc_restreamer(pl, node1) if random.random() < 0.1 and parent: self.add_vlc_dumper(pl, node1) return tap1[0] maketree(maketree, root) # create a netns node and connect it to the root pl node pl_root = exp_desc.get_element_by_label("root") netns_source = self.make_netns_node(netns) iflabel = "source-iface" self.make_pl_netns_connection(pl_desc, pl_root, netns_desc, netns_source, iflabel, self.vnet) self.add_vlc_source(netns, netns_n, iflabel) xml = exp.to_xml() test_dir = "./results" try: controller = ExperimentController(xml, self.root_dir) controller.start() print >>sys.stderr, "Press CTRL-C to shut down" try: while True: time.sleep(10) except KeyboardInterrupt: pass # download results for testbed_guid, guids in controller.traces_info().iteritems(): for guid, traces in guids.iteritems(): for name, data in traces.iteritems(): path = data["filepath"] if not path: continue print >>sys.stderr, "Downloading trace", path filepath = os.path.join(test_dir, path) try: trace = controller.trace(guid, name) except: traceback.print_exc(file=sys.stderr) continue try: if not os.path.exists(os.path.dirname(filepath)): os.makedirs(os.path.dirname(filepath)) except: traceback.print_exc(file=sys.stderr) try: if len(trace) >= 2**20: # Bigger than 1M, compress tracefile = gzip.GzipFile(filepath+".gz", "wb") else: tracefile = open(filepath,"wb") try: tracefile.write(trace) finally: tracefile.close() except: traceback.print_exc(file=sys.stderr) finally: try: controller.stop() except: import traceback traceback.print_exc() try: controller.shutdown() except: import traceback traceback.print_exc() if __name__ == '__main__': usage = "usage: %prog -n number_sta -m movie -u user" parser = OptionParser(usage=usage) parser.add_option("-u", "--user", dest="user", help="Valid linux system user (not root).", type="str") parser.add_option("-m", "--movie", dest="movie", help="Path to movie file to play", type="str") parser.add_option("-n", "--nsta", dest="nsta", help="Number of wifi stations", type="int") parser.add_option("-N", "--nodes", dest="nsta", help="Number of overlay nodes", type="int") parser.add_option("-a", "--base_addr", dest="base_addr", help="Base address segment for the experiment", type="str") parser.add_option("-s", "--slicename", dest="slicename", help="PlanetLab slice", type="str") (options, args) = parser.parse_args() if not options.movie: parser.error("Missing 'movie' option.") if options.user == 'root': parser.error("Missing or invalid 'user' option.") if options.nsta and options.nsta > 8: parser.error("Try a number of stations under 9.") exp = PlanetLabMulticastOverlay() exp.movie_source = options.movie try: exp.setUp() exp.make_pl_overlay(50, 8) finally: exp.tearDown()