From: Alina Quereilhac Date: Wed, 21 Jan 2015 00:25:39 +0000 (+0100) Subject: Tunnel between 2 ns-3s in remote PL hosts:q X-Git-Tag: nepi-3.2.0~14 X-Git-Url: http://git.onelab.eu/?p=nepi.git;a=commitdiff_plain;h=db88361f45034b3286b2d1f6967023a64890f6cb Tunnel between 2 ns-3s in remote PL hosts:q --- diff --git a/src/nepi/resources/linux/ns3/p2pfdudptunnel.py b/src/nepi/resources/linux/ns3/p2pfdudptunnel.py index e968ea30..72662b15 100644 --- a/src/nepi/resources/linux/ns3/p2pfdudptunnel.py +++ b/src/nepi/resources/linux/ns3/p2pfdudptunnel.py @@ -31,7 +31,8 @@ import time @clsinit_copy class LinuxNs3FdUdpTunnel(LinuxUdpTunnel): _rtype = "linux::ns3::FdUdpTunnel" - _help = "Constructs a tunnel between two Linux FileDescriptorNetdevices using a UDP connection " + _help = "Constructs a tunnel between two Ns-3 FdNetdevices " \ + "located in remote Linux nodes using a UDP connection " _platform = "linux::ns3" @classmethod @@ -157,7 +158,15 @@ class LinuxNs3FdUdpTunnel(LinuxUdpTunnel): "linux-ns3-fd-udp-connect.py") scripts.append(linux_passfd) - + + # tunnel creation python script + tunchannel = os.path.join(os.path.dirname(__file__), + "..", + "scripts", + "tunchannel.py") + + scripts.append(tunchannel) + # Upload scripts scripts = ";".join(scripts) @@ -327,7 +336,6 @@ class LinuxNs3FdUdpTunnel(LinuxUdpTunnel): address = base64.b64encode(address) command = [""] - # Use pl-vid-udp-connect.py to stablish the tunnel between endpoints command.append("PYTHONPATH=$PYTHONPATH:${SRC}") command.append("python ${SRC}/linux-ns3-fd-udp-connect.py") command.append("-a %s" % address) diff --git a/src/nepi/resources/planetlab/ns3/p2pfdudptunnel.py b/src/nepi/resources/planetlab/ns3/p2pfdudptunnel.py new file mode 100644 index 00000000..5e63b0dc --- /dev/null +++ b/src/nepi/resources/planetlab/ns3/p2pfdudptunnel.py @@ -0,0 +1,379 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import clsinit_copy, ResourceState +from nepi.resources.linux.ns3.p2pfdudptunnel import LinuxNs3P2PFdUdpTunnel +from nepi.util.sshfuncs import ProcStatus +from nepi.util.timefuncs import tnow, tdiffsec + +import base64 +import os +import socket +import time + +@clsinit_copy +class PlanetlabNs3P2PFdUdpTunnel(LinuxUdpTunnel): + _rtype = "planetlab::ns3::P2PFdUdpTunnel" + _help = "Constructs a tunnel between two Ns-3 FdNetdevices " \ + "located in remote PlanetLab nodes using a UDP connection " + _platform = "planetlab::ns3" + + @classmethod + def _register_attributes(cls): + cipher = Attribute("cipher", + "Cipher to encript communication. " + "One of PLAIN, AES, Blowfish, DES, DES3. ", + default = None, + allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"], + type = Types.Enumerate, + flags = Flags.Design) + + cipher_key = Attribute("cipherKey", + "Specify a symmetric encryption key with which to protect " + "packets across the tunnel. python-crypto must be installed " + "on the system." , + flags = Flags.Design) + + txqueuelen = Attribute("txQueueLen", + "Specifies the interface's transmission queue length. " + "Defaults to 1000. ", + type = Types.Integer, + flags = Flags.Design) + + bwlimit = Attribute("bwLimit", + "Specifies the interface's emulated bandwidth in bytes " + "per second.", + type = Types.Integer, + flags = Flags.Design) + + cls._register_attribute(cipher) + cls._register_attribute(cipher_key) + cls._register_attribute(txqueuelen) + cls._register_attribute(bwlimit) + + def __init__(self, ec, guid): + super(PlanetlabNs3P2PFdUdpTunnel, self).__init__(ec, guid) + self._home = "p2p-fd-udp-tunnel-%s" % self.guid + self._pids = dict() + self._fd1 = None + self._fd1node = None + self._fd2 = None + self._fd2node = None + + def log_message(self, msg): + self.get_endpoints() + return " guid %d - PlanetlabNs3P2PFdUdpTunnel - %s - %s - %s " % (self.guid, + self.node1.get("hostname"), + self.node2.get("hostname"), + msg) + + def get_endpoints(self): + """ Returns the list of RM that are endpoints to the tunnel + """ + if not self._fd2 or not self._fd1: + from nepi.resources.ns3.ns3fdnetdevice import NS3BaseFdNetDevice + devices = self.get_connected(NS3BaseFdNetDevice.get_rtype()) + if not devices or len(devices) != 2: + msg = "Tunnel must be connected to exactly two FdNetDevices" + self.error(msg) + raise RuntimeError, msg + + self._fd1 = devices[0] + self._fd2 = devices[1] + + # Set PI headers on + self._fd1.set("EncapsulationMode", "DixPi") + self._fd2.set("EncapsulationMode", "DixPi") + + simu = self._fd1.simulation + from nepi.resources.linux.node import LinuxNode + nodes = simu.get_connected(LinuxNode.get_rtype()) + self._fd1node = nodes[0] + + simu = self._fd2.simulation + from nepi.resources.linux.node import LinuxNode + nodes = simu.get_connected(LinuxNode.get_rtype()) + self._fd2node = nodes[0] + + if self._fd1node.get("hostname") == \ + self._fd2node.get("hostname"): + msg = "Tunnel requires endpoints on different hosts" + self.error(msg) + raise RuntimeError, msg + + return [self._fd1, self._fd2] + + @property + def endpoint1(self): + return self._fd1 + + @property + def endpoint2(self): + return self._fd2 + + @property + def node1(self): + return self._fd1node + + @property + def node2(self): + return self._fd2node + + def endpoint_node(self, endpoint): + node = None + if endpoint == self.endpoint1: + node = self.node1 + else: + node = self.node2 + + return node + + def app_home(self, endpoint): + node = self.endpoint_node(endpoint) + return os.path.join(node.exp_home, self._home) + + def run_home(self, endpoint): + return os.path.join(self.app_home(endpoint), self.ec.run_id) + + def upload_sources(self, endpoint): + scripts = [] + + # vif-passfd python script + fd_udp_connect = os.path.join(os.path.dirname(__file__), + "..", + "scripts", + "pl-fd-udp-connect.py") + + scripts.append(fd_udp_connect) + + # tunnel creation python script + tunchannel = os.path.join(os.path.dirname(__file__), + "..", "..", "linux", + "scripts", + "tunchannel.py") + + scripts.append(tunchannel) + + # Upload scripts + scripts = ";".join(scripts) + + node = self.endpoint_node(endpoint) + node.upload(scripts, + os.path.join(node.src_dir), + overwrite = False) + + def endpoint_mkdir(self, endpoint): + node = self.endpoint_node(endpoint) + run_home = self.run_home(endpoint) + node.mkdir(run_home) + + def initiate_connection(self, endpoint, remote_endpoint): + cipher = self.get("cipher") + cipher_key = self.get("cipherKey") + bwlimit = self.get("bwLimit") + txqueuelen = self.get("txQueueLen") + + # Upload the tunnel creating script + self.upload_sources(endpoint) + + # Request an address to send the file descriptor to the ns-3 simulation + address = endpoint.recv_fd() + + # execute the tunnel creation script + node = self.endpoint_node(remote_endpoint) + port = self.initiate(endpoint, remote_endpoint, address, cipher, + cipher_key, bwlimit, txqueuelen) + + return port + + def establish_connection(self, endpoint, remote_endpoint, port): + self.establish(endpoint, remote_endpoint, port) + + def verify_connection(self, endpoint, remote_endpoint): + self.verify(endpoint) + + def terminate_connection(self, endpoint, remote_endpoint): + # Nothing to do + return + + def check_state_connection(self): + # Make sure the process is still running in background + # No execution errors occurred. Make sure the background + # process with the recorded pid is still running. + + node1 = self.endpoint_node(self.endpoint1) + node2 = self.endpoint_node(self.endpoint2) + run_home1 = self.run_home(self.endpoint1) + run_home2 = self.run_home(self.endpoint1) + (pid1, ppid1) = self._pids[endpoint1] + (pid2, ppid2) = self._pids[endpoint2] + + status1 = node1.status(pid1, ppid1) + status2 = node2.status(pid2, ppid2) + + if status1 == ProcStatus.FINISHED and \ + status2 == ProcStatus.FINISHED: + + # check if execution errors occurred + (out1, err1), proc1 = node1.check_errors(run_home1) + (out2, err2), proc2 = node2.check_errors(run_home2) + + if err1 or err2: + msg = "Error occurred in tunnel" + self.error(msg, err1, err2) + self.fail() + else: + self.set_stopped() + + def wait_local_port(self, endpoint): + """ Waits until the local_port file for the endpoint is generated, + and returns the port number + + """ + return self.wait_file(endpoint, "local_port") + + def wait_result(self, endpoint): + """ Waits until the return code file for the endpoint is generated + + """ + return self.wait_file(endpoint, "ret_file") + + def wait_file(self, endpoint, filename): + """ Waits until file on endpoint is generated """ + result = None + delay = 1.0 + + node = self.endpoint_node(endpoint) + run_home = self.run_home(endpoint) + + for i in xrange(20): + (out, err), proc = node.check_output(run_home, filename) + + if out: + result = out.strip() + break + else: + time.sleep(delay) + delay = delay * 1.5 + else: + msg = "Couldn't retrieve %s" % filename + self.error(msg, out, err) + raise RuntimeError, msg + + return result + + def initiate(self, endpoint, remote_endpoint, address, cipher, cipher_key, + bwlimit, txqueuelen): + + command = self._initiate_command(endpoint, remote_endpoint, + address, cipher, cipher_key, bwlimit, txqueuelen) + + node = self.endpoint_node(endpoint) + run_home = self.run_home(endpoint) + app_home = self.app_home(endpoint) + + # upload command to connect.sh script + shfile = os.path.join(app_home, "fd-udp-connect.sh") + node.upload_command(command, + shfile = shfile, + overwrite = False) + + # invoke connect script + cmd = "bash %s" % shfile + (out, err), proc = node.run(cmd, run_home) + + # check if execution errors occurred + msg = "Failed to connect endpoints " + + if proc.poll(): + self.error(msg, out, err) + raise RuntimeError, msg + + # Wait for pid file to be generated + pid, ppid = node.wait_pid(run_home) + + self._pids[endpoint] = (pid, ppid) + + # Check for error information on the remote machine + (out, err), proc = node.check_errors(run_home) + # Out is what was written in the stderr file + if err: + msg = " Failed to start command '%s' " % command + self.error(msg, out, err) + raise RuntimeError, msg + + port = self.wait_local_port(endpoint) + + return port + + def _initiate_command(self, endpoint, remote_endpoint, address, + cipher, cipher_key, bwlimit, txqueuelen): + local_node = self.endpoint_node(endpoint) + local_run_home = self.run_home(endpoint) + local_app_home = self.app_home(endpoint) + remote_node = self.endpoint_node(remote_endpoint) + + local_ip = local_node.get("ip") + remote_ip = remote_node.get("ip") + + local_port_file = os.path.join(local_run_home, "local_port") + remote_port_file = os.path.join(local_run_home, "remote_port") + ret_file = os.path.join(local_run_home, "ret_file") + + address = base64.b64encode(address) + + command = [""] + command.append("PYTHONPATH=$PYTHONPATH:${SRC}") + command.append("python ${SRC}/pl-fd-udp-connect.py") + command.append("-a %s" % address) + command.append("-p %s " % local_port_file) + command.append("-P %s " % remote_port_file) + command.append("-o %s " % local_ip) + command.append("-O %s " % remote_ip) + command.append("-R %s " % ret_file) + if cipher: + command.append("-c %s " % cipher) + if cipher_key: + command.append("-k %s " % cipher_key) + if txqueuelen: + command.append("-q %s " % txqueuelen) + if bwlimit: + command.append("-b %s " % bwlimit) + + command = " ".join(command) + command = self.replace_paths(command, node=local_node, + app_home=local_app_home, run_home=local_run_home) + + return command + + def establish(self, endpoint, remote_endpoint, port): + node = self.endpoint_node(endpoint) + run_home = self.run_home(endpoint) + + # upload remote port number to file + remote_port = "%s\n" % port + node.upload(remote_port, + os.path.join(run_home, "remote_port"), + text = True, + overwrite = False) + + def verify(self, endpoint): + self.wait_result(endpoint) + diff --git a/src/nepi/resources/planetlab/scripts/pl-fd-udp-connect.py b/src/nepi/resources/planetlab/scripts/pl-fd-udp-connect.py new file mode 100644 index 00000000..b576813f --- /dev/null +++ b/src/nepi/resources/planetlab/scripts/pl-fd-udp-connect.py @@ -0,0 +1,168 @@ +import base64 +import errno +import os +import passfd +import signal +import socket +import time +import tunchannel + +from optparse import OptionParser + +IFF_TUN = 0x0001 +IFF_TAP = 0x0002 + +# Trak SIGTERM, and set global termination flag instead of dying +TERMINATE = [] +STARTED = False + +def _finalize(sig,frame): + global TERMINATE + global STARTED + + if STARTED: + TERMINATE.append(None) + else: + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.kill(os.getpid(), signal.SIGTERM) + +signal.signal(signal.SIGTERM, _finalize) + +# SIGUSR1 suspends forwading, SIGUSR2 resumes forwarding +SUSPEND = [] +def _suspend(sig,frame): + global SUSPEND + if not SUSPEND: + SUSPEND.append(None) +signal.signal(signal.SIGUSR1, _suspend) + +def _resume(sig,frame): + global SUSPEND + if SUSPEND: + SUSPEND.remove(None) +signal.signal(signal.SIGUSR2, _resume) + +def get_options(): + usage = ("usage: %prog -a
-b -c " + "- k -q -p " + "-P -o -O " + "-r ") + + parser = OptionParser(usage = usage) + + parser.add_option("-a", "--address", dest="address", + help="Socket address to send file descriptor to", type="str") + + parser.add_option("-b", "--bwlimit", dest="bwlimit", + help="Specifies the interface's emulated bandwidth in bytes ", + default=None, type="int") + parser.add_option("-q", "--txqueuelen", dest="txqueuelen", + help="Specifies the interface's transmission queue length. ", + default=1000, type="int") + parser.add_option("-c", "--cipher", dest="cipher", + help="Cipher to encript communication. " + "One of PLAIN, AES, Blowfish, DES, DES3. ", + default=None, type="str") + parser.add_option("-k", "--cipher-key", dest="cipher_key", + help="Specify a symmetric encryption key with which to protect " + "packets across the tunnel. python-crypto must be installed " + "on the system." , + default=None, type="str") + + parser.add_option("-p", "--local-port-file", dest="local_port_file", + help = "File where to store the local binded UDP port number ", + default = "local_port_file", type="str") + parser.add_option("-P", "--remote-port-file", dest="remote_port_file", + help = "File where to read the remote UDP port number to connect to", + default = "remote_port_file", type="str") + parser.add_option("-o", "--local-ip", dest="local_ip", + help = "Local host IP", type="str") + parser.add_option("-O", "--remote-ip", dest="remote_ip", + help = "Remote host IP", type="str") + parser.add_option("-R", "--ret-file", dest="ret_file", + help = "File where to store return code (success of connection) ", + default = "ret_file", type="str") + + (options, args) = parser.parse_args() + + address = base64.b64decode(options.address) + + return (address, options.local_port_file, options.remote_port_file, + options.local_ip, options.remote_ip, options.ret_file, + options.bwlimit, options.cipher, options.cipher_key, + options.txqueuelen) + +if __name__ == '__main__': + + (address, local_port_file, remote_port_file, local_ip, remote_ip, + ret_file, bwlimit, cipher, cipher_key, txqueuelen) = get_options() + + # Create a local socket to stablish the tunnel connection + rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + rsock.bind((local_ip, 0)) + (local_host, local_port) = rsock.getsockname() + + # Save local port information to file + f = open(local_port_file, 'w') + f.write("%d\n" % local_port) + f.close() + + # Wait until remote port information is available + while not os.path.exists(remote_port_file): + time.sleep(2) + + remote_port = '' + # Read remote port from file + # Try until something is read... + # xxx: There seems to be a weird behavior where + # even if the file exists and had the port number, + # the read operation returns empty string! + # Maybe a race condition? + for i in xrange(10): + f = open(remote_port_file, 'r') + remote_port = f.read() + f.close() + + if remote_port: + break + + time.sleep(2) + + remote_port = remote_port.strip() + remote_port = int(remote_port) + + # Connect local socket to remote port + rsock.connect((remote_ip, remote_port)) + remote = os.fdopen(rsock.fileno(), 'r+b', 0) + + # create local socket to pass to fd-net-device, the other is for the tunnel + fd, tun = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0) + + # pass one end of the socket pair to the fd-net-device + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.connect(address) + passfd.sendfd(sock, fd, '0') + + # TODO: Test connectivity! + + # Create a ret_file to indicate success + f = open(ret_file, 'w') + f.write("0") + f.close() + + STARTED = True + + # Establish tunnel + tunchannel.tun_fwd(tun, remote, + with_pi = True, # No PI headers + ether_mode = IFF_TAP, # Ns-3 generates ethernet pkts + udp = True, + cipher_key = cipher_key, + cipher = cipher, + TERMINATE = TERMINATE, + SUSPEND = SUSPEND, + tunqueue = txqueuelen, + tunkqueue = 500, + bwlimit = bwlimit + ) + diff --git a/test/resources/planetlab/ns3/cross_ns3_planetlab_fdtunnel_ping.py b/test/resources/planetlab/ns3/cross_ns3_planetlab_fdtunnel_ping.py new file mode 100755 index 00000000..0c05952a --- /dev/null +++ b/test/resources/planetlab/ns3/cross_ns3_planetlab_fdtunnel_ping.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.ec import ExperimentController +from nepi.execution.resource import ResourceState, ResourceAction +from nepi.execution.trace import TraceAttr + +from test_utils import skipIfAnyNotAliveWithIdentity + +import os +import time +import unittest + +def add_ns3_node(ec, simu): + node = ec.register_resource("ns3::Node") + ec.set(node, "enableStack", True) + ec.register_connection(node, simu) + + return node + +def add_fd_device(ec, node, ip, prefix): + dev = ec.register_resource("ns3::FdNetDevice") + ec.set(dev, "ip", ip) + ec.set(dev, "prefix", prefix) + ec.register_connection(node, dev) + + return dev + +def add_tap_device(ec, node, ip, prefix): + dev = ec.register_resource("linux::Tap") + ec.set(dev, "ip", ip) + ec.set(dev, "prefix", prefix) + ec.register_connection(node, dev) + + return dev + +def add_point2point_device(ec, node, ip, prefix): + dev = ec.register_resource("ns3::PointToPointNetDevice") + ec.set(dev, "ip", ip) + ec.set(dev, "prefix", prefix) + ec.register_connection(node, dev) + + queue = ec.register_resource("ns3::DropTailQueue") + ec.register_connection(dev, queue) + + return dev + +class LinuxNS3FdNetDeviceTest(unittest.TestCase): + def setUp(self): + self.hostname1 = "onelab4.warsaw.rd.tp.pl" + self.hostname2 = "inriarennes1.irisa.fr" + self.slicename = "inria_nepi" + self.identity = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME']) + + @skipIfAnyNotAliveWithIdentity + def t_cross_tunnel_ping(self, username1, hostname1, identity1, + username2, hostname2, identity2): + ec = ExperimentController(exp_id = "test-ns3-pl-fd-tunnel") + + node1 = ec.register_resource("planetlab::Node") + ec.set(node1, "hostname", hostname1) + ec.set(node1, "username", username1) + ec.set(node1, "identity", identity1) + ec.set(node1, "cleanProcesses", True) + ec.set(node1, "cleanExperiment", True) + + simu1 = ec.register_resource("linux::ns3::Simulation") + ec.set(simu1, "simulatorImplementationType", "ns3::RealtimeSimulatorImpl") + ec.set(simu1, "checksumEnabled", True) + ec.set(simu1, "verbose", True) + #ec.set(simu1, "buildMode", "debug") + #ec.set(simu1, "nsLog", "FdNetDevice") + ec.register_connection(simu1, node1) + + nsnode1 = ec.register_resource("ns3::Node") + ec.set(nsnode1, "enableStack", True) + ec.register_connection(nsnode1, simu1) + + fddev1 = ec.register_resource("ns3::FdNetDevice") + ec.set(fddev1, "ip", "10.0.0.1") + ec.set(fddev1, "prefix", "30") + ec.register_connection(nsnode1, fddev1) + + node2 = ec.register_resource("planetlab::Node") + ec.set(node2, "hostname", hostname2) + ec.set(node2, "username", username2) + ec.set(node2, "identity", identity2) + ec.set(node2, "cleanProcesses", True) + ec.set(node2, "cleanExperiment", True) + + simu2 = ec.register_resource("linux::ns3::Simulation") + ec.set(simu2, "simulatorImplementationType", "ns3::RealtimeSimulatorImpl") + ec.set(simu2, "checksumEnabled", True) + ec.set(simu2, "verbose", True) + #ec.set(simu2, "buildMode", "debug") + #ec.set(simu2, "nsLog", "FdNetDevice") + ec.register_connection(simu2, node2) + + nsnode2 = ec.register_resource("ns3::Node") + ec.set(nsnode2, "enableStack", True) + ec.register_connection(nsnode2, simu2) + + fddev2 = ec.register_resource("ns3::FdNetDevice") + ec.set(fddev2, "ip", "10.0.0.2") + ec.set(fddev2, "prefix", "30") + ec.register_connection(nsnode2, fddev2) + + tunnel = ec.register_resource("planetlab::ns3::P2PFdUdpTunnel") + ec.register_connection(tunnel, fddev1) + ec.register_connection(tunnel, fddev2) + + ping = ec.register_resource("ns3::V4Ping") + ec.set (ping, "Remote", "10.0.0.2") + ec.set (ping, "Interval", "1s") + ec.set (ping, "Verbose", True) + ec.set (ping, "StartTime", "0s") + ec.set (ping, "StopTime", "20s") + ec.register_connection(ping, nsnode1) + + ec.deploy() + + ec.wait_finished([ping]) + + stdout = ec.trace(simu1, "stdout") + + expected = "20 packets transmitted, 20 received, 0% packet loss" + self.assertTrue(stdout.find(expected) > -1) + + ec.shutdown() + + def test_cross_tunnel_ping(self): + self.t_cross_tunnel_ping(self.slicename, self.hostname1, self.identity, + self.slicename, self.hostname2, self.identity) + + +if __name__ == '__main__': + unittest.main() +