From: Alina Quereilhac Date: Tue, 16 Jul 2013 05:18:06 +0000 (-0700) Subject: Improvements to UdpTunnel X-Git-Tag: nepi-3.0.0~71 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=8b192f1c0cc55e5bc847ce40436ab55ddce0585c;p=nepi.git Improvements to UdpTunnel --- diff --git a/src/nepi/resources/linux/application.py b/src/nepi/resources/linux/application.py index a55fab54..e874bcfd 100644 --- a/src/nepi/resources/linux/application.py +++ b/src/nepi/resources/linux/application.py @@ -654,21 +654,23 @@ class LinuxApplication(ResourceManager): # requested every 'state_check_delay' seconds. state_check_delay = 0.5 if tdiffsec(tnow(), self._last_state_check) > state_check_delay: - # check if execution errors occurred - (out, err), proc = self.node.check_errors(self.run_home) - - if err: - msg = " Failed to execute command '%s'" % self.get("command") - self.error(msg, out, err) - self.fail() - - elif self.pid and self.ppid: - # No execution errors occurred. Make sure the background - # process with the recorded pid is still running. + if self.pid and self.ppid: + # Make sure the process is still running in background status = self.node.status(self.pid, self.ppid) if status == ProcStatus.FINISHED: - self._state = ResourceState.FINISHED + # If the program finished, check if execution + # errors occurred + (out, err), proc = self.node.check_errors( + self.run_home) + + if err: + msg = " Failed to execute command '%s'" % \ + self.get("command") + self.error(msg, out, err) + self.fail() + else: + self._state = ResourceState.FINISHED self._last_state_check = tnow() diff --git a/src/nepi/resources/linux/node.py b/src/nepi/resources/linux/node.py index 6157b71d..82ccaca1 100644 --- a/src/nepi/resources/linux/node.py +++ b/src/nepi/resources/linux/node.py @@ -345,6 +345,14 @@ class LinuxNode(ResourceManager): super(LinuxNode, self).deploy() def release(self): + # Node needs to wait until all associated RMs are released + # to be released + rms = self.get_connected() + for rm in rms: + if rm.state < ResourceState.STOPPED: + self.ec.schedule(reschedule_delay, self.release) + return + tear_down = self.get("tearDown") if tear_down: self.execute(tear_down) diff --git a/src/nepi/resources/linux/udptunnel.py b/src/nepi/resources/linux/udptunnel.py index d240416b..7b897293 100644 --- a/src/nepi/resources/linux/udptunnel.py +++ b/src/nepi/resources/linux/udptunnel.py @@ -21,6 +21,7 @@ from nepi.execution.attribute import Attribute, Flags, Types from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \ reschedule_delay from nepi.resources.linux.application import LinuxApplication +from nepi.util.sshfuncs import ProcStatus from nepi.util.timefuncs import tnow, tdiffsec import os @@ -31,6 +32,39 @@ import time class UdpTunnel(LinuxApplication): _rtype = "UdpTunnel" + @classmethod + def _register_attributes(cls): + cipher = Attribute("cipher", + "Cipher to encript communication. " + "One of PLAIN, AES, Blowfish, DES, DES3. ", + default = None, + allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"], + type = Types.Enumerate, + flags = Flags.ExecReadOnly) + + cipher_key = Attribute("cipherKey", + "Specify a symmetric encryption key with which to protect " + "packets across the tunnel. python-crypto must be installed " + "on the system." , + flags = Flags.ExecReadOnly) + + txqueuelen = Attribute("txQueueLen", + "Specifies the interface's transmission queue length. " + "Defaults to 1000. ", + type = Types.Integer, + flags = Flags.ExecReadOnly) + + bwlimit = Attribute("bwLimit", + "Specifies the interface's emulated bandwidth in bytes " + "per second.", + type = Types.Integer, + flags = Flags.ExecReadOnly) + + cls._register_attribute(cipher) + cls._register_attribute(cipher_key) + cls._register_attribute(txqueuelen) + cls._register_attribute(bwlimit) + def __init__(self, ec, guid): super(UdpTunnel, self).__init__(ec, guid) self._home = "udp-tunnel-%s" % self.guid @@ -81,9 +115,13 @@ class UdpTunnel(LinuxApplication): "remote_port") ret_file = os.path.join(self.run_home(endpoint), "ret_file") + cipher = self.get("cipher") + cipher_key = self.get("cipherKey") + bwlimit = self.get("bwLimit") + txqueuelen = self.get("txQueueLen") udp_connect_command = endpoint.udp_connect_command( remote_ip, local_port_file, remote_port_file, - ret_file) + ret_file, cipher, cipher_key, bwlimit, txqueuelen) # upload command to connect.sh script shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh") @@ -182,20 +220,8 @@ class UdpTunnel(LinuxApplication): self._state = ResourceState.FAILED raise RuntimeError, msg - def stop(self): - command = self.get('command') or '' - state = self.state - - if state == ResourceState.STARTED: - self.info("Stopping command '%s'" % command) - - command = "bash %s" % os.path.join(self.app_home, "stop.sh") - (out, err), proc = self.execute_command(command, - blocking = True) - - self._stop_time = tnow() - self._state = ResourceState.STOPPED - + # XXX: Leaves process unkilled!! + # Implement another mechanism to kill the tunnel! def stop(self): """ Stops application execution """ @@ -214,7 +240,7 @@ class UdpTunnel(LinuxApplication): if err1 or err2 or pro1.poll() or proc2.poll(): # check if execution errors occurred msg = " Failed to STOP tunnel" - self.error(msg, out, err) + self.error(msg, err1, err2) self.fail() stopped = False @@ -232,27 +258,29 @@ class UdpTunnel(LinuxApplication): # requested every 'state_check_delay' seconds. state_check_delay = 0.5 if tdiffsec(tnow(), self._last_state_check) > state_check_delay: - # check if execution errors occurred - (out1, err1), proc1 = self.endpoint1.node.check_errors( - self.run_home(self.endpoint1)) - - (out2, err2), proc2 = self.endpoint2.node.check_errors( - self.run_home(self.endpoint2)) - - if err1 or err2: - msg = " Failed to connect endpoints " - self.error(msg, err1, err2) - self.fail() - - elif self._pid1 and self._ppid1 and self._pid2 and self._ppid2: + if self._pid1 and self._ppid1 and self._pid2 and self._ppid2: + # Make sure the process is still running in background # No execution errors occurred. Make sure the background # process with the recorded pid is still running. - status1 = self.node.status(self._pid1, self._ppid1) - status2 = self.node.status(self._pid2, self._ppid2) + status1 = self.endpoint1.node.status(self._pid1, self._ppid1) + status2 = self.endpoint2.node.status(self._pid2, self._ppid2) if status1 == ProcStatus.FINISHED and \ - satus2 == ProcStatus.FINISHED: - self._state = ResourceState.FINISHED + status2 == ProcStatus.FINISHED: + + # check if execution errors occurred + (out1, err1), proc1 = self.endpoint1.node.check_errors( + self.run_home(self.endpoint1)) + + (out2, err2), proc2 = self.endpoint2.node.check_errors( + self.run_home(self.endpoint2)) + + if err1 or err2: + msg = "Error occurred in tunnel" + self.error(msg, err1, err2) + self.fail() + else: + self._state = ResourceState.FINISHED self._last_state_check = tnow() diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index 9d1b2954..b441397f 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -176,9 +176,3 @@ class PlanetlabNode(LinuxNode): # TODO: Validate! return True - def blacklist(self): - # TODO!!!! - self.warn(" Blacklisting malfunctioning node ") - #import util - #util.appendBlacklist(self.hostname) - diff --git a/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py b/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py index 9b0b514e..c59bf05a 100644 --- a/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py +++ b/src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py @@ -61,8 +61,9 @@ def get_fd(socket_name): def get_options(): usage = ("usage: %prog -t -S " - "-l -r -H " - "-R ") + "-b -c -k -q " + "-l -r -H " + "-R ") parser = OptionParser(usage = usage) @@ -72,6 +73,23 @@ def get_options(): parser.add_option("-S", "--fd-socket-name", dest="fd_socket_name", help = "Name for the unix socket to request the TAP file descriptor", default = "tap.sock", type="str") + + parser.add_option("-b", "--bwlimit", dest="bwlimit", + help = "Specifies the interface's emulated bandwidth in bytes ", + default = None, type="int") + parser.add_option("-q", "--txqueuelen", dest="txqueuelen", + help = "Specifies the interface's transmission queue length. ", + default = 1000, type="int") + parser.add_option("-c", "--cipher", dest="cipher", + help = "Cipher to encript communication. " + "One of PLAIN, AES, Blowfish, DES, DES3. ", + default = None, type="str") + parser.add_option("-k", "--cipher-key", dest="cipher_key", + help = "Specify a symmetric encryption key with which to protect " + "packets across the tunnel. python-crypto must be installed " + "on the system." , + default = None, type="str") + parser.add_option("-l", "--local-port-file", dest="local_port_file", help = "File where to store the local binded UDP port number ", default = "local_port_file", type="str") @@ -92,13 +110,15 @@ def get_options(): vif_type = vsys.IFF_TUN return ( vif_type, options.fd_socket_name, options.local_port_file, - options.remote_port_file, options.remote_host, - options.ret_file ) + options.remote_port_file, options.remote_host, options.ret_file, + options.bwlimit, options.cipher, options.cipher_key, + options.txqueuelen ) if __name__ == '__main__': ( vif_type, socket_name, local_port_file, remote_port_file, - remote_host, ret_file ) = get_options() + remote_host, ret_file, bwlimit, cipher, cipher_key, txqueuelen + ) = get_options() # Get the file descriptor of the TAP device from the process # that created it @@ -120,10 +140,22 @@ if __name__ == '__main__': while not os.path.exists(remote_port_file): time.sleep(2) + remote_port = '' # Read remote port from file - f = open(remote_port_file, 'r') - remote_port = f.read() - f.close() + # Try until something is read... + # xxx: There seems to be a weird behavior where + # even if the file exists and had the port number, + # the read operation returns empty string! + # Maybe a raise condition? + for i in xrange(10): + f = open(remote_port_file, 'r') + remote_port = f.read() + f.close() + + if remote_port: + break + + time.sleep(2) remote_port = remote_port.strip() remote_port = int(remote_port) @@ -140,17 +172,17 @@ if __name__ == '__main__': f.close() # Establish tunnel - # TODO: ADD parameters tunqueue, tunkqueue, cipher_key tunchannel.tun_fwd(tun, remote, - # Planetlab TAP devices add PI headers - with_pi = True, + with_pi = True, # Planetlab TAP devices add PI headers ether_mode = (vif_type == vsys.IFF_TAP), - cipher_key = None, udp = True, + cipher_key = cipher_key, + cipher = cipher, TERMINATE = TERMINATE, SUSPEND = SUSPEND, - tunqueue = 1000, + tunqueue = txqueuelen, tunkqueue = 500, + bwlimit = bwlimit ) diff --git a/src/nepi/resources/planetlab/tap.py b/src/nepi/resources/planetlab/tap.py index b9691740..9fdd5f08 100644 --- a/src/nepi/resources/planetlab/tap.py +++ b/src/nepi/resources/planetlab/tap.py @@ -238,7 +238,8 @@ class PlanetlabTap(LinuxApplication): return if_name def udp_connect_command(self, remote_ip, local_port_file, - remote_port_file, ret_file): + remote_port_file, ret_file, cipher, cipher_key, + bwlimit, txqueuelen): command = ["sudo -S "] command.append("PYTHONPATH=$PYTHONPATH:${SRC}") command.append("python ${SRC}/pl-vif-udp-connect.py") @@ -248,6 +249,14 @@ class PlanetlabTap(LinuxApplication): command.append("-r %s " % remote_port_file) command.append("-H %s " % remote_ip) command.append("-R %s " % ret_file) + if cipher: + command.append("-c %s " % cipher) + if cipher_key: + command.append("-k %s " % cipher_key) + if txqueuelen: + command.append("-q %s " % txqueuelen) + if bwlimit: + command.append("-b %s " % bwlimit) command = " ".join(command) command = self.replace_paths(command) diff --git a/test/resources/planetlab/udptunnel.py b/test/resources/planetlab/udptunnel.py index 41351485..6f45ae8d 100755 --- a/test/resources/planetlab/udptunnel.py +++ b/test/resources/planetlab/udptunnel.py @@ -84,12 +84,68 @@ class UdpTunnelTestCase(unittest.TestCase): if_name = ec.get(tap2, "deviceName") self.assertTrue(if_name.startswith("tap")) + ec.shutdown() + + @skipIfAnyNotAlive + def t_tun_udp_tunnel(self, user, host1, host2): + + ec = ExperimentController(exp_id = "test-tap-udp-tunnel") + + node1 = ec.register_resource("PlanetlabNode") + ec.set(node1, "hostname", host1) + ec.set(node1, "username", user) + ec.set(node1, "cleanHome", True) + ec.set(node1, "cleanProcesses", True) + + tun1 = ec.register_resource("PlanetlabTun") + ec.set(tun1, "ip4", "192.168.1.1") + ec.set(tun1, "pointopoint", "192.168.1.2") + ec.set(tun1, "prefix4", 24) + ec.register_connection(tun1, node1) + + node2 = ec.register_resource("PlanetlabNode") + ec.set(node2, "hostname", host2) + ec.set(node2, "username", user) + ec.set(node2, "cleanHome", True) + ec.set(node2, "cleanProcesses", True) + + tun2 = ec.register_resource("PlanetlabTun") + ec.set(tun2, "ip4", "192.168.1.2") + ec.set(tun2, "pointopoint", "192.168.1.1") + ec.set(tun2, "prefix4", 24) + ec.register_connection(tun2, node2) + + udptun = ec.register_resource("UdpTunnel") + ec.register_connection(tun1, udptun) + ec.register_connection(tun2, udptun) + + app = ec.register_resource("LinuxApplication") + cmd = "ping -c3 192.168.1.2" + ec.set(app, "command", cmd) + ec.register_connection(app, node1) + + ec.deploy() + + ec.wait_finished(app) + + ping = ec.trace(app, 'stdout') + expected = """3 packets transmitted, 3 received, 0% packet loss""" + self.assertTrue(ping.find(expected) > -1) + + if_name = ec.get(tun1, "deviceName") + self.assertTrue(if_name.startswith("tun")) + + if_name = ec.get(tun2, "deviceName") + self.assertTrue(if_name.startswith("tun")) ec.shutdown() def test_tap_udp_tunnel(self): self.t_tap_udp_tunnel(self.user, self.host1, self.host2) + def test_tun_udp_tunnel(self): + self.t_tun_udp_tunnel(self.user, self.host1, self.host2) + if __name__ == '__main__': unittest.main()