Improvements to UdpTunnel
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 16 Jul 2013 05:18:06 +0000 (22:18 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 16 Jul 2013 05:18:06 +0000 (22:18 -0700)
src/nepi/resources/linux/application.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/udptunnel.py
src/nepi/resources/planetlab/node.py
src/nepi/resources/planetlab/scripts/pl-vif-udp-connect.py
src/nepi/resources/planetlab/tap.py
test/resources/planetlab/udptunnel.py

index a55fab5..e874bcf 100644 (file)
@@ -654,21 +654,23 @@ class LinuxApplication(ResourceManager):
                 # requested every 'state_check_delay' seconds.
                 state_check_delay = 0.5
                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
-                    # check if execution errors occurred
-                    (out, err), proc = self.node.check_errors(self.run_home)
-
-                    if err:
-                        msg = " Failed to execute command '%s'" % self.get("command")
-                        self.error(msg, out, err)
-                        self.fail()
-
-                    elif self.pid and self.ppid:
-                        # No execution errors occurred. Make sure the background
-                        # process with the recorded pid is still running.
+                    if self.pid and self.ppid:
+                        # Make sure the process is still running in background
                         status = self.node.status(self.pid, self.ppid)
 
                         if status == ProcStatus.FINISHED:
-                            self._state = ResourceState.FINISHED
+                            # If the program finished, check if execution
+                            # errors occurred
+                            (out, err), proc = self.node.check_errors(
+                                    self.run_home)
+
+                            if err:
+                                msg = " Failed to execute command '%s'" % \
+                                        self.get("command")
+                                self.error(msg, out, err)
+                                self.fail()
+                            else:
+                               self._state = ResourceState.FINISHED
 
                     self._last_state_check = tnow()
 
index 6157b71..82ccaca 100644 (file)
@@ -345,6 +345,14 @@ class LinuxNode(ResourceManager):
         super(LinuxNode, self).deploy()
 
     def release(self):
+        # Node needs to wait until all associated RMs are released
+        # to be released
+        rms = self.get_connected()
+        for rm in rms:
+            if rm.state < ResourceState.STOPPED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
+
         tear_down = self.get("tearDown")
         if tear_down:
             self.execute(tear_down)
index d240416..7b89729 100644 (file)
@@ -21,6 +21,7 @@ from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
         reschedule_delay
 from nepi.resources.linux.application import LinuxApplication
+from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
 
 import os
@@ -31,6 +32,39 @@ import time
 class UdpTunnel(LinuxApplication):
     _rtype = "UdpTunnel"
 
+    @classmethod
+    def _register_attributes(cls):
+        cipher = Attribute("cipher",
+               "Cipher to encript communication. "
+                "One of PLAIN, AES, Blowfish, DES, DES3. ",
+                default = None,
+                allowed = ["PLAIN", "AES", "Blowfish", "DES", "DES3"],
+                type = Types.Enumerate, 
+                flags = Flags.ExecReadOnly)
+
+        cipher_key = Attribute("cipherKey",
+                "Specify a symmetric encryption key with which to protect "
+                "packets across the tunnel. python-crypto must be installed "
+                "on the system." ,
+                flags = Flags.ExecReadOnly)
+
+        txqueuelen = Attribute("txQueueLen",
+                "Specifies the interface's transmission queue length. "
+                "Defaults to 1000. ", 
+                type = Types.Integer, 
+                flags = Flags.ExecReadOnly)
+
+        bwlimit = Attribute("bwLimit",
+                "Specifies the interface's emulated bandwidth in bytes "
+                "per second.",
+                type = Types.Integer, 
+                flags = Flags.ExecReadOnly)
+
+        cls._register_attribute(cipher)
+        cls._register_attribute(cipher_key)
+        cls._register_attribute(txqueuelen)
+        cls._register_attribute(bwlimit)
+
     def __init__(self, ec, guid):
         super(UdpTunnel, self).__init__(ec, guid)
         self._home = "udp-tunnel-%s" % self.guid
@@ -81,9 +115,13 @@ class UdpTunnel(LinuxApplication):
                 "remote_port")
         ret_file = os.path.join(self.run_home(endpoint), 
                 "ret_file")
+        cipher = self.get("cipher")
+        cipher_key = self.get("cipherKey")
+        bwlimit = self.get("bwLimit")
+        txqueuelen = self.get("txQueueLen")
         udp_connect_command = endpoint.udp_connect_command(
                 remote_ip, local_port_file, remote_port_file,
-                ret_file)
+                ret_file, cipher, cipher_key, bwlimit, txqueuelen)
 
         # upload command to connect.sh script
         shfile = os.path.join(self.app_home(endpoint), "udp-connect.sh")
@@ -182,20 +220,8 @@ class UdpTunnel(LinuxApplication):
             self._state = ResourceState.FAILED
             raise RuntimeError, msg
 
-    def stop(self):
-        command = self.get('command') or ''
-        state = self.state
-        
-        if state == ResourceState.STARTED:
-            self.info("Stopping command '%s'" % command)
-
-            command = "bash %s" % os.path.join(self.app_home, "stop.sh")
-            (out, err), proc = self.execute_command(command,
-                    blocking = True)
-
-            self._stop_time = tnow()
-            self._state = ResourceState.STOPPED
-
+    # XXX: Leaves process unkilled!! 
+    #       Implement another mechanism to kill the tunnel!
     def stop(self):
         """ Stops application execution
         """
@@ -214,7 +240,7 @@ class UdpTunnel(LinuxApplication):
                 if err1 or err2 or pro1.poll() or proc2.poll():
                     # check if execution errors occurred
                     msg = " Failed to STOP tunnel"
-                    self.error(msg, out, err)
+                    self.error(msg, err1, err2)
                     self.fail()
                     stopped = False
 
@@ -232,27 +258,29 @@ class UdpTunnel(LinuxApplication):
             # requested every 'state_check_delay' seconds.
             state_check_delay = 0.5
             if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
-                # check if execution errors occurred
-                (out1, err1), proc1 = self.endpoint1.node.check_errors(
-                        self.run_home(self.endpoint1))
-
-                (out2, err2), proc2 = self.endpoint2.node.check_errors(
-                        self.run_home(self.endpoint2))
-
-                if err1 or err2:
-                    msg = " Failed to connect endpoints "
-                    self.error(msg, err1, err2)
-                    self.fail()
-
-                elif self._pid1 and self._ppid1 and self._pid2 and self._ppid2:
+                if self._pid1 and self._ppid1 and self._pid2 and self._ppid2:
+                    # Make sure the process is still running in background
                     # No execution errors occurred. Make sure the background
                     # process with the recorded pid is still running.
-                    status1 = self.node.status(self._pid1, self._ppid1)
-                    status2 = self.node.status(self._pid2, self._ppid2)
+                    status1 = self.endpoint1.node.status(self._pid1, self._ppid1)
+                    status2 = self.endpoint2.node.status(self._pid2, self._ppid2)
 
                     if status1 == ProcStatus.FINISHED and \
-                            satus2 == ProcStatus.FINISHED:
-                        self._state = ResourceState.FINISHED
+                            status2 == ProcStatus.FINISHED:
+
+                        # check if execution errors occurred
+                        (out1, err1), proc1 = self.endpoint1.node.check_errors(
+                                self.run_home(self.endpoint1))
+
+                        (out2, err2), proc2 = self.endpoint2.node.check_errors(
+                                self.run_home(self.endpoint2))
+
+                        if err1 or err2: 
+                            msg = "Error occurred in tunnel"
+                            self.error(msg, err1, err2)
+                            self.fail()
+                        else:
+                            self._state = ResourceState.FINISHED
 
                 self._last_state_check = tnow()
 
index 9d1b295..b441397 100644 (file)
@@ -176,9 +176,3 @@ class PlanetlabNode(LinuxNode):
         # TODO: Validate!
         return True
 
-    def blacklist(self):
-        # TODO!!!!
-        self.warn(" Blacklisting malfunctioning node ")
-        #import util
-        #util.appendBlacklist(self.hostname)
-
index 9b0b514..c59bf05 100644 (file)
@@ -61,8 +61,9 @@ def get_fd(socket_name):
 
 def get_options():
     usage = ("usage: %prog -t <vif-type> -S <fd-socket-name> "
-        "-l <local-port-file> -r <remote-port-file> -H <remote-host> "
-        "-R <ret-file> ")
+            "-b <bwlimit> -c <cipher> -k <cipher-key> -q <txqueuelen> " 
+            "-l <local-port-file> -r <remote-port-file> -H <remote-host> "
+            "-R <ret-file> ")
     
     parser = OptionParser(usage = usage)
 
@@ -72,6 +73,23 @@ def get_options():
     parser.add_option("-S", "--fd-socket-name", dest="fd_socket_name",
         help = "Name for the unix socket to request the TAP file descriptor", 
         default = "tap.sock", type="str")
+
+    parser.add_option("-b", "--bwlimit", dest="bwlimit",
+        help = "Specifies the interface's emulated bandwidth in bytes ",
+        default = None, type="int")
+    parser.add_option("-q", "--txqueuelen", dest="txqueuelen",
+        help = "Specifies the interface's transmission queue length. ",
+        default = 1000, type="int")
+    parser.add_option("-c", "--cipher", dest="cipher",
+        help = "Cipher to encript communication. "
+            "One of PLAIN, AES, Blowfish, DES, DES3. ",
+        default = None, type="str")
+    parser.add_option("-k", "--cipher-key", dest="cipher_key",
+        help = "Specify a symmetric encryption key with which to protect "
+            "packets across the tunnel. python-crypto must be installed "
+            "on the system." ,
+        default = None, type="str")
+
     parser.add_option("-l", "--local-port-file", dest="local_port_file",
         help = "File where to store the local binded UDP port number ", 
         default = "local_port_file", type="str")
@@ -92,13 +110,15 @@ def get_options():
         vif_type = vsys.IFF_TUN
 
     return ( vif_type, options.fd_socket_name, options.local_port_file,
-            options.remote_port_file, options.remote_host,
-            options.ret_file )
+            options.remote_port_file, options.remote_host, options.ret_file, 
+            options.bwlimit, options.cipher, options.cipher_key,
+            options.txqueuelen )
 
 if __name__ == '__main__':
 
     ( vif_type, socket_name, local_port_file, remote_port_file,
-            remote_host, ret_file ) = get_options()
+      remote_host, ret_file, bwlimit, cipher, cipher_key, txqueuelen 
+         ) = get_options()
    
     # Get the file descriptor of the TAP device from the process
     # that created it
@@ -120,10 +140,22 @@ if __name__ == '__main__':
     while not os.path.exists(remote_port_file):
         time.sleep(2)
 
+    remote_port = ''
     # Read remote port from file
-    f = open(remote_port_file, 'r')
-    remote_port = f.read()
-    f.close()
+    # Try until something is read...
+    # xxx: There seems to be a weird behavior where
+    #       even if the file exists and had the port number,
+    #       the read operation returns empty string!
+    #       Maybe a raise condition?
+    for i in xrange(10):
+        f = open(remote_port_file, 'r')
+        remote_port = f.read()
+        f.close()
+
+        if remote_port:
+            break
+        
+        time.sleep(2)
     
     remote_port = remote_port.strip()
     remote_port = int(remote_port)
@@ -140,17 +172,17 @@ if __name__ == '__main__':
     f.close()
 
     # Establish tunnel
-    # TODO: ADD parameters tunqueue, tunkqueue, cipher_key
     tunchannel.tun_fwd(tun, remote,
-        # Planetlab TAP devices add PI headers 
-        with_pi = True,
+        with_pi = True, # Planetlab TAP devices add PI headers 
         ether_mode = (vif_type == vsys.IFF_TAP),
-        cipher_key = None,
         udp = True,
+        cipher_key = cipher_key,
+        cipher = cipher,
         TERMINATE = TERMINATE,
         SUSPEND = SUSPEND,
-        tunqueue = 1000,
+        tunqueue = txqueuelen,
         tunkqueue = 500,
+        bwlimit = bwlimit
     ) 
  
 
index b969174..9fdd5f0 100644 (file)
@@ -238,7 +238,8 @@ class PlanetlabTap(LinuxApplication):
         return if_name
 
     def udp_connect_command(self, remote_ip, local_port_file, 
-            remote_port_file, ret_file):
+            remote_port_file, ret_file, cipher, cipher_key,
+            bwlimit, txqueuelen):
         command = ["sudo -S "]
         command.append("PYTHONPATH=$PYTHONPATH:${SRC}")
         command.append("python ${SRC}/pl-vif-udp-connect.py")
@@ -248,6 +249,14 @@ class PlanetlabTap(LinuxApplication):
         command.append("-r %s " % remote_port_file)
         command.append("-H %s " % remote_ip)
         command.append("-R %s " % ret_file)
+        if cipher:
+            command.append("-c %s " % cipher)
+        if cipher_key:
+            command.append("-k %s " % cipher_key)
+        if txqueuelen:
+            command.append("-q %s " % txqueuelen)
+        if bwlimit:
+            command.append("-b %s " % bwlimit)
 
         command = " ".join(command)
         command = self.replace_paths(command)
index 4135148..6f45ae8 100755 (executable)
@@ -84,12 +84,68 @@ class UdpTunnelTestCase(unittest.TestCase):
         if_name = ec.get(tap2, "deviceName")
         self.assertTrue(if_name.startswith("tap"))
 
+        ec.shutdown()
+
+    @skipIfAnyNotAlive
+    def t_tun_udp_tunnel(self, user, host1, host2):
+
+        ec = ExperimentController(exp_id = "test-tap-udp-tunnel")
+        
+        node1 = ec.register_resource("PlanetlabNode")
+        ec.set(node1, "hostname", host1)
+        ec.set(node1, "username", user)
+        ec.set(node1, "cleanHome", True)
+        ec.set(node1, "cleanProcesses", True)
+
+        tun1 = ec.register_resource("PlanetlabTun")
+        ec.set(tun1, "ip4", "192.168.1.1")
+        ec.set(tun1, "pointopoint", "192.168.1.2")
+        ec.set(tun1, "prefix4", 24)
+        ec.register_connection(tun1, node1)
+
+        node2 = ec.register_resource("PlanetlabNode")
+        ec.set(node2, "hostname", host2)
+        ec.set(node2, "username", user)
+        ec.set(node2, "cleanHome", True)
+        ec.set(node2, "cleanProcesses", True)
+
+        tun2 = ec.register_resource("PlanetlabTun")
+        ec.set(tun2, "ip4", "192.168.1.2")
+        ec.set(tun2, "pointopoint", "192.168.1.1")
+        ec.set(tun2, "prefix4", 24)
+        ec.register_connection(tun2, node2)
+
+        udptun = ec.register_resource("UdpTunnel")
+        ec.register_connection(tun1, udptun)
+        ec.register_connection(tun2, udptun)
+
+        app = ec.register_resource("LinuxApplication")
+        cmd = "ping -c3 192.168.1.2"
+        ec.set(app, "command", cmd)
+        ec.register_connection(app, node1)
+
+        ec.deploy()
+
+        ec.wait_finished(app)
+
+        ping = ec.trace(app, 'stdout')
+        expected = """3 packets transmitted, 3 received, 0% packet loss"""
+        self.assertTrue(ping.find(expected) > -1)
+        
+        if_name = ec.get(tun1, "deviceName")
+        self.assertTrue(if_name.startswith("tun"))
+        
+        if_name = ec.get(tun2, "deviceName")
+        self.assertTrue(if_name.startswith("tun"))
 
         ec.shutdown()
 
     def test_tap_udp_tunnel(self):
         self.t_tap_udp_tunnel(self.user, self.host1, self.host2)
 
+    def test_tun_udp_tunnel(self):
+        self.t_tun_udp_tunnel(self.user, self.host1, self.host2)
+
 if __name__ == '__main__':
     unittest.main()