From: Alina Quereilhac Date: Mon, 28 Jul 2014 16:50:48 +0000 (+0200) Subject: Making UdpTunnel inherite from abstract Tunnel RM X-Git-Tag: nepi-3.2.0~116 X-Git-Url: http://git.onelab.eu/?p=nepi.git;a=commitdiff_plain;h=4ec5c9e5454b68a3dab82a5073aee50231706706 Making UdpTunnel inherite from abstract Tunnel RM --- diff --git a/src/nepi/resources/linux/gretunnel.py b/src/nepi/resources/linux/gretunnel.py new file mode 100644 index 00000000..85620886 --- /dev/null +++ b/src/nepi/resources/linux/gretunnel.py @@ -0,0 +1,291 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.attribute import Attribute, Flags, Types +from nepi.execution.resource import clsinit_copy, ResourceState, \ + reschedule_delay +from nepi.resources.linux.application import LinuxApplication +from nepi.util.sshfuncs import ProcStatus +from nepi.util.timefuncs import tnow, tdiffsec + +import os +import socket +import time + +@clsinit_copy +class LinuxGRETunnel(LinuxApplication): + _rtype = "LinuxGRETunnel" + _help = "Constructs a tunnel between two Linux endpoints using a UDP connection " + _backend = "linux" + + @classmethod + def _register_attributes(cls): + bwlimit = Attribute("bwLimit", + "Specifies the interface's emulated bandwidth in bytes " + "per second.", + type = Types.Integer, + flags = Flags.Design) + + cls._register_attribute(bwlimit) + + def __init__(self, ec, guid): + super(LinuxGRETunnel, self).__init__(ec, guid) + + def log_message(self, msg): + return " guid %d - GRE tunnel %s - %s - %s " % (self.guid, + self.endpoint1.node.get("hostname"), + self.endpoint2.node.get("hostname"), + msg) + + def get_endpoints(self): + """ Returns the list of RM that are endpoints to the tunnel + """ + connected = [] + for guid in self.connections: + rm = self.ec.get_resource(guid) + if hasattr(rm, "udp_connect_command"): + connected.append(rm) + return connected + + @property + def endpoint1(self): + endpoints = self.get_endpoints() + if endpoints: return endpoints[0] + return None + + @property + def endpoint2(self): + endpoints = self.get_endpoints() + if endpoints and len(endpoints) > 1: return endpoints[1] + return None + + def app_home(self, endpoint): + return os.path.join(endpoint.node.exp_home, self._home) + + def run_home(self, endpoint): + return os.path.join(self.app_home(endpoint), self.ec.run_id) + + def udp_connect(self, endpoint, remote_ip): + # Get udp connect command + local_port_file = os.path.join(self.run_home(endpoint), + "local_port") + remote_port_file = os.path.join(self.run_home(endpoint), + "remote_port") + ret_file = os.path.join(self.run_home(endpoint), + "ret_file") + cipher = self.get("cipher") + cipher_key = self.get("cipherKey") + bwlimit = self.get("bwLimit") + txqueuelen = self.get("txQueueLen") + udp_connect_command = endpoint.udp_connect_command( + remote_ip, local_port_file, remote_port_file, + ret_file, cipher, cipher_key, bwlimit, txqueuelen) + + # upload command to connect.sh script + shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh") + endpoint.node.upload(udp_connect_command, + shfile, + text = True, + overwrite = False) + + # invoke connect script + cmd = "bash %s" % shfile + (out, err), proc = endpoint.node.run(cmd, self.run_home(endpoint)) + + # check if execution errors occurred + msg = " Failed to connect endpoints " + + if proc.poll(): + self.error(msg, out, err) + raise RuntimeError, msg + + # Wait for pid file to be generated + pid, ppid = endpoint.node.wait_pid(self.run_home(endpoint)) + + # If the process is not running, check for error information + # on the remote machine + if not pid or not ppid: + (out, err), proc = endpoint.node.check_errors(self.run_home(endpoint)) + # Out is what was written in the stderr file + if err: + msg = " Failed to start command '%s' " % command + self.error(msg, out, err) + raise RuntimeError, msg + + # wait until port is written to file + port = self.wait_local_port(endpoint) + return (port, pid, ppid) + + def do_provision(self): + # create run dir for tunnel on each node + self.endpoint1.node.mkdir(self.run_home(self.endpoint1)) + self.endpoint2.node.mkdir(self.run_home(self.endpoint2)) + + # Invoke connect script in endpoint 1 + remote_ip1 = socket.gethostbyname(self.endpoint2.node.get("hostname")) + (port1, self._pid1, self._ppid1) = self.udp_connect(self.endpoint1, + remote_ip1) + + # Invoke connect script in endpoint 2 + remote_ip2 = socket.gethostbyname(self.endpoint1.node.get("hostname")) + (port2, self._pid2, self._ppid2) = self.udp_connect(self.endpoint2, + remote_ip2) + + # upload file with port 2 to endpoint 1 + self.upload_remote_port(self.endpoint1, port2) + + # upload file with port 1 to endpoint 2 + self.upload_remote_port(self.endpoint2, port1) + + # check if connection was successful on both sides + self.wait_result(self.endpoint1) + self.wait_result(self.endpoint2) + + self.info("Provisioning finished") + + self.set_provisioned() + + def do_deploy(self): + if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ + (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): + self.ec.schedule(reschedule_delay, self.deploy) + else: + self.do_discover() + self.do_provision() + + self.set_ready() + + def do_start(self): + if self.state == ResourceState.READY: + command = self.get("command") + self.info("Starting command '%s'" % command) + + self.set_started() + else: + msg = " Failed to execute command '%s'" % command + self.error(msg, out, err) + raise RuntimeError, msg + + def do_stop(self): + """ Stops application execution + """ + if self.state == ResourceState.STARTED: + self.info("Stopping tunnel") + + # Only try to kill the process if the pid and ppid + # were retrieved + if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: + (out1, err1), proc1 = self.endpoint1.node.kill(self._pid1, + self._ppid1, sudo = True) + (out2, err2), proc2 = self.endpoint2.node.kill(self._pid2, + self._ppid2, sudo = True) + + if (proc1.poll() and err1) or (proc2.poll() and err2): + # check if execution errors occurred + msg = " Failed to STOP tunnel" + self.error(msg, err1, err2) + raise RuntimeError, msg + + self.set_stopped() + + @property + def state(self): + """ Returns the state of the application + """ + if self._state == ResourceState.STARTED: + # In order to avoid overwhelming the remote host and + # the local processor with too many ssh queries, the state is only + # requested every 'state_check_delay' seconds. + state_check_delay = 0.5 + if tdiffsec(tnow(), self._last_state_check) > state_check_delay: + if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: + # Make sure the process is still running in background + # No execution errors occurred. Make sure the background + # process with the recorded pid is still running. + status1 = self.endpoint1.node.status(self._pid1, self._ppid1) + status2 = self.endpoint2.node.status(self._pid2, self._ppid2) + + if status1 == ProcStatus.FINISHED and \ + status2 == ProcStatus.FINISHED: + + # check if execution errors occurred + (out1, err1), proc1 = self.endpoint1.node.check_errors( + self.run_home(self.endpoint1)) + + (out2, err2), proc2 = self.endpoint2.node.check_errors( + self.run_home(self.endpoint2)) + + if err1 or err2: + msg = "Error occurred in tunnel" + self.error(msg, err1, err2) + self.fail() + else: + self.set_stopped() + + self._last_state_check = tnow() + + return self._state + + def wait_local_port(self, endpoint): + """ Waits until the local_port file for the endpoint is generated, + and returns the port number + + """ + return self.wait_file(endpoint, "local_port") + + def wait_result(self, endpoint): + """ Waits until the return code file for the endpoint is generated + + """ + return self.wait_file(endpoint, "ret_file") + + def wait_file(self, endpoint, filename): + """ Waits until file on endpoint is generated """ + result = None + delay = 1.0 + + for i in xrange(20): + (out, err), proc = endpoint.node.check_output( + self.run_home(endpoint), filename) + + if out: + result = out.strip() + break + else: + time.sleep(delay) + delay = delay * 1.5 + else: + msg = "Couldn't retrieve %s" % filename + self.error(msg, out, err) + raise RuntimeError, msg + + return result + + def upload_remote_port(self, endpoint, port): + # upload remote port number to file + port = "%s\n" % port + endpoint.node.upload(port, + os.path.join(self.run_home(endpoint), "remote_port"), + text = True, + overwrite = False) + + def valid_connection(self, guid): + # TODO: Validate! + return True + diff --git a/src/nepi/resources/linux/tunnel.py b/src/nepi/resources/linux/tunnel.py new file mode 100644 index 00000000..5f8b7f8f --- /dev/null +++ b/src/nepi/resources/linux/tunnel.py @@ -0,0 +1,167 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.resource import clsinit_copy, ResourceState, \ + reschedule_delay +from nepi.resources.linux.application import LinuxApplication +from nepi.util.timefuncs import tnow, tdiffsec + +import os +import time + +state_check_delay = 0.5 + +@clsinit_copy +class LinuxTunnel(LinuxApplication): + _rtype = "abstract::LinuxTunnel" + _help = "Constructs a tunnel between two Linux endpoints" + _backend = "linux" + + def __init__(self, ec, guid): + super(LinuxTunnel, self).__init__(ec, guid) + self._home = "tunnel-%s" % self.guid + + def log_message(self, msg): + return " guid %d - tunnel %s - %s - %s " % (self.guid, + self.endpoint1.node.get("hostname"), + self.endpoint2.node.get("hostname"), + msg) + + def get_endpoints(self): + """ Returns the list of RM that are endpoints to the tunnel + """ + connected = [] + for guid in self.connections: + rm = self.ec.get_resource(guid) + if hasattr(rm, "udp_connect_command"): + connected.append(rm) + return connected + + @property + def endpoint1(self): + endpoints = self.get_endpoints() + if endpoints: return endpoints[0] + return None + + @property + def endpoint2(self): + endpoints = self.get_endpoints() + if endpoints and len(endpoints) > 1: return endpoints[1] + return None + + def app_home(self, endpoint): + return os.path.join(endpoint.node.exp_home, self._home) + + def run_home(self, endpoint): + return os.path.join(self.app_home(endpoint), self.ec.run_id) + + def initiate_connection(self, endpoint, remote_endpoint): + raise NotImplementedError + + def establish_connection(self, endpoint, remote_endpoint, data): + raise NotImplementedError + + def verify_connection(self, endpoint, remote_endpoint): + raise NotImplementedError + + def terminate_connection(self, endpoint, remote_endpoint): + raise NotImplementedError + + def check_state_connection(self, endpoint, remote_endpoint): + raise NotImplementedError + + def do_provision(self): + # create run dir for tunnel on each node + self.endpoint1.node.mkdir(self.run_home(self.endpoint1)) + self.endpoint2.node.mkdir(self.run_home(self.endpoint2)) + + # Start 2 step connection + # Initiate connection from endpoint 1 to endpoint 2 + data1 = self.initiate_connection(self.endpoint1, self.endpoint2) + + # Initiate connection from endpoint 2 to endpoint 1 + data2 = self.initiate_connection(self.endpoint2, self.endpoint1) + + # Establish connection from endpoint 1 to endpoint 2 + self.establish_connection(self.endpoint1, self.endpoint2, data2) + + # Establish connection from endpoint 2 to endpoint 1 + self.establish_connection(self.endpoint2, self.endpoint1, data1) + + # check if connection was successful on both sides + self.verify_connection(self.endpoint1, self.endpoint2) + self.verify_connection(self.endpoint2, self.endpoint1) + + self.info("Provisioning finished") + + self.set_provisioned() + + def do_deploy(self): + if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ + (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): + self.ec.schedule(reschedule_delay, self.deploy) + else: + self.do_discover() + self.do_provision() + + self.set_ready() + + def do_start(self): + if self.state == ResourceState.READY: + command = self.get("command") + self.info("Starting command '%s'" % command) + + self.set_started() + else: + msg = " Failed to execute command '%s'" % command + self.error(msg, out, err) + raise RuntimeError, msg + + def do_stop(self): + """ Stops application execution + """ + if self.state == ResourceState.STARTED: + self.info("Stopping tunnel") + + self.terminate_connection(self.endpoint1, self.endpoint2) + self.terminate_connection(self.endpoint2, self.endpoint1) + + self.set_stopped() + + @property + def state(self): + """ Returns the state of the application + """ + if self._state == ResourceState.STARTED: + # In order to avoid overwhelming the remote host and + # the local processor with too many ssh queries, the state is only + # requested every 'state_check_delay' seconds. + if tdiffsec(tnow(), self._last_state_check) > state_check_delay: + + self.check_state_connection() + + self._last_state_check = tnow() + + return self._state + + + def valid_connection(self, guid): + # TODO: Validate! + return True + diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index 01f6898b..5b0eac7c 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -20,7 +20,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import clsinit_copy, ResourceState, \ reschedule_delay -from nepi.resources.linux.application import LinuxApplication +from nepi.resources.linux.tunnel import LinuxTunnel from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec @@ -29,8 +29,8 @@ import socket import time @clsinit_copy -class UdpTunnel(LinuxApplication): - _rtype = "UdpTunnel" +class LinuxUdpTunnel(LinuxTunnel): + _rtype = "LinuxUdpTunnel" _help = "Constructs a tunnel between two Linux endpoints using a UDP connection " _backend = "linux" @@ -68,62 +68,28 @@ class UdpTunnel(LinuxApplication): cls._register_attribute(bwlimit) def __init__(self, ec, guid): - super(UdpTunnel, self).__init__(ec, guid) + super(LinuxUdpTunnel, self).__init__(ec, guid) self._home = "udp-tunnel-%s" % self.guid - self._pid1 = None - self._ppid1 = None - self._pid2 = None - self._ppid2 = None + self._pids = dict() def log_message(self, msg): - return " guid %d - tunnel %s - %s - %s " % (self.guid, + return " guid %d - udptunnel %s - %s - %s " % (self.guid, self.endpoint1.node.get("hostname"), self.endpoint2.node.get("hostname"), msg) - def get_endpoints(self): - """ Returns the list of RM that are endpoints to the tunnel - """ - connected = [] - for guid in self.connections: - rm = self.ec.get_resource(guid) - if hasattr(rm, "udp_connect_command"): - connected.append(rm) - return connected - - @property - def endpoint1(self): - endpoints = self.get_endpoints() - if endpoints: return endpoints[0] - return None - - @property - def endpoint2(self): - endpoints = self.get_endpoints() - if endpoints and len(endpoints) > 1: return endpoints[1] - return None - - def app_home(self, endpoint): - return os.path.join(endpoint.node.exp_home, self._home) - - def run_home(self, endpoint): - return os.path.join(self.app_home(endpoint), self.ec.run_id) - - def udp_connect(self, endpoint, remote_ip): - # Get udp connect command - local_port_file = os.path.join(self.run_home(endpoint), - "local_port") - remote_port_file = os.path.join(self.run_home(endpoint), - "remote_port") - ret_file = os.path.join(self.run_home(endpoint), - "ret_file") + def initiate_connection(self, endpoint, remote_endpoint): cipher = self.get("cipher") cipher_key = self.get("cipherKey") bwlimit = self.get("bwLimit") txqueuelen = self.get("txQueueLen") + + # Return the command to execute to initiate the connection to the + # other endpoint + connection_run_home = self.run_home(endpoint) udp_connect_command = endpoint.udp_connect_command( - remote_ip, local_port_file, remote_port_file, - ret_file, cipher, cipher_key, bwlimit, txqueuelen) + remote_endpoint, connection_run_home, + cipher, cipher_key, bwlimit, txqueuelen) # upload command to connect.sh script shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh") @@ -158,117 +124,56 @@ class UdpTunnel(LinuxApplication): # wait until port is written to file port = self.wait_local_port(endpoint) - return (port, pid, ppid) - def do_provision(self): - # create run dir for tunnel on each node - self.endpoint1.node.mkdir(self.run_home(self.endpoint1)) - self.endpoint2.node.mkdir(self.run_home(self.endpoint2)) + self._pids[endpoint] = (pid, ppid) - # Invoke connect script in endpoint 1 - remote_ip1 = socket.gethostbyname(self.endpoint2.node.get("hostname")) - (port1, self._pid1, self._ppid1) = self.udp_connect(self.endpoint1, - remote_ip1) + return port - # Invoke connect script in endpoint 2 - remote_ip2 = socket.gethostbyname(self.endpoint1.node.get("hostname")) - (port2, self._pid2, self._ppid2) = self.udp_connect(self.endpoint2, - remote_ip2) + def establish_connection(self, endpoint, remote_endpoint, port): + self.upload_remote_port(endpoint, port) - # upload file with port 2 to endpoint 1 - self.upload_remote_port(self.endpoint1, port2) - - # upload file with port 1 to endpoint 2 - self.upload_remote_port(self.endpoint2, port1) + def verify_connection(self, endpoint, remote_endpoint): + self.wait_result(endpoint) - # check if connection was successful on both sides - self.wait_result(self.endpoint1) - self.wait_result(self.endpoint2) - - self.info("Provisioning finished") - - self.set_provisioned() + def terminate_connection(self, endpoint, remote_endpoint): + pid, ppid = self._pids[endpoint] - def do_deploy(self): - if (not self.endpoint1 or self.endpoint1.state < ResourceState.READY) or \ - (not self.endpoint2 or self.endpoint2.state < ResourceState.READY): - self.ec.schedule(reschedule_delay, self.deploy) - else: - self.do_discover() - self.do_provision() - - self.set_ready() - - def do_start(self): - if self.state == ResourceState.READY: - command = self.get("command") - self.info("Starting command '%s'" % command) - - self.set_started() - else: - msg = " Failed to execute command '%s'" % command - self.error(msg, out, err) - raise RuntimeError, msg + if pid and ppid: + (out, err), proc = endpoint.node.kill(pid, ppid, + sudo = True) - def do_stop(self): - """ Stops application execution - """ - if self.state == ResourceState.STARTED: - self.info("Stopping tunnel") - - # Only try to kill the process if the pid and ppid - # were retrieved - if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: - (out1, err1), proc1 = self.endpoint1.node.kill(self._pid1, - self._ppid1, sudo = True) - (out2, err2), proc2 = self.endpoint2.node.kill(self._pid2, - self._ppid2, sudo = True) - - if (proc1.poll() and err1) or (proc2.poll() and err2): - # check if execution errors occurred - msg = " Failed to STOP tunnel" - self.error(msg, err1, err2) - raise RuntimeError, msg - - self.set_stopped() - - @property - def state(self): - """ Returns the state of the application - """ - if self._state == ResourceState.STARTED: - # In order to avoid overwhelming the remote host and - # the local processor with too many ssh queries, the state is only - # requested every 'state_check_delay' seconds. - state_check_delay = 0.5 - if tdiffsec(tnow(), self._last_state_check) > state_check_delay: - if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: - # Make sure the process is still running in background - # No execution errors occurred. Make sure the background - # process with the recorded pid is still running. - status1 = self.endpoint1.node.status(self._pid1, self._ppid1) - status2 = self.endpoint2.node.status(self._pid2, self._ppid2) - - if status1 == ProcStatus.FINISHED and \ - status2 == ProcStatus.FINISHED: - - # check if execution errors occurred - (out1, err1), proc1 = self.endpoint1.node.check_errors( - self.run_home(self.endpoint1)) - - (out2, err2), proc2 = self.endpoint2.node.check_errors( - self.run_home(self.endpoint2)) - - if err1 or err2: - msg = "Error occurred in tunnel" - self.error(msg, err1, err2) - self.fail() - else: - self.set_stopped() - - self._last_state_check = tnow() - - return self._state + # check if execution errors occurred + if proc.poll() and err: + msg = " Failed to STOP tunnel" + self.error(msg, out, err) + raise RuntimeError, msg + + def check_state_connection(self): + # Make sure the process is still running in background + # No execution errors occurred. Make sure the background + # process with the recorded pid is still running. + pid1, ppid1 = self._pids[self.endpoint1] + pid2, ppid2 = self._pids[self.endpoint2] + + status1 = self.endpoint1.node.status(pid1, ppid1) + status2 = self.endpoint2.node.status(pid2, ppid2) + + if status1 == ProcStatus.FINISHED and \ + status2 == ProcStatus.FINISHED: + + # check if execution errors occurred + (out1, err1), proc1 = self.endpoint1.node.check_errors( + self.run_home(self.endpoint1)) + + (out2, err2), proc2 = self.endpoint2.node.check_errors( + self.run_home(self.endpoint2)) + + if err1 or err2: + msg = "Error occurred in tunnel" + self.error(msg, err1, err2) + self.fail() + else: + self.set_stopped() def wait_local_port(self, endpoint): """ Waits until the local_port file for the endpoint is generated, @@ -313,7 +218,4 @@ class UdpTunnel(LinuxApplication): text = True, overwrite = False) - def valid_connection(self, guid): - # TODO: Validate! - return True diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-create.py b/src/nepi/resources/planetlab/scripts/pl-vif-create.py index b1ef4f35..7b2a8e0d 100644 --- a/src/nepi/resources/planetlab/scripts/pl-vif-create.py +++ b/src/nepi/resources/planetlab/scripts/pl-vif-create.py @@ -22,9 +22,7 @@ import errno import passfd import socket import vsys -from optparse import OptionParser, SUPPRESS_HELP - -# TODO: GRE OPTION!! CONFIGURE THE VIF-UP IN GRE MODE!! +from optparse import OptionParser STOP_MSG = "STOP" PASSFD_MSG = "PASSFD" @@ -87,7 +85,8 @@ def passfd_action(fd, args): def get_options(): usage = ("usage: %prog -t -a -n " - "-s -p -f -S ") + "-s -p -q -f " + "-S ") parser = OptionParser(usage = usage) @@ -112,8 +111,13 @@ def get_options(): help = "Peer end point for the interface ", default = None, type="str") - parser.add_option("-f", "--if-name-file", dest="if_name_file", - help = "File to store the interface name assigned by the OS", + parser.add_option("-q", "--txqueuelen", dest="txqueuelen", + help = "Size of transmision queue. Defaults to 0.", + default = 0, + type="int") + + parser.add_option("-f", "--vif-name-file", dest="vif_name_file", + help = "File to store the virtual interface name assigned by the OS", default = "if_name", type="str") parser.add_option("-S", "--socket-name", dest="socket_name", @@ -126,20 +130,23 @@ def get_options(): if options.vif_type and options.vif_type == "IFF_TUN": vif_type = vsys.IFF_TUN - return (vif_type, options.ip4_address, options.net_prefix, options.snat, - options.pointopoint, options.if_name_file, options.socket_name) + return (vif_type, options.ip4_address, options.net_prefix, + options.snat, options.pointopoint, options.txqueuelen, + options.vif_name_file, options.socket_name) if __name__ == '__main__': - (vif_type, ip4_address, net_prefix, snat, pointopoint, - if_name_file, socket_name) = get_options() + (vif_type, ip4_address, net_prefix, snat, pointopoint, + txqueuelen, vif_name_file, socket_name) = get_options() - (fd, if_name) = vsys.fd_tuntap(vif_type) - vsys.vif_up(if_name, ip4_address, net_prefix, snat, pointopoint) - + (fd, vif_name) = vsys.fd_tuntap(vif_type) + + vsys.vif_up(vif_name, ip4_address, net_prefix, snat = snat, + pointopoint = pointopoint, txqueuelen = txqueuelen) + # Saving interface name to 'if_name_file - f = open(if_name_file, 'w') - f.write(if_name) + f = open(vif_name_file, 'w') + f.write(vif_name) f.close() # create unix socket to receive instructions diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-stop.py b/src/nepi/resources/planetlab/scripts/pl-vif-down.py similarity index 50% rename from src/nepi/resources/planetlab/scripts/pl-vif-stop.py rename to src/nepi/resources/planetlab/scripts/pl-vif-down.py index 3176a877..0047be4e 100644 --- a/src/nepi/resources/planetlab/scripts/pl-vif-stop.py +++ b/src/nepi/resources/planetlab/scripts/pl-vif-down.py @@ -18,39 +18,53 @@ # Author: Alina Quereilhac import base64 -import errno -import vsys import socket -from optparse import OptionParser, SUPPRESS_HELP +import vsys + +from optparse import OptionParser STOP_MSG = "STOP" def get_options(): - usage = ("usage: %prog -S ") + usage = ("usage: %prog -N -D -S ") parser = OptionParser(usage = usage) + parser.add_option("-N", "--vif-name", dest="vif_name", + help = "The name of the virtual interface, or a " + "unique numeric identifier to name the interface " + "if GRE mode is used.", + type="str") + + parser.add_option("-D", "--delete", dest="delete", + action="store_true", + default = False, + help="Removes virtual interface if GRE mode was used") + parser.add_option("-S", "--socket-name", dest="socket_name", help = "Name for the unix socket used to interact with this process", default = "tap.sock", type="str") (options, args) = parser.parse_args() - return (options.socket_name) + return (options.vif_name, options.delete, options.socket_name) if __name__ == '__main__': - (socket_name) = get_options() - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.connect(socket_name) - encoded = base64.b64encode(STOP_MSG) - sock.send("%s\n" % encoded) - reply = sock.recv(1024) - reply = base64.b64decode(reply) - - print reply + (vif_name, delete, socket_name) = get_options() + # If a socket name is sent, send the STOP message and wait for a reply + if socket_name: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(socket_name) + encoded = base64.b64encode(STOP_MSG) + sock.send("%s\n" % encoded) + reply = sock.recv(1024) + reply = base64.b64decode(reply) + print reply + # Else, use the vsys interface to set the virtual interface down + elif vif_name: + vsys.vif_down(vif_name, delete = delete) diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py b/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py index c59bf05a..fa16879b 100644 --- a/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py +++ b/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py @@ -8,7 +8,7 @@ import time import tunchannel import vsys -from optparse import OptionParser, SUPPRESS_HELP +from optparse import OptionParser PASSFD_MSG = "PASSFD" @@ -146,7 +146,7 @@ if __name__ == '__main__': # xxx: There seems to be a weird behavior where # even if the file exists and had the port number, # the read operation returns empty string! - # Maybe a raise condition? + # Maybe a race condition? for i in xrange(10): f = open(remote_port_file, 'r') remote_port = f.read() diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-up.py b/src/nepi/resources/planetlab/scripts/pl-vif-up.py new file mode 100644 index 00000000..6805e31a --- /dev/null +++ b/src/nepi/resources/planetlab/scripts/pl-vif-up.py @@ -0,0 +1,109 @@ +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +import vsys + +from optparse import OptionParser + +def get_options(): + usage = ("usage: %prog -N -t -a " + "-n -s -p " + "-q -g -G ") + + parser = OptionParser(usage = usage) + + parser.add_option("-N", "--vif-name", dest="vif_name", + help = "The name of the virtual interface, or a " + "unique numeric identifier to name the interface " + "if GRE mode is used.", + type="str") + + parser.add_option("-t", "--vif-type", dest="vif_type", + help = "Virtual interface type. Either IFF_TAP or IFF_TUN. " + "Defaults to IFF_TAP. ", type="str") + + parser.add_option("-a", "--ip4-address", dest="ip4_address", + help = "IPv4 address to assign to interface. It must belong to the " + "network segment owned by the slice, given by the vsys_vnet tag. ", + type="str") + + parser.add_option("-n", "--net-prefix", dest="net_prefix", + help = "IPv4 network prefix for the interface. It must be the one " + "given by the slice's vsys_vnet tag. ", + type="int") + + parser.add_option("-s", "--snat", dest="snat", + action="store_true", + default = False, + help="Enable SNAT for the interface") + + parser.add_option("-p", "--pointopoint", dest="pointopoint", + help = "Peer end point for the interface. ", + default = None, + type="str") + + parser.add_option("-q", "--txqueuelen", dest="txqueuelen", + help = "Size of transmision queue. Defaults to 0.", + default = 0, + type="int") + + parser.add_option("-g", "--gre-key", dest="gre_key", + help = "When set, enables GRE mode with the corresponding GRE key.", + default = None, + type="str") + + parser.add_option("-G", "--gre-remote", dest="gre_remote", + help = "Remote endpoint (public IP) for the GRE tunnel.", + default = None, + type="str") + + (options, args) = parser.parse_args() + + vif_type = vsys.IFF_TAP + if options.vif_type and options.vif_type == "IFF_TUN": + vif_type = vsys.IFF_TUN + + return (options.vif_name, vif_type, options.ip4_address, + options.net_prefix, options.snat, options.pointopoint, + options.txqueuelen, options.gre_key, options.gre_remote) + +if __name__ == '__main__': + + (vif_name, vif_type, ip4_address, net_prefix, snat, pointopoint, + pointopoint, txqueuelen, gre_key, gre_remote) = get_options() + + if (gre_key): + import pwd + import getpass + + slicename = getpass.getuser() + sliceid = pwd.getpwnam(slicename).pw_uid + + if vif_type == vsys.IFF_TAP: + vif_prefix = "tap" + else: + vif_prefix = "tun" + + # if_name should be a unique numeric vif id + vif_name = "%s%s-%d" % (vif_prefix, sliceid, vif_name) + + vsys.vif_up(vif_name, ip4_address, net_prefix, snat = snat, + pointopoint = pointopoint, txqueuelen = txqueuelen, + gre_key = gre_key, gre_remote = gre_remote) + diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index 2acc3f43..fd4df486 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -25,6 +25,7 @@ from nepi.resources.planetlab.node import PlanetlabNode from nepi.util.timefuncs import tnow, tdiffsec import os +import socket import time # TODO: @@ -93,40 +94,48 @@ class PlanetlabTap(LinuxApplication): return None def upload_sources(self): - # upload vif-creation python script + scripts = [] + + # vif-creation python script pl_vif_create = os.path.join(os.path.dirname(__file__), "scripts", "pl-vif-create.py") - self.node.upload(pl_vif_create, - os.path.join(self.node.src_dir, "pl-vif-create.py"), - overwrite = False) - - # upload vif-stop python script - pl_vif_stop = os.path.join(os.path.dirname(__file__), "scripts", - "pl-vif-stop.py") + scripts.append(pl_vif_create) + + # vif-up python script + pl_vif_up = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-up.py") + + scripts.append(pl_vif_up) - self.node.upload(pl_vif_stop, - os.path.join(self.node.src_dir, "pl-vif-stop.py"), - overwrite = False) + # vif-down python script + pl_vif_down = os.path.join(os.path.dirname(__file__), "scripts", + "pl-vif-down.py") + + scripts.append(pl_vif_down) - # upload vif-connect python script + # udp-connect python script pl_vif_connect = os.path.join(os.path.dirname(__file__), "scripts", "pl-vif-udp-connect.py") + + scripts.append(pl_vif_connect) - self.node.upload(pl_vif_connect, - os.path.join(self.node.src_dir, "pl-vif-udp-connect.py"), - overwrite = False) - - # upload tun-connect python script + # tunnel creation python script tunchannel = os.path.join(os.path.dirname(__file__), "..", "linux", "scripts", "tunchannel.py") - self.node.upload(tunchannel, - os.path.join(self.node.src_dir, "tunchannel.py"), + scripts.append(tunchannel) + + # Upload scripts + scripts = ";".join(scripts) + + self.node.upload(scripts, + os.path.join(self.node.src_dir), overwrite = False) # upload stop.sh script stop_command = self.replace_paths(self._stop_command) + self.node.upload(stop_command, os.path.join(self.app_home, "stop.sh"), text = True, @@ -152,8 +161,8 @@ class PlanetlabTap(LinuxApplication): # After creating the TAP, the pl-vif-create.py script # will write the name of the TAP to a file. We wait until # we can read the interface name from the file. - if_name = self.wait_if_name() - self.set("deviceName", if_name) + vif_name = self.wait_vif_name() + self.set("deviceName", vif_name) def do_deploy(self): if not self.node or self.node.state < ResourceState.PROVISIONED: @@ -194,6 +203,10 @@ class PlanetlabTap(LinuxApplication): (out, err), proc = self.execute_command(command, blocking = True) + if err: + msg = " Failed to stop command '%s' " % command + self.error(msg, out, err) + self.set_stopped() @property @@ -217,8 +230,9 @@ class PlanetlabTap(LinuxApplication): def do_release(self): # Node needs to wait until all associated RMs are released # to be released - from nepi.resources.linux.udptunnel import UdpTunnel - rms = self.get_connected(UdpTunnel.get_rtype()) + from nepi.resources.linux.tunnel import LinuxTunnel + rms = self.get_connected(LinuxTunnel.get_rtype()) + for rm in rms: if rm.state < ResourceState.STOPPED: self.ec.schedule(reschedule_delay, self.release) @@ -226,14 +240,14 @@ class PlanetlabTap(LinuxApplication): super(PlanetlabTap, self).do_release() - def wait_if_name(self): - """ Waits until the if_name file for the command is generated, - and returns the if_name for the device """ - if_name = None + def wait_vif_name(self): + """ Waits until the vif_name file for the command is generated, + and returns the vif_name for the device """ + vif_name = None delay = 0.5 for i in xrange(20): - (out, err), proc = self.node.check_output(self.run_home, "if_name") + (out, err), proc = self.node.check_output(self.run_home, "vif_name") if proc.poll() > 0: (out, err), proc = self.node.check_errors(self.run_home) @@ -242,21 +256,34 @@ class PlanetlabTap(LinuxApplication): raise RuntimeError, err if out: - if_name = out.strip() + vif_name = out.strip() break else: time.sleep(delay) delay = delay * 1.5 else: - msg = "Couldn't retrieve if_name" + msg = "Couldn't retrieve vif_name" self.error(msg, out, err) raise RuntimeError, msg - return if_name + return vif_name + + def udp_connect_command(self, remote_endpoint, connection_run_home, + cipher, cipher_key, bwlimit, txqueuelen): + + # Generate UDP connect command + remote_ip = socket.gethostbyname( + remote_endpoint.node.get("hostname")) + + local_port_file = os.path.join(connection_run_home, + "local_port") + + remote_port_file = os.path.join(connection_run_home, + "remote_port") + + ret_file = os.path.join(connection_run_home, + "ret_file") - def udp_connect_command(self, remote_ip, local_port_file, - remote_port_file, ret_file, cipher, cipher_key, - bwlimit, txqueuelen): command = ["sudo -S "] command.append("PYTHONPATH=$PYTHONPATH:${SRC}") command.append("python ${SRC}/pl-vif-udp-connect.py") @@ -277,6 +304,8 @@ class PlanetlabTap(LinuxApplication): command = " ".join(command) command = self.replace_paths(command) + + # TODO: RECONFIGUTE THE TAP WITH THE INFORMATION ENDPOINT! return command @property @@ -286,10 +315,12 @@ class PlanetlabTap(LinuxApplication): command.append("-t %s" % self.vif_type) command.append("-a %s" % self.get("ip4")) command.append("-n %d" % self.get("prefix4")) - command.append("-f %s " % self.if_name_file) + command.append("-f %s " % self.vif_name_file) command.append("-S %s " % self.sock_name) + if self.get("snat") == True: command.append("-s") + if self.get("pointopoint"): command.append("-p %s" % self.get("pointopoint")) @@ -297,7 +328,7 @@ class PlanetlabTap(LinuxApplication): @property def _stop_command(self): - command = ["sudo -S python ${SRC}/pl-vif-stop.py"] + command = ["sudo -S python ${SRC}/pl-vif-down.py"] command.append("-S %s " % self.sock_name) return " ".join(command) @@ -307,8 +338,8 @@ class PlanetlabTap(LinuxApplication): return "IFF_TAP" @property - def if_name_file(self): - return os.path.join(self.run_home, "if_name") + def vif_name_file(self): + return os.path.join(self.run_home, "vif_name") @property def sock_name(self): diff --git a/test/resources/planetlab/gretunnel.py b/test/resources/planetlab/gretunnel.py new file mode 100644 index 00000000..2cde3aec --- /dev/null +++ b/test/resources/planetlab/gretunnel.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python +# +# NEPI, a framework to manage network experiments +# Copyright (C) 2013 INRIA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# Author: Alina Quereilhac + +from nepi.execution.ec import ExperimentController + +from test_utils import skipIfAnyNotAliveWithIdentity + +import os +import time +import unittest + +class GRETunnelTestCase(unittest.TestCase): + def setUp(self): + #self.host1 = "nepi2.pl.sophia.inria.fr" + #self.host2 = "nepi5.pl.sophia.inria.fr" + self.host1 = "planetlab1.informatik.uni-erlangen.de" + self.host2 = "planetlab1.informatik.uni-goettingen.de" + self.user = "inria_nepi" + self.identity = "%s/.ssh/id_rsa_planetlab" % (os.environ['HOME']) + #self.netblock = "192.168.1" + self.netblock = "192.168.3" + + @skipIfAnyNotAliveWithIdentity + def t_tap_gre_tunnel(self, user1, host1, identity1, user2, host2, + identity2): + + ec = ExperimentController(exp_id = "test-tap-gre-tunnel") + + node1 = ec.register_resource("PlanetlabNode") + ec.set(node1, "hostname", host1) + ec.set(node1, "username", user1) + ec.set(node1, "identity", identity1) + ec.set(node1, "cleanHome", True) + ec.set(node1, "cleanProcesses", True) + + tap1 = ec.register_resource("PlanetlabTap") + ec.set(tap1, "ip4", "%s.1" % self.netblock) + ec.set(tap1, "prefix4", 24) + ec.register_connection(tap1, node1) + + node2 = ec.register_resource("PlanetlabNode") + ec.set(node2, "hostname", host2) + ec.set(node2, "username", user2) + ec.set(node2, "identity", identity2) + ec.set(node2, "cleanHome", True) + ec.set(node2, "cleanProcesses", True) + + tap2 = ec.register_resource("PlanetlabTap") + ec.set(tap2, "ip4", "%s.2" % self.netblock) + ec.set(tap2, "prefix4", 24) + ec.register_connection(tap2, node2) + + gretun = ec.register_resource("GRETunnel") + ec.register_connection(tap1, gretun) + ec.register_connection(tap2, gretun) + + app = ec.register_resource("LinuxApplication") + cmd = "ping -c3 %s.2" % self.netblock + ec.set(app, "command", cmd) + ec.register_connection(app, node1) + + ec.deploy() + + ec.wait_finished(app) + + ping = ec.trace(app, 'stdout') + expected = """3 packets transmitted, 3 received, 0% packet loss""" + self.assertTrue(ping.find(expected) > -1) + + if_name = ec.get(tap1, "deviceName") + self.assertTrue(if_name.startswith("tap")) + + if_name = ec.get(tap2, "deviceName") + self.assertTrue(if_name.startswith("tap")) + + ec.shutdown() + + @skipIfAnyNotAliveWithIdentity + def t_tun_udp_tunnel(self, user1, host1, identity1, user2, host2, + identity2): + + ec = ExperimentController(exp_id = "test-tun-gre-tunnel") + + node1 = ec.register_resource("PlanetlabNode") + ec.set(node1, "hostname", host1) + ec.set(node1, "username", user1) + ec.set(node1, "identity", identity1) + ec.set(node1, "cleanHome", True) + ec.set(node1, "cleanProcesses", True) + + tun1 = ec.register_resource("PlanetlabTun") + ec.set(tun1, "ip4", "%s.1" % self.netblock) + ec.set(tun1, "pointopoint", "%s.2" % self.netblock) + ec.set(tun1, "prefix4", 24) + ec.register_connection(tun1, node1) + + node2 = ec.register_resource("PlanetlabNode") + ec.set(node2, "hostname", host2) + ec.set(node2, "username", user2) + ec.set(node2, "identity", identity2) + ec.set(node2, "cleanHome", True) + ec.set(node2, "cleanProcesses", True) + + tun2 = ec.register_resource("PlanetlabTun") + ec.set(tun2, "ip4", "%s.2" % self.netblock) + ec.set(tun2, "pointopoint", "%s.1" % self.netblock ) + ec.set(tun2, "prefix4", 24) + ec.register_connection(tun2, node2) + + udptun = ec.register_resource("UdpTunnel") + ec.register_connection(tun1, udptun) + ec.register_connection(tun2, udptun) + + app = ec.register_resource("LinuxApplication") + cmd = "ping -c3 %s.2" % self.netblock + ec.set(app, "command", cmd) + ec.register_connection(app, node1) + + ec.deploy() + + ec.wait_finished(app) + + ping = ec.trace(app, 'stdout') + expected = """3 packets transmitted, 3 received, 0% packet loss""" + self.assertTrue(ping.find(expected) > -1) + + if_name = ec.get(tun1, "deviceName") + self.assertTrue(if_name.startswith("tun")) + + if_name = ec.get(tun2, "deviceName") + self.assertTrue(if_name.startswith("tun")) + + ec.shutdown() + + def test_tap_udp_tunnel(self): + self.t_tap_udp_tunnel(self.user, self.host1, self.identity, + self.user, self.host2, self.identity) + + def test_tun_udp_tunnel(self): + self.t_tun_udp_tunnel(self.user, self.host1, self.identity, + self.user, self.host2, self.identity) + +if __name__ == '__main__': + unittest.main() + diff --git a/test/resources/planetlab/udptunnel.py b/test/resources/planetlab/udptunnel.py index 9dd7d5b4..6b767fa3 100755 --- a/test/resources/planetlab/udptunnel.py +++ b/test/resources/planetlab/udptunnel.py @@ -69,7 +69,7 @@ class UdpTunnelTestCase(unittest.TestCase): ec.set(tap2, "prefix4", 24) ec.register_connection(tap2, node2) - udptun = ec.register_resource("UdpTunnel") + udptun = ec.register_resource("LinuxUdpTunnel") ec.register_connection(tap1, udptun) ec.register_connection(tap2, udptun) @@ -86,18 +86,18 @@ class UdpTunnelTestCase(unittest.TestCase): expected = """3 packets transmitted, 3 received, 0% packet loss""" self.assertTrue(ping.find(expected) > -1) - if_name = ec.get(tap1, "deviceName") - self.assertTrue(if_name.startswith("tap")) + vif_name = ec.get(tap1, "deviceName") + self.assertTrue(vif_name.startswith("tap")) - if_name = ec.get(tap2, "deviceName") - self.assertTrue(if_name.startswith("tap")) + vif_name = ec.get(tap2, "deviceName") + self.assertTrue(vif_name.startswith("tap")) ec.shutdown() @skipIfAnyNotAliveWithIdentity def t_tun_udp_tunnel(self, user1, host1, identity1, user2, host2, identity2): - ec = ExperimentController(exp_id = "test-tap-udp-tunnel") + ec = ExperimentController(exp_id = "test-tun-udp-tunnel") node1 = ec.register_resource("PlanetlabNode") ec.set(node1, "hostname", host1) @@ -125,7 +125,7 @@ class UdpTunnelTestCase(unittest.TestCase): ec.set(tun2, "prefix4", 24) ec.register_connection(tun2, node2) - udptun = ec.register_resource("UdpTunnel") + udptun = ec.register_resource("LinuxUdpTunnel") ec.register_connection(tun1, udptun) ec.register_connection(tun2, udptun) @@ -142,11 +142,11 @@ class UdpTunnelTestCase(unittest.TestCase): expected = """3 packets transmitted, 3 received, 0% packet loss""" self.assertTrue(ping.find(expected) > -1) - if_name = ec.get(tun1, "deviceName") - self.assertTrue(if_name.startswith("tun")) + vif_name = ec.get(tun1, "deviceName") + self.assertTrue(vif_name.startswith("tun")) - if_name = ec.get(tun2, "deviceName") - self.assertTrue(if_name.startswith("tun")) + vif_name = ec.get(tun2, "deviceName") + self.assertTrue(vif_name.startswith("tun")) ec.shutdown()