Fix bug strict host checking
[nepi.git] / src / nepi / resources / linux / node.py
index 49f5342..232a9df 100644 (file)
@@ -17,9 +17,9 @@
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-        reschedule_delay
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -28,6 +28,7 @@ import collections
 import os
 import random
 import re
 import os
 import random
 import re
+import socket
 import tempfile
 import time
 import threading
 import tempfile
 import time
 import threading
@@ -57,7 +58,7 @@ class OSType:
     UBUNTU = "ubuntu"
     DEBIAN = "debian"
 
     UBUNTU = "ubuntu"
     DEBIAN = "debian"
 
-@clsinit
+@clsinit_copy
 class LinuxNode(ResourceManager):
     """
     .. class:: Class Args :
 class LinuxNode(ResourceManager):
     """
     .. class:: Class Args :
@@ -142,42 +143,61 @@ class LinuxNode(ResourceManager):
 
     """
     _rtype = "LinuxNode"
 
     """
     _rtype = "LinuxNode"
+    _help = "Controls Linux host machines ( either localhost or a host " \
+            "that can be accessed using a SSH key)"
+    _backend_type = "linux"
 
     @classmethod
     def _register_attributes(cls):
         hostname = Attribute("hostname", "Hostname of the machine",
 
     @classmethod
     def _register_attributes(cls):
         hostname = Attribute("hostname", "Hostname of the machine",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
 
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
 
 
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
 
-        port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
+        port = Attribute("port", "SSH port", flags = Flags.Design)
         
         home = Attribute("home",
                 "Experiment home directory to store all experiment related files",
         
         home = Attribute("home",
                 "Experiment home directory to store all experiment related files",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         
         identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
         
         server_key = Attribute("serverKey", "Server public key", 
         
         identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
         
         server_key = Attribute("serverKey", "Server public key", 
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
-                flags = Flags.ExecReadOnly)
+                type = Types.Bool,
+                default = False,
+                flags = Flags.Design)
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
-                flags = Flags.ExecReadOnly)
+                type = Types.Bool,
+                default = False,
+                flags = Flags.Design)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
-                flags = Flags.ExecReadOnly)
+                type = Types.Bool,
+                default = False,
+                flags = Flags.Design)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
                 "releasing the resource",
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
                 "releasing the resource",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
+
+        gateway_user = Attribute("gatewayUser", "Gateway account username",
+                flags = Flags.Design)
+
+        gateway = Attribute("gateway", "Hostname of the gateway machine",
+                flags = Flags.Design)
+
+        ip = Attribute("ip", "Linux host public IP address. "
+                   "Must not be modified by the user unless hostname is 'localhost'",
+                    flags = Flags.Design)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
@@ -189,15 +209,24 @@ class LinuxNode(ResourceManager):
         cls._register_attribute(clean_experiment)
         cls._register_attribute(clean_processes)
         cls._register_attribute(tear_down)
         cls._register_attribute(clean_experiment)
         cls._register_attribute(clean_processes)
         cls._register_attribute(tear_down)
+        cls._register_attribute(gateway_user)
+        cls._register_attribute(gateway)
+        cls._register_attribute(ip)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
         # home directory at Linux host
         self._home_dir = ""
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
         # home directory at Linux host
         self._home_dir = ""
-        
-        # lock to avoid concurrency issues on methods used by applications 
-        self._lock = threading.Lock()
+
+        # lock to prevent concurrent applications on the same node,
+        # to execute commands at the same time. There are potential
+        # concurrency issues when using SSH to a same host from 
+        # multiple threads. There are also possible operational 
+        # issues, e.g. an application querying the existence 
+        # of a file or folder prior to its creation, and another 
+        # application creating the same file or folder in between.
+        self._node_lock = threading.Lock()
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -210,9 +239,13 @@ class LinuxNode(ResourceManager):
            home = os.path.join(self._home_dir, home) 
         return home
 
            home = os.path.join(self._home_dir, home) 
         return home
 
+    @property
+    def nepi_home(self):
+        return os.path.join(self.home_dir, ".nepi")
+
     @property
     def usr_dir(self):
     @property
     def usr_dir(self):
-        return os.path.join(self.home_dir, "nepi-usr")
+        return os.path.join(self.nepi_home, "nepi-usr")
 
     @property
     def lib_dir(self):
 
     @property
     def lib_dir(self):
@@ -232,7 +265,7 @@ class LinuxNode(ResourceManager):
 
     @property
     def exp_dir(self):
 
     @property
     def exp_dir(self):
-        return os.path.join(self.home_dir, "nepi-exp")
+        return os.path.join(self.nepi_home, "nepi-exp")
 
     @property
     def exp_home(self):
 
     @property
     def exp_home(self):
@@ -251,17 +284,12 @@ class LinuxNode(ResourceManager):
         if self._os:
             return self._os
 
         if self._os:
             return self._os
 
-        if (not self.get("hostname") or not self.get("username")):
+        if not self.localhost and not self.get("username"):
             msg = "Can't resolve OS, insufficient data "
             self.error(msg)
             raise RuntimeError, msg
 
             msg = "Can't resolve OS, insufficient data "
             self.error(msg)
             raise RuntimeError, msg
 
-        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
-
-        if err and proc.poll():
-            msg = "Error detecting OS "
-            self.error(msg, out, err)
-            raise RuntimeError, "%s - %s - %s" %( msg, out, err )
+        out = self.get_os()
 
         if out.find("Fedora release 8") == 0:
             self._os = OSType.FEDORA_8
 
         if out.find("Fedora release 8") == 0:
             self._os = OSType.FEDORA_8
@@ -269,6 +297,8 @@ class LinuxNode(ResourceManager):
             self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
             self._os = OSType.FEDORA_14
             self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
             self._os = OSType.FEDORA_14
+        elif out.find("Fedora release") == 0:
+            self._os = OSType.FEDORA
         elif out.find("Debian") == 0: 
             self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
         elif out.find("Debian") == 0: 
             self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
@@ -280,6 +310,23 @@ class LinuxNode(ResourceManager):
 
         return self._os
 
 
         return self._os
 
+    def get_os(self):
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        out = ""
+        try:
+            (out, err), proc = self.execute("cat /etc/issue", 
+                    with_lock = True,
+                    blocking = True)
+        except:
+            trace = traceback.format_exc()
+            msg = "Error detecting OS: %s " % trace
+            self.error(msg, out, err)
+        
+        return out
+
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
     @property
     def use_deb(self):
         return self.os in [OSType.DEBIAN, OSType.UBUNTU]
@@ -291,13 +338,11 @@ class LinuxNode(ResourceManager):
 
     @property
     def localhost(self):
 
     @property
     def localhost(self):
-        return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
+        return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
 
 
-    def provision(self):
+    def do_provision(self):
         # check if host is alive
         if not self.is_alive():
         # check if host is alive
         if not self.is_alive():
-            self.fail()
-            
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
@@ -313,76 +358,105 @@ class LinuxNode(ResourceManager):
         if self.get("cleanExperiment"):
             self.clean_experiment()
     
         if self.get("cleanExperiment"):
             self.clean_experiment()
     
-        # Create shared directory structure
-        self.mkdir(self.lib_dir)
-        self.mkdir(self.bin_dir)
-        self.mkdir(self.src_dir)
-        self.mkdir(self.share_dir)
+        # Create shared directory structure and node home directory
+        paths = [self.lib_dir, 
+            self.bin_dir, 
+            self.src_dir, 
+            self.share_dir, 
+            self.node_home]
 
 
-        # Create experiment node home directory
-        self.mkdir(self.node_home)
+        self.mkdir(paths)
 
 
-        super(LinuxNode, self).provision()
+        # Get Public IP address
+        if not self.get("ip"):
+            if self.localhost:
+                ip = socket.gethostbyname(socket.gethostname())
+            else:
+                ip = socket.gethostbyname(self.get("hostname"))
 
 
-    def deploy(self):
+            self.set("ip", ip)
+
+        super(LinuxNode, self).do_provision()
+
+    def do_deploy(self):
         if self.state == ResourceState.NEW:
         if self.state == ResourceState.NEW:
-            try:
-                self.discover()
-                self.provision()
-            except:
-                self._state = ResourceState.FAILED
-                raise
+            self.info("Deploying node")
+            self.do_discover()
+            self.do_provision()
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
         from nepi.resources.linux.interface import LinuxInterface
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
         from nepi.resources.linux.interface import LinuxInterface
-        ifaces = self.get_connected(LinuxInterface.rtype())
+        ifaces = self.get_connected(LinuxInterface.get_rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
                 self.ec.schedule(reschedule_delay, self.deploy)
                 return 
 
         for iface in ifaces:
             if iface.state < ResourceState.READY:
                 self.ec.schedule(reschedule_delay, self.deploy)
                 return 
 
-        super(LinuxNode, self).deploy()
+        super(LinuxNode, self).do_deploy()
+
+    def do_release(self):
+        rms = self.get_connected()
+        for rm in rms:
+            # Node needs to wait until all associated RMs are released
+            # before it can be released
+            if rm.state != ResourceState.RELEASED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
 
 
-    def release(self):
         tear_down = self.get("tearDown")
         if tear_down:
             self.execute(tear_down)
 
         self.clean_processes()
 
         tear_down = self.get("tearDown")
         if tear_down:
             self.execute(tear_down)
 
         self.clean_processes()
 
-        super(LinuxNode, self).release()
+        super(LinuxNode, self).do_release()
 
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
 
 
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
 
-    def clean_processes(self, killer = False):
+    def clean_processes(self):
         self.info("Cleaning up processes")
         self.info("Cleaning up processes")
+
+        if self.localhost:
+            return 
         
         
-        if killer:
-            # Hardcore kill
-            cmd = ("sudo -S killall python tcpdump || /bin/true ; " +
-                "sudo -S killall python tcpdump || /bin/true ; " +
-                "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ; " +
-                "sudo -S killall -u root || /bin/true ; " +
-                "sudo -S killall -u root || /bin/true ; ")
-        else:
-            # Be gentler...
+        if self.get("username") != 'root':
             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
-                "sudo -S killall tcpdump || /bin/true ; " +
-                "sudo -S killall -u %s || /bin/true ; " % self.get("username") +
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
+        else:
+            if self.state >= ResourceState.READY:
+                import pickle
+                pids = pickle.load(open("/tmp/save.proc", "rb"))
+                pids_temp = dict()
+                ps_aux = "ps aux |awk '{print $2,$11}'"
+                (out, err), proc = self.execute(ps_aux)
+                if len(out) != 0:
+                    for line in out.strip().split("\n"):
+                        parts = line.strip().split(" ")
+                        pids_temp[parts[0]] = parts[1]
+                    kill_pids = set(pids_temp.items()) - set(pids.items())
+                    kill_pids = ' '.join(dict(kill_pids).keys())
+
+                    cmd = ("killall tcpdump || /bin/true ; " +
+                        "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
+                        "kill %s || /bin/true ; " % kill_pids)
+                else:
+                    cmd = ("killall tcpdump || /bin/true ; " +
+                        "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
+            else:
+                cmd = ("killall tcpdump || /bin/true ; " +
+                    "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
+
+        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
 
 
-        out = err = ""
-        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
-            
     def clean_home(self):
         """ Cleans all NEPI related folders in the Linux host
         """
         self.info("Cleaning up home")
         
     def clean_home(self):
         """ Cleans all NEPI related folders in the Linux host
         """
         self.info("Cleaning up home")
         
-        cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
+        cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
                 self.home_dir )
 
         return self.execute(cmd, with_lock = True)
                 self.home_dir )
 
         return self.execute(cmd, with_lock = True)
@@ -402,13 +476,10 @@ class LinuxNode(ResourceManager):
 
     def execute(self, command,
             sudo = False,
 
     def execute(self, command,
             sudo = False,
-            stdin = None, 
             env = None,
             tty = False,
             forward_x11 = False,
             env = None,
             tty = False,
             forward_x11 = False,
-            timeout = None,
             retry = 3,
             retry = 3,
-            err_on_timeout = True,
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
@@ -421,29 +492,31 @@ class LinuxNode(ResourceManager):
 
         if self.localhost:
             (out, err), proc = execfuncs.lexec(command, 
 
         if self.localhost:
             (out, err), proc = execfuncs.lexec(command, 
-                    user = user,
+                    user = self.get("username"), # still problem with localhost
                     sudo = sudo,
                     sudo = sudo,
-                    stdin = stdin,
                     env = env)
         else:
             if with_lock:
                     env = env)
         else:
             if with_lock:
-                with self._lock:
+                # If the execute command is blocking, we don't want to keep
+                # the node lock. This lock is used to avoid race conditions
+                # when creating the ControlMaster sockets. A more elegant
+                # solution is needed.
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
                         agent = True,
                         sudo = sudo,
-                        stdin = stdin,
                         identity = self.get("identity"),
                         server_key = self.get("serverKey"),
                         env = env,
                         tty = tty,
                         forward_x11 = forward_x11,
                         identity = self.get("identity"),
                         server_key = self.get("serverKey"),
                         env = env,
                         tty = tty,
                         forward_x11 = forward_x11,
-                        timeout = timeout,
                         retry = retry,
                         retry = retry,
-                        err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
                         blocking = blocking, 
                         connect_timeout = connect_timeout,
                         persistent = persistent,
                         blocking = blocking, 
@@ -455,17 +528,16 @@ class LinuxNode(ResourceManager):
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     sudo = sudo,
                     agent = True,
                     sudo = sudo,
-                    stdin = stdin,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     env = env,
                     tty = tty,
                     forward_x11 = forward_x11,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     env = env,
                     tty = tty,
                     forward_x11 = forward_x11,
-                    timeout = timeout,
                     retry = retry,
                     retry = retry,
-                    err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
                     persistent = persistent,
                     blocking = blocking, 
                     connect_timeout = connect_timeout,
                     persistent = persistent,
                     blocking = blocking, 
@@ -481,37 +553,40 @@ class LinuxNode(ResourceManager):
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
-            tty = False):
+            tty = False,
+            strict_host_checking = False):
         
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
         
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
-            (out, err), proc = execfuncs.lspawn(command, pidfile, 
-                    stdout = stdout, 
-                    stderr = stderr, 
-                    stdin = stdin, 
+            (out, err), proc = execfuncs.lspawn(command, pidfile,
                     home = home, 
                     create_home = create_home, 
                     home = home, 
                     create_home = create_home, 
-                    sudo = sudo,
-                    user = user) 
+                    stdin = stdin or '/dev/null',
+                    stdout = stdout or '/dev/null',
+                    stderr = stderr or '/dev/null',
+                    sudo = sudo) 
         else:
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                     home = home,
                     create_home = create_home,
                 (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                     home = home,
                     create_home = create_home,
-                    stdin = stdin if stdin is not None else '/dev/null',
-                    stdout = stdout if stdout else '/dev/null',
-                    stderr = stderr if stderr else '/dev/null',
+                    stdin = stdin or '/dev/null',
+                    stdout = stdout or '/dev/null',
+                    stderr = stderr or '/dev/null',
                     sudo = sudo,
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
                     sudo = sudo,
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
-                    tty = tty
+                    tty = tty,
+                    strict_host_checking = strict_host_checking
                     )
 
         return (out, err), proc
                     )
 
         return (out, err), proc
@@ -520,15 +595,18 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
         if self.localhost:
             pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
-            with self._lock:
+            with self._node_lock:
                 pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
                 pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
                     agent = True,
                     identity = self.get("identity"),
-                    server_key = self.get("serverKey")
+                    server_key = self.get("serverKey"),
+                    strict_host_checking = False
                     )
         
         return pidtuple
                     )
         
         return pidtuple
@@ -537,15 +615,18 @@ class LinuxNode(ResourceManager):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
-            with self._lock:
+            with self._node_lock:
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         identity = self.get("identity"),
                         agent = True,
                         identity = self.get("identity"),
-                        server_key = self.get("serverKey")
+                        server_key = self.get("serverKey"),
+                        strict_host_checking = False
                         )
            
         return status
                         )
            
         return status
@@ -559,30 +640,34 @@ class LinuxNode(ResourceManager):
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
                         user = self.get("username"),
                         port = self.get("port"),
+                        gwuser = self.get("gatewayUser"),
+                        gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
                         identity = self.get("identity"),
                         agent = True,
                         sudo = sudo,
                         identity = self.get("identity"),
-                        server_key = self.get("serverKey")
+                        server_key = self.get("serverKey"),
+                        strict_host_checking = False
                         )
 
         return (out, err), proc
 
     def copy(self, src, dst):
         if self.localhost:
                         )
 
         return (out, err), proc
 
     def copy(self, src, dst):
         if self.localhost:
-            (out, err), proc = execfuncs.lcopy(source, dest, 
-                    recursive = True,
-                    strict_host_checking = False)
+            (out, err), proc = execfuncs.lcopy(src, dst, 
+                    recursive = True)
         else:
         else:
-            with self._lock:
+            with self._node_lock:
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
                 (out, err), proc = sshfuncs.rcopy(
                     src, dst, 
                     port = self.get("port"),
+                    gwuser = self.get("gatewayUser"),
+                    gw = self.get("gateway"),
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     recursive = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     recursive = True,
@@ -590,15 +675,21 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
 
         return (out, err), proc
 
-
-    def upload(self, src, dst, text = False, overwrite = True):
+    def upload(self, src, dst, text = False, overwrite = True,
+            raise_on_error = True):
         """ Copy content to destination
 
         """ Copy content to destination
 
-           src  content to copy. Can be a local file, directory or a list of files
+        src  string with the content to copy. Can be:
+            - plain text
+            - a string with the path to a local file
+            - a string with a semi-colon separeted list of local files
+            - a string with a local directory
 
 
-           dst  destination path on the remote host (remote is always self.host)
+        dst  string with destination path on the remote host (remote is 
+            always self.host)
 
 
-           text src is text input, it must be stored into a temp file before uploading
+        text src is text input, it must be stored into a temp file before 
+        uploading
         """
         # If source is a string input 
         f = None
         """
         # If source is a string input 
         f = None
@@ -611,36 +702,52 @@ class LinuxNode(ResourceManager):
             src = f.name
 
         # If dst files should not be overwritten, check that the files do not
             src = f.name
 
         # If dst files should not be overwritten, check that the files do not
-        # exits already 
+        # exits already
+        if isinstance(src, str):
+            src = map(str.strip, src.split(";"))
+    
         if overwrite == False:
             src = self.filter_existing_files(src, dst)
             if not src:
         if overwrite == False:
             src = self.filter_existing_files(src, dst)
             if not src:
-                return ("", ""), None 
+                return ("", ""), None
 
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
 
 
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
 
-        result = self.copy(src, dst)
+        ((out, err), proc) = self.copy(src, dst)
 
         # clean up temp file
         if f:
             os.remove(f.name)
 
 
         # clean up temp file
         if f:
             os.remove(f.name)
 
-        return result
+        if err:
+            msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
+            self.error(msg, out, err)
+            
+            msg = "%s out: %s err: %s" % (msg, out, err)
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        return ((out, err), proc)
 
 
-    def download(self, src, dst):
+    def download(self, src, dst, raise_on_error = True):
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
-        return self.copy(src, dst)
 
 
-    def install_packages(self, packages, home, run_home = None):
-        """ Install packages in the Linux host.
+        ((out, err), proc) = self.copy(src, dst)
 
 
-        'home' is the directory to upload the package installation script.
-        'run_home' is the directory from where to execute the script.
-        """
+        if err:
+            msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
+            self.error(msg, out, err)
+
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        return ((out, err), proc)
+
+    def install_packages_command(self, packages):
         command = ""
         if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
         command = ""
         if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
@@ -651,6 +758,17 @@ class LinuxNode(ResourceManager):
             self.error(msg, self.os)
             raise RuntimeError, msg
 
             self.error(msg, self.os)
             raise RuntimeError, msg
 
+        return command
+
+    def install_packages(self, packages, home, run_home = None,
+            raise_on_error = True):
+        """ Install packages in the Linux host.
+
+        'home' is the directory to upload the package installation script.
+        'run_home' is the directory from where to execute the script.
+        """
+        command = self.install_packages_command(packages)
+
         run_home = run_home or home
 
         (out, err), proc = self.run_and_wait(command, run_home, 
         run_home = run_home or home
 
         (out, err), proc = self.run_and_wait(command, run_home, 
@@ -660,11 +778,12 @@ class LinuxNode(ResourceManager):
             stdout = "instpkg_stdout", 
             stderr = "instpkg_stderr",
             overwrite = False,
             stdout = "instpkg_stdout", 
             stderr = "instpkg_stderr",
             overwrite = False,
-            raise_on_error = True)
+            raise_on_error = raise_on_error)
 
         return (out, err), proc 
 
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home, run_home = None):
+    def remove_packages(self, packages, home, run_home = None,
+            raise_on_error = True):
         """ Uninstall packages from the Linux host.
 
         'home' is the directory to upload the package un-installation script.
         """ Uninstall packages from the Linux host.
 
         'home' is the directory to upload the package un-installation script.
@@ -688,18 +807,35 @@ class LinuxNode(ResourceManager):
             stdout = "rmpkg_stdout", 
             stderr = "rmpkg_stderr",
             overwrite = False,
             stdout = "rmpkg_stdout", 
             stderr = "rmpkg_stderr",
             overwrite = False,
-            raise_on_error = True)
+            raise_on_error = raise_on_error)
          
         return (out, err), proc 
 
          
         return (out, err), proc 
 
-    def mkdir(self, path, clean = False):
+    def mkdir(self, paths, clean = False):
+        """ Paths is either a single remote directory path to create,
+        or a list of directories to create.
+        """
         if clean:
         if clean:
-            self.rmdir(path)
+            self.rmdir(paths)
 
 
-        return self.execute("mkdir -p %s" % path, with_lock = True)
+        if isinstance(paths, str):
+            paths = [paths]
 
 
-    def rmdir(self, path):
-        return self.execute("rm -rf %s" % path, with_lock = True)
+        cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
+
+        return self.execute(cmd, with_lock = True)
+
+    def rmdir(self, paths):
+        """ Paths is either a single remote directory path to delete,
+        or a list of directories to delete.
+        """
+
+        if isinstance(paths, str):
+            paths = [paths]
+
+        cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
+
+        return self.execute(cmd, with_lock = True)
         
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
         
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
@@ -712,7 +848,7 @@ class LinuxNode(ResourceManager):
             stderr = "stderr", 
             sudo = False,
             tty = False,
             stderr = "stderr", 
             sudo = False,
             tty = False,
-            raise_on_error = False):
+            raise_on_error = True):
         """
         Uploads the 'command' to a bash script in the host.
         Then runs the script detached in background in the host, and
         """
         Uploads the 'command' to a bash script in the host.
         Then runs the script detached in background in the host, and
@@ -819,8 +955,8 @@ class LinuxNode(ResourceManager):
         return self.upload(command, shfile, text = True, overwrite = overwrite)
 
     def format_environment(self, env, inline = False):
         return self.upload(command, shfile, text = True, overwrite = overwrite)
 
     def format_environment(self, env, inline = False):
-        """Format environmental variables for command to be executed either
-        as an inline command
+        """ Formats the environment variables for a command to be executed
+        either as an inline command
         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
         """
         (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
         as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
         """
@@ -835,8 +971,7 @@ class LinuxNode(ResourceManager):
     def check_errors(self, home, 
             ecodefile = "exitcode", 
             stderr = "stderr"):
     def check_errors(self, home, 
             ecodefile = "exitcode", 
             stderr = "stderr"):
-        """
-        Checks whether errors occurred while running a command.
+        """ Checks whether errors occurred while running a command.
         It first checks the exit code for the command, and only if the
         exit code is an error one it returns the error output.
 
         It first checks the exit code for the command, and only if the
         exit code is an error one it returns the error output.
 
@@ -868,7 +1003,7 @@ class LinuxNode(ResourceManager):
         pid = ppid = None
         delay = 1.0
 
         pid = ppid = None
         delay = 1.0
 
-        for i in xrange(4):
+        for i in xrange(2):
             pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
             pidtuple = self.getpid(home = home, pidfile = pidfile)
             
             if pidtuple:
@@ -889,7 +1024,7 @@ class LinuxNode(ResourceManager):
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
 
     def wait_run(self, pid, ppid, trial = 0):
         """ wait for a remote process to finish execution """
-        start_delay = 1.0
+        delay = 1.0
 
         while True:
             status = self.status(pid, ppid)
 
         while True:
             status = self.status(pid, ppid)
@@ -920,42 +1055,55 @@ class LinuxNode(ResourceManager):
             return True
 
         out = err = ""
             return True
 
         out = err = ""
+        msg = "Unresponsive host. Wrong answer. "
+
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
         try:
             (out, err), proc = self.execute("echo 'ALIVE'",
         try:
             (out, err), proc = self.execute("echo 'ALIVE'",
-                    retry = 5, 
+                    blocking = True,
                     with_lock = True)
                     with_lock = True)
+    
+            if out.find("ALIVE") > -1:
+                return True
         except:
             trace = traceback.format_exc()
         except:
             trace = traceback.format_exc()
-            msg = "Unresponsive host  %s " % err
-            self.error(msg, out, trace)
-            return False
+            msg = "Unresponsive host. Error reaching host: %s " % trace
 
 
-        if out.strip() == "ALIVE":
-            return True
-        else:
-            msg = "Unresponsive host "
-            self.error(msg, out, err)
-            return False
+        self.error(msg, out, err)
+        return False
 
     def find_home(self):
         """ Retrieves host home directory
         """
 
     def find_home(self):
         """ Retrieves host home directory
         """
-        (out, err), proc = self.execute("echo ${HOME}", retry = 5, 
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        msg = "Impossible to retrieve HOME directory"
+        try:
+            (out, err), proc = self.execute("echo ${HOME}",
+                    blocking = True,
                     with_lock = True)
                     with_lock = True)
+    
+            if out.strip() != "":
+                self._home_dir =  out.strip()
+        except:
+            trace = traceback.format_exc()
+            msg = "Impossible to retrieve HOME directory %s" % trace
 
 
-        if proc.poll():
-            msg = "Imposible to retrieve HOME directory"
-            self.error(msg, out, err)
+        if not self._home_dir:
+            self.error(msg)
             raise RuntimeError, msg
 
             raise RuntimeError, msg
 
-        self._home_dir =  out.strip()
-
     def filter_existing_files(self, src, dst):
         """ Removes files that already exist in the Linux host from src list
         """
         # construct a dictionary with { dst: src }
     def filter_existing_files(self, src, dst):
         """ Removes files that already exist in the Linux host from src list
         """
         # construct a dictionary with { dst: src }
-        dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
-            src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
+        dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
+                    if len(src) > 1 else dict({dst: src[0]})
 
         command = []
         for d in dests.keys():
 
         command = []
         for d in dests.keys():
@@ -970,7 +1118,7 @@ class LinuxNode(ResourceManager):
                 del dests[d]
 
         if not dests:
                 del dests[d]
 
         if not dests:
-            return ""
+            return []
 
 
-        return " ".join(dests.values())
+        return dests.values()