Replacing RM.rtype() for RM.get_type() for consistency
[nepi.git] / src / nepi / resources / linux / node.py
index 85e2286..cbc0099 100644 (file)
-from nepi.execution.attribute import Attribute, Flags
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 from nepi.resources.linux import rpmfuncs, debfuncs 
-from nepi.util import sshfuncs, execfuncs 
+from nepi.util import sshfuncs, execfuncs
+from nepi.util.sshfuncs import ProcStatus
 
 import collections
-import logging
 import os
 import random
 import re
 import tempfile
 import time
 import threading
+import traceback
 
-# TODO: Verify files and dirs exists already
-# TODO: Blacklist nodes!
 # TODO: Unify delays!!
 # TODO: Validate outcome of uploads!! 
 
-reschedule_delay = "0.5s"
-
-
-@clsinit
+class ExitCode:
+    """
+    Error codes that the rexitcode function can return if unable to
+    check the exit code of a spawned process
+    """
+    FILENOTFOUND = -1
+    CORRUPTFILE = -2
+    ERROR = -3
+    OK = 0
+
+class OSType:
+    """
+    Supported flavors of Linux OS
+    """
+    FEDORA_8 = "f8"
+    FEDORA_12 = "f12"
+    FEDORA_14 = "f14"
+    FEDORA = "fedora"
+    UBUNTU = "ubuntu"
+    DEBIAN = "debian"
+
+@clsinit_copy
 class LinuxNode(ResourceManager):
+    """
+    .. class:: Class Args :
+      
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+
+    .. note::
+
+        There are different ways in which commands can be executed using the
+        LinuxNode interface (i.e. 'execute' - blocking and non blocking, 'run',
+        'run_and_wait'). 
+        
+        Brief explanation:
+
+            * 'execute' (blocking mode) :  
+
+                     HOW IT WORKS: 'execute', forks a process and run the
+                     command, synchronously, attached to the terminal, in
+                     foreground.
+                     The execute method will block until the command returns
+                     the result on 'out', 'err' (so until it finishes executing).
+  
+                     USAGE: short-lived commands that must be executed attached
+                     to a terminal and in foreground, for which it IS necessary
+                     to block until the command has finished (e.g. if you want
+                     to run 'ls' or 'cat').
+
+            * 'execute' (NON blocking mode - blocking = False) :
+
+                    HOW IT WORKS: Same as before, except that execute method
+                    will return immediately (even if command still running).
+
+                    USAGE: long-lived commands that must be executed attached
+                    to a terminal and in foreground, but for which it is not
+                    necessary to block until the command has finished. (e.g.
+                    start an application using X11 forwarding)
+
+             * 'run' :
+
+                   HOW IT WORKS: Connects to the host ( using SSH if remote)
+                   and launches the command in background, detached from any
+                   terminal (daemonized), and returns. The command continues to
+                   run remotely, but since it is detached from the terminal,
+                   its pipes (stdin, stdout, stderr) can't be redirected to the
+                   console (as normal non detached processes would), and so they
+                   are explicitly redirected to files. The pidfile is created as
+                   part of the process of launching the command. The pidfile
+                   holds the pid and ppid of the process forked in background,
+                   so later on it is possible to check whether the command is still
+                   running.
+
+                    USAGE: long-lived commands that can run detached in background,
+                    for which it is NOT necessary to block (wait) until the command
+                    has finished. (e.g. start an application that is not using X11
+                    forwarding. It can run detached and remotely in background)
+
+             * 'run_and_wait' :
+
+                    HOW IT WORKS: Similar to 'run' except that it 'blocks' until
+                    the command has finished execution. It also checks whether
+                    errors occurred during runtime by reading the exitcode file,
+                    which contains the exit code of the command that was run
+                    (checking stderr only is not always reliable since many
+                    commands throw debugging info to stderr and the only way to
+                    automatically know whether an error really happened is to
+                    check the process exit code).
+
+                    Another difference with respect to 'run', is that instead
+                    of directly executing the command as a bash command line,
+                    it uploads the command to a bash script and runs the script.
+                    This allows to use the bash script to debug errors, since
+                    it remains at the remote host and can be run manually to
+                    reproduce the error.
+                  
+                    USAGE: medium-lived commands that can run detached in
+                    background, for which it IS necessary to block (wait) until
+                    the command has finished. (e.g. Package installation,
+                    source compilation, file download, etc)
+
+    """
     _rtype = "LinuxNode"
+    _help = "Controls Linux host machines ( either localhost or a host " \
+            "that can be accessed using a SSH key)"
+    _backend_type = "linux"
 
     @classmethod
     def _register_attributes(cls):
@@ -44,12 +166,22 @@ class LinuxNode(ResourceManager):
         server_key = Attribute("serverKey", "Server public key", 
                 flags = Flags.ExecReadOnly)
         
-        clean_home = Attribute("cleanHome", "Remove all files and directories " + \
-                " from home folder before starting experiment", 
+        clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
+                " from node home folder before starting experiment", 
+                type = Types.Bool,
+                default = False,
+                flags = Flags.ExecReadOnly)
+
+        clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
+                " from a previous same experiment, before the new experiment starts", 
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
+                type = Types.Bool,
+                default = False,
                 flags = Flags.ExecReadOnly)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
@@ -63,34 +195,71 @@ class LinuxNode(ResourceManager):
         cls._register_attribute(identity)
         cls._register_attribute(server_key)
         cls._register_attribute(clean_home)
+        cls._register_attribute(clean_experiment)
         cls._register_attribute(clean_processes)
         cls._register_attribute(tear_down)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
+        # home directory at Linux host
+        self._home_dir = ""
         
-        # lock to avoid concurrency issues on methods used by applications 
-        self._lock = threading.Lock()
-
-        self._logger = logging.getLogger("LinuxNode")
+        # lock to prevent concurrent applications on the same node,
+        # to execute commands at the same time. There are potential
+        # concurrency issues when using SSH to a same host from 
+        # multiple threads. There are also possible operational 
+        # issues, e.g. an application querying the existence 
+        # of a file or folder prior to its creation, and another 
+        # application creating the same file or folder in between.
+        self._node_lock = threading.Lock()
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
                 self.get("hostname"), msg)
 
     @property
-    def home(self):
-        return self.get("home") or ""
+    def home_dir(self):
+        home = self.get("home") or ""
+        if not home.startswith("/"):
+           home = os.path.join(self._home_dir, home) 
+        return home
+
+    @property
+    def usr_dir(self):
+        return os.path.join(self.home_dir, "nepi-usr")
+
+    @property
+    def lib_dir(self):
+        return os.path.join(self.usr_dir, "lib")
+
+    @property
+    def bin_dir(self):
+        return os.path.join(self.usr_dir, "bin")
+
+    @property
+    def src_dir(self):
+        return os.path.join(self.usr_dir, "src")
+
+    @property
+    def share_dir(self):
+        return os.path.join(self.usr_dir, "share")
+
+    @property
+    def exp_dir(self):
+        return os.path.join(self.home_dir, "nepi-exp")
 
     @property
     def exp_home(self):
-        return os.path.join(self.home, self.ec.exp_id)
+        return os.path.join(self.exp_dir, self.ec.exp_id)
 
     @property
     def node_home(self):
-        node_home = "node-%d" % self.guid
-        return os.path.join(self.exp_home, node_home)
+        return os.path.join(self.exp_home, "node-%d" % self.guid)
+
+    @property
+    def run_home(self):
+        return os.path.join(self.node_home, self.ec.run_id)
 
     @property
     def os(self):
@@ -102,21 +271,20 @@ class LinuxNode(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        (out, err), proc = self.execute("cat /etc/issue", with_lock = True)
-
-        if err and proc.poll():
-            msg = "Error detecting OS "
-            self.error(msg, out, err)
-            raise RuntimeError, "%s - %s - %s" %( msg, out, err )
+        out = self.get_os()
 
-        if out.find("Fedora release 12") == 0:
-            self._os = "f12"
+        if out.find("Fedora release 8") == 0:
+            self._os = OSType.FEDORA_8
+        elif out.find("Fedora release 12") == 0:
+            self._os = OSType.FEDORA_12
         elif out.find("Fedora release 14") == 0:
-            self._os = "f14"
+            self._os = OSType.FEDORA_14
+        elif out.find("Fedora release") == 0:
+            self._os = OSType.FEDORA
         elif out.find("Debian") == 0: 
-            self._os = "debian"
+            self._os = OSType.DEBIAN
         elif out.find("Ubuntu") ==0:
-            self._os = "ubuntu"
+            self._os = OSType.UBUNTU
         else:
             msg = "Unsupported OS"
             self.error(msg, out)
@@ -124,53 +292,98 @@ class LinuxNode(ResourceManager):
 
         return self._os
 
+    def get_os(self):
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        out = ""
+        try:
+            (out, err), proc = self.execute("cat /etc/issue", 
+                    with_lock = True,
+                    blocking = True)
+        except:
+            trace = traceback.format_exc()
+            msg = "Error detecting OS: %s " % trace
+            self.error(msg, out, err)
+        
+        return out
+
+    @property
+    def use_deb(self):
+        return self.os in [OSType.DEBIAN, OSType.UBUNTU]
+
+    @property
+    def use_rpm(self):
+        return self.os in [OSType.FEDORA_12, OSType.FEDORA_14, OSType.FEDORA_8,
+                OSType.FEDORA]
+
     @property
     def localhost(self):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
-    def provision(self, filters = None):
+    def do_provision(self):
+        # check if host is alive
         if not self.is_alive():
-            self._state = ResourceState.FAILED
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
 
+        self.find_home()
+
         if self.get("cleanProcesses"):
             self.clean_processes()
 
         if self.get("cleanHome"):
             self.clean_home()
-       
+        if self.get("cleanExperiment"):
+            self.clean_experiment()
+    
+        # Create shared directory structure
+        self.mkdir(self.lib_dir)
+        self.mkdir(self.bin_dir)
+        self.mkdir(self.src_dir)
+        self.mkdir(self.share_dir)
+
+        # Create experiment node home directory
         self.mkdir(self.node_home)
 
-        super(LinuxNode, self).provision()
+        super(LinuxNode, self).do_provision()
 
-    def deploy(self):
+    def do_deploy(self):
         if self.state == ResourceState.NEW:
-            try:
-               self.discover()
-               self.provision()
-            except:
-                self._state = ResourceState.FAILED
-                raise
+            self.info("Deploying node")
+            self.do_discover()
+            self.do_provision()
 
         # Node needs to wait until all associated interfaces are 
         # ready before it can finalize deployment
         from nepi.resources.linux.interface import LinuxInterface
-        ifaces = self.get_connected(LinuxInterface.rtype())
+        ifaces = self.get_connected(LinuxInterface.get_rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
                 self.ec.schedule(reschedule_delay, self.deploy)
                 return 
 
-        super(LinuxNode, self).deploy()
+        super(LinuxNode, self).do_deploy()
+
+    def do_release(self):
+        rms = self.get_connected()
+        for rm in rms:
+            # Node needs to wait until all associated RMs are released
+            # before it can be released
+            if rm.state != ResourceState.RELEASED:
+                self.ec.schedule(reschedule_delay, self.release)
+                return 
 
-    def release(self):
         tear_down = self.get("tearDown")
         if tear_down:
             self.execute(tear_down)
 
-        super(LinuxNode, self).release()
+        self.clean_processes()
+
+        super(LinuxNode, self).do_release()
 
     def valid_connection(self, guid):
         # TODO: Validate!
@@ -194,258 +407,30 @@ class LinuxNode(ResourceManager):
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
 
         out = err = ""
-        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
-            
+        (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
+
     def clean_home(self):
+        """ Cleans all NEPI related folders in the Linux host
+        """
         self.info("Cleaning up home")
         
-        cmd = (
-            # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
-            "find . -maxdepth 1 -name 'nepi-*' " +
-            " -execdir rm -rf {} + "
-            )
-            
-        if self.home:
-            cmd = "cd %s ; " % self.home + cmd
-
-        out = err = ""
-        (out, err), proc = self.execute(cmd, with_lock = True)
-
-    def upload(self, src, dst, text = False):
-        """ Copy content to destination
-
-           src  content to copy. Can be a local file, directory or a list of files
-
-           dst  destination path on the remote host (remote is always self.host)
-
-           text src is text input, it must be stored into a temp file before uploading
-        """
-        # If source is a string input 
-        f = None
-        if text and not os.path.isfile(src):
-            # src is text input that should be uploaded as file
-            # create a temporal file with the content to upload
-            f = tempfile.NamedTemporaryFile(delete=False)
-            f.write(src)
-            f.close()
-            src = f.name
-
-        if not self.localhost:
-            # Build destination as <user>@<server>:<path>
-            dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
-
-        result = self.copy(src, dst)
-
-        # clean up temp file
-        if f:
-            os.remove(f.name)
-
-        return result
-
-    def download(self, src, dst):
-        if not self.localhost:
-            # Build destination as <user>@<server>:<path>
-            src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
-        return self.copy(src, dst)
-
-    def install_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
-        if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.install_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.install_packages_command(self.os, packages)
-        else:
-            msg = "Error installing packages ( OS not known ) "
-            self.error(msg, self.os)
-            raise RuntimeError, msg
-
-        out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "instpkg_pid",
-            stdout = "instpkg_out", 
-            stderr = "instpkg_err",
-            raise_on_error = True)
-
-        return (out, err), proc 
-
-    def remove_packages(self, packages, home = None):
-        home = home or self.node_home
-
-        cmd = ""
-        if self.os in ["f12", "f14"]:
-            cmd = rpmfuncs.remove_packages_command(self.os, packages)
-        elif self.os in ["debian", "ubuntu"]:
-            cmd = debfuncs.remove_packages_command(self.os, packages)
-        else:
-            msg = "Error removing packages ( OS not known ) "
-            self.error(msg)
-            raise RuntimeError, msg
-
-        out = err = ""
-        (out, err), proc = self.run_and_wait(cmd, home, 
-            pidfile = "rmpkg_pid",
-            stdout = "rmpkg_out", 
-            stderr = "rmpkg_err",
-            raise_on_error = True)
-         
-        return (out, err), proc 
-
-    def mkdir(self, path, clean = False):
-        if clean:
-            self.rmdir(path)
-
-        return self.execute("mkdir -p %s" % path, with_lock = True)
+        cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
+                self.home_dir )
 
-    def rmdir(self, path):
-        return self.execute("rm -rf %s" % path, with_lock = True)
+        return self.execute(cmd, with_lock = True)
 
-    def run_and_wait(self, command, 
-            home = ".", 
-            pidfile = "pid", 
-            stdin = None, 
-            stdout = 'stdout', 
-            stderr = 'stderr', 
-            sudo = False,
-            tty = False,
-            raise_on_error = False):
-        """ runs a command in background on the remote host, but waits
-            until the command finishes execution.
-            This is more robust than doing a simple synchronized 'execute',
-            since in the remote host the command can continue to run detached
-            even if network disconnections occur
+    def clean_experiment(self):
+        """ Cleans all experiment related files in the Linux host.
+        It preserves NEPI files and folders that have a multi experiment
+        scope.
         """
-        # run command in background in remote host
-        (out, err), proc = self.run(command, home, 
-                pidfile = pidfile,
-                stdin = stdin, 
-                stdout = stdout, 
-                stderr = stderr, 
-                sudo = sudo,
-                tty = tty)
-
-        # check no errors occurred
-        if proc.poll() and err:
-            msg = " Failed to run command '%s' " % command
-            self.error(msg, out, err)
-            if raise_on_error:
-                raise RuntimeError, msg
-
-        # Wait for pid file to be generated
-        pid, ppid = self.wait_pid(
-                home = home, 
-                pidfile = pidfile, 
-                raise_on_error = raise_on_error)
-
-        # wait until command finishes to execute
-        self.wait_run(pid, ppid)
-       
-        # check if execution errors occurred
-        (out, err), proc = self.check_output(home, stderr)
-
-        if err or out:
-            msg = " Failed to run command '%s' " % command
-            self.error(msg, out, err)
-
-            if raise_on_error:
-                raise RuntimeError, msg
+        self.info("Cleaning up experiment files")
         
-        return (out, err), proc
-    def wait_pid(self, home = ".", pidfile = "pid", raise_on_error = False):
-        """ Waits until the pid file for the command is generated, 
-            and returns the pid and ppid of the process """
-        pid = ppid = None
-        delay = 1.0
-        for i in xrange(5):
-            pidtuple = self.checkpid(home = home, pidfile = pidfile)
-            
-            if pidtuple:
-                pid, ppid = pidtuple
-                break
-            else:
-                time.sleep(delay)
-                delay = min(30,delay*1.2)
-        else:
-            msg = " Failed to get pid for pidfile %s/%s " % (
-                    home, pidfile )
-            self.error(msg)
-            
-            if raise_on_error:
-                raise RuntimeError, msg
-
-        return pid, ppid
-
-    def wait_run(self, pid, ppid, trial = 0):
-        """ wait for a remote process to finish execution """
-        delay = 1.0
-        first = True
-        bustspin = 0
-
-        while True:
-            status = self.status(pid, ppid)
+        cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
+                self.exp_dir,
+                self.ec.exp_id )
             
-            if status is sshfuncs.FINISHED:
-                break
-            elif status is not sshfuncs.RUNNING:
-                bustspin += 1
-                time.sleep(delay*(5.5+random.random()))
-                if bustspin > 12:
-                    break
-            else:
-                if first:
-                    first = False
-
-                time.sleep(delay*(0.5+random.random()))
-                delay = min(30,delay*1.2)
-                bustspin = 0
-
-    def check_output(self, home, filename):
-        """ checks file content """
-        (out, err), proc = self.execute("cat %s" % 
-            os.path.join(home, filename), retry = 1, with_lock = True)
-        return (out, err), proc
-
-    def is_alive(self):
-        if self.localhost:
-            return True
-
-        out = err = ""
-        try:
-            # TODO: FIX NOT ALIVE!!!!
-            (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
-                    with_lock = True)
-        except:
-            import traceback
-            trace = traceback.format_exc()
-            msg = "Unresponsive host  %s " % err
-            self.error(msg, out, trace)
-            return False
-
-        if out.strip().startswith('ALIVE'):
-            return True
-        else:
-            msg = "Unresponsive host "
-            self.error(msg, out, err)
-            return False
-
-    def copy(self, src, dst):
-        if self.localhost:
-            (out, err), proc =  execfuncs.lcopy(source, dest, 
-                    recursive = True,
-                    strict_host_checking = False)
-        else:
-            with self._lock:
-                (out, err), proc = sshfuncs.rcopy(
-                    src, dst, 
-                    port = self.get("port"),
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey"),
-                    recursive = True,
-                    strict_host_checking = False)
-
-        return (out, err), proc
+        return self.execute(cmd, with_lock = True)
 
     def execute(self, command,
             sudo = False,
@@ -459,6 +444,7 @@ class LinuxNode(ResourceManager):
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
+            blocking = True,
             with_lock = False
             ):
         """ Notice that this invocation will block until the
@@ -473,7 +459,7 @@ class LinuxNode(ResourceManager):
                     env = env)
         else:
             if with_lock:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                         host = self.get("hostname"),
@@ -492,6 +478,7 @@ class LinuxNode(ResourceManager):
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
+                        blocking = blocking, 
                         strict_host_checking = strict_host_checking
                         )
             else:
@@ -512,21 +499,22 @@ class LinuxNode(ResourceManager):
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
-                    persistent = persistent
+                    persistent = persistent,
+                    blocking = blocking, 
+                    strict_host_checking = strict_host_checking
                     )
 
         return (out, err), proc
 
-    def run(self, command, 
-            home = None,
+    def run(self, command, home,
             create_home = False,
-            pidfile = "pid",
+            pidfile = 'pidfile',
             stdin = None, 
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             tty = False):
-
+        
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
@@ -539,10 +527,8 @@ class LinuxNode(ResourceManager):
                     sudo = sudo,
                     user = user) 
         else:
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
-            with self._lock:
-                (out,err), proc = sshfuncs.rspawn(
+            with self._node_lock:
+                (out, err), proc = sshfuncs.rspawn(
                     command,
                     pidfile = pidfile,
                     home = home,
@@ -562,12 +548,12 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
-    def checkpid(self, home = ".", pidfile = "pid"):
+    def getpid(self, home, pidfile = "pidfile"):
         if self.localhost:
-            pidtuple =  execfuncs.lcheckpid(os.path.join(home, pidfile))
+            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
         else:
-            with self._lock:
-                pidtuple = sshfuncs.rcheckpid(
+            with self._node_lock:
+                pidtuple = sshfuncs.rgetpid(
                     os.path.join(home, pidfile),
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -578,12 +564,12 @@ class LinuxNode(ResourceManager):
                     )
         
         return pidtuple
-    
+
     def status(self, pid, ppid):
         if self.localhost:
             status = execfuncs.lstatus(pid, ppid)
         else:
-            with self._lock:
+            with self._node_lock:
                 status = sshfuncs.rstatus(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -601,11 +587,11 @@ class LinuxNode(ResourceManager):
         proc = None
         status = self.status(pid, ppid)
 
-        if status == sshfuncs.RUNNING:
+        if status == sshfuncs.ProcStatus.RUNNING:
             if self.localhost:
                 (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
             else:
-                with self._lock:
+                with self._node_lock:
                     (out, err), proc = sshfuncs.rkill(
                         pid, ppid,
                         host = self.get("hostname"),
@@ -616,18 +602,423 @@ class LinuxNode(ResourceManager):
                         identity = self.get("identity"),
                         server_key = self.get("serverKey")
                         )
+
         return (out, err), proc
 
-    def check_bad_host(self, out, err):
-        badre = re.compile(r'(?:'
-                           r'|Error: disk I/O error'
-                           r')', 
-                           re.I)
-        return badre.search(out) or badre.search(err)
-
-    def blacklist(self):
-        # TODO!!!!
-        self.warn(" Blacklisting malfunctioning node ")
-        #import util
-        #util.appendBlacklist(self.hostname)
+    def copy(self, src, dst):
+        if self.localhost:
+            (out, err), proc = execfuncs.lcopy(source, dest, 
+                    recursive = True,
+                    strict_host_checking = False)
+        else:
+            with self._node_lock:
+                (out, err), proc = sshfuncs.rcopy(
+                    src, dst, 
+                    port = self.get("port"),
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    recursive = True,
+                    strict_host_checking = False)
+
+        return (out, err), proc
+
+    def upload(self, src, dst, text = False, overwrite = True):
+        """ Copy content to destination
+
+           src  content to copy. Can be a local file, directory or a list of files
+
+           dst  destination path on the remote host (remote is always self.host)
+
+           text src is text input, it must be stored into a temp file before uploading
+        """
+        # If source is a string input 
+        f = None
+        if text and not os.path.isfile(src):
+            # src is text input that should be uploaded as file
+            # create a temporal file with the content to upload
+            f = tempfile.NamedTemporaryFile(delete=False)
+            f.write(src)
+            f.close()
+            src = f.name
+
+        # If dst files should not be overwritten, check that the files do not
+        # exits already 
+        if overwrite == False:
+            src = self.filter_existing_files(src, dst)
+            if not src:
+                return ("", ""), None 
+
+        if not self.localhost:
+            # Build destination as <user>@<server>:<path>
+            dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
+
+        result = self.copy(src, dst)
+
+        # clean up temp file
+        if f:
+            os.remove(f.name)
+
+        return result
+
+    def download(self, src, dst):
+        if not self.localhost:
+            # Build destination as <user>@<server>:<path>
+            src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
+        return self.copy(src, dst)
+
+    def install_packages_command(self, packages):
+        command = ""
+        if self.use_rpm:
+            command = rpmfuncs.install_packages_command(self.os, packages)
+        elif self.use_deb:
+            command = debfuncs.install_packages_command(self.os, packages)
+        else:
+            msg = "Error installing packages ( OS not known ) "
+            self.error(msg, self.os)
+            raise RuntimeError, msg
+
+        return command
+
+    def install_packages(self, packages, home, run_home = None):
+        """ Install packages in the Linux host.
+
+        'home' is the directory to upload the package installation script.
+        'run_home' is the directory from where to execute the script.
+        """
+        command = self.install_packages_command(packages)
+
+        run_home = run_home or home
+
+        (out, err), proc = self.run_and_wait(command, run_home, 
+            shfile = os.path.join(home, "instpkg.sh"),
+            pidfile = "instpkg_pidfile",
+            ecodefile = "instpkg_exitcode",
+            stdout = "instpkg_stdout", 
+            stderr = "instpkg_stderr",
+            overwrite = False,
+            raise_on_error = True)
+
+        return (out, err), proc 
+
+    def remove_packages(self, packages, home, run_home = None):
+        """ Uninstall packages from the Linux host.
+
+        'home' is the directory to upload the package un-installation script.
+        'run_home' is the directory from where to execute the script.
+        """
+        if self.use_rpm:
+            command = rpmfuncs.remove_packages_command(self.os, packages)
+        elif self.use_deb:
+            command = debfuncs.remove_packages_command(self.os, packages)
+        else:
+            msg = "Error removing packages ( OS not known ) "
+            self.error(msg)
+            raise RuntimeError, msg
+
+        run_home = run_home or home
+
+        (out, err), proc = self.run_and_wait(command, run_home, 
+            shfile = os.path.join(home, "rmpkg.sh"),
+            pidfile = "rmpkg_pidfile",
+            ecodefile = "rmpkg_exitcode",
+            stdout = "rmpkg_stdout", 
+            stderr = "rmpkg_stderr",
+            overwrite = False,
+            raise_on_error = True)
+         
+        return (out, err), proc 
+
+    def mkdir(self, path, clean = False):
+        if clean:
+            self.rmdir(path)
+
+        return self.execute("mkdir -p %s" % path, with_lock = True)
+
+    def rmdir(self, path):
+        return self.execute("rm -rf %s" % path, with_lock = True)
+        
+    def run_and_wait(self, command, home, 
+            shfile = "cmd.sh",
+            env = None,
+            overwrite = True,
+            pidfile = "pidfile", 
+            ecodefile = "exitcode", 
+            stdin = None, 
+            stdout = "stdout", 
+            stderr = "stderr", 
+            sudo = False,
+            tty = False,
+            raise_on_error = False):
+        """
+        Uploads the 'command' to a bash script in the host.
+        Then runs the script detached in background in the host, and
+        busy-waites until the script finishes executing.
+        """
+
+        if not shfile.startswith("/"):
+            shfile = os.path.join(home, shfile)
+
+        self.upload_command(command, 
+            shfile = shfile, 
+            ecodefile = ecodefile, 
+            env = env,
+            overwrite = overwrite)
+
+        command = "bash %s" % shfile
+        # run command in background in remote host
+        (out, err), proc = self.run(command, home, 
+                pidfile = pidfile,
+                stdin = stdin, 
+                stdout = stdout, 
+                stderr = stderr, 
+                sudo = sudo,
+                tty = tty)
+
+        # check no errors occurred
+        if proc.poll():
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, out, err)
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        # Wait for pid file to be generated
+        pid, ppid = self.wait_pid(
+                home = home, 
+                pidfile = pidfile, 
+                raise_on_error = raise_on_error)
+
+        # wait until command finishes to execute
+        self.wait_run(pid, ppid)
+      
+        (eout, err), proc = self.check_errors(home,
+            ecodefile = ecodefile,
+            stderr = stderr)
+
+        # Out is what was written in the stderr file
+        if err:
+            msg = " Failed to run command '%s' " % command
+            self.error(msg, eout, err)
+
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        (out, oerr), proc = self.check_output(home, stdout)
+        
+        return (out, err), proc
+
+    def exitcode(self, home, ecodefile = "exitcode"):
+        """
+        Get the exit code of an application.
+        Returns an integer value with the exit code 
+        """
+        (out, err), proc = self.check_output(home, ecodefile)
+
+        # Succeeded to open file, return exit code in the file
+        if proc.wait() == 0:
+            try:
+                return int(out.strip())
+            except:
+                # Error in the content of the file!
+                return ExitCode.CORRUPTFILE
+
+        # No such file or directory
+        if proc.returncode == 1:
+            return ExitCode.FILENOTFOUND
+        
+        # Other error from 'cat'
+        return ExitCode.ERROR
+
+    def upload_command(self, command, 
+            shfile = "cmd.sh",
+            ecodefile = "exitcode",
+            overwrite = True,
+            env = None):
+        """ Saves the command as a bash script file in the remote host, and
+        forces to save the exit code of the command execution to the ecodefile
+        """
+
+        if not (command.strip().endswith(";") or command.strip().endswith("&")):
+            command += ";"
+      
+        # The exit code of the command will be stored in ecodefile
+        command = " { %(command)s } ; echo $? > %(ecodefile)s ;" % {
+                'command': command,
+                'ecodefile': ecodefile,
+                } 
+
+        # Export environment
+        environ = self.format_environment(env)
+
+        # Add environ to command
+        command = environ + command
+
+        return self.upload(command, shfile, text = True, overwrite = overwrite)
+
+    def format_environment(self, env, inline = False):
+        """ Formats the environment variables for a command to be executed
+        either as an inline command
+        (i.e. export PYTHONPATH=src/..; export LALAL= ..;python script.py) or 
+        as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
+        """
+        if not env: return ""
+
+        # Remove extra white spaces
+        env = re.sub(r'\s+', ' ', env.strip())
+
+        sep = ";" if inline else "\n"
+        return sep.join(map(lambda e: " export %s" % e, env.split(" "))) + sep 
+
+    def check_errors(self, home, 
+            ecodefile = "exitcode", 
+            stderr = "stderr"):
+        """ Checks whether errors occurred while running a command.
+        It first checks the exit code for the command, and only if the
+        exit code is an error one it returns the error output.
+
+        """
+        proc = None
+        err = ""
+
+        # get exit code saved in the 'exitcode' file
+        ecode = self.exitcode(home, ecodefile)
+
+        if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
+            err = "Error retrieving exit code status from file %s/%s" % (home, ecodefile)
+        elif ecode > 0 or ecode == ExitCode.FILENOTFOUND:
+            # The process returned an error code or didn't exist. 
+            # Check standard error.
+            (err, eerr), proc = self.check_output(home, stderr)
+
+            # If the stderr file was not found, assume nothing bad happened,
+            # and just ignore the error.
+            # (cat returns 1 for error "No such file or directory")
+            if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
+                err = "" 
+            
+        return ("", err), proc
+    def wait_pid(self, home, pidfile = "pidfile", raise_on_error = False):
+        """ Waits until the pid file for the command is generated, 
+            and returns the pid and ppid of the process """
+        pid = ppid = None
+        delay = 1.0
+
+        for i in xrange(2):
+            pidtuple = self.getpid(home = home, pidfile = pidfile)
+            
+            if pidtuple:
+                pid, ppid = pidtuple
+                break
+            else:
+                time.sleep(delay)
+                delay = delay * 1.5
+        else:
+            msg = " Failed to get pid for pidfile %s/%s " % (
+                    home, pidfile )
+            self.error(msg)
+            
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        return pid, ppid
+
+    def wait_run(self, pid, ppid, trial = 0):
+        """ wait for a remote process to finish execution """
+        delay = 1.0
+
+        while True:
+            status = self.status(pid, ppid)
+            
+            if status is ProcStatus.FINISHED:
+                break
+            elif status is not ProcStatus.RUNNING:
+                delay = delay * 1.5
+                time.sleep(delay)
+                # If it takes more than 20 seconds to start, then
+                # asume something went wrong
+                if delay > 20:
+                    break
+            else:
+                # The app is running, just wait...
+                time.sleep(0.5)
+
+    def check_output(self, home, filename):
+        """ Retrives content of file """
+        (out, err), proc = self.execute("cat %s" % 
+            os.path.join(home, filename), retry = 1, with_lock = True)
+        return (out, err), proc
+
+    def is_alive(self):
+        """ Checks if host is responsive
+        """
+        if self.localhost:
+            return True
+
+        out = err = ""
+        msg = "Unresponsive host. Wrong answer. "
+
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        try:
+            (out, err), proc = self.execute("echo 'ALIVE'",
+                    blocking = True,
+                    with_lock = True)
+    
+            if out.find("ALIVE") > -1:
+                return True
+        except:
+            trace = traceback.format_exc()
+            msg = "Unresponsive host. Error reaching host: %s " % trace
+
+        self.error(msg, out, err)
+        return False
+
+    def find_home(self):
+        """ Retrieves host home directory
+        """
+        # The underlying SSH layer will sometimes return an empty
+        # output (even if the command was executed without errors).
+        # To work arround this, repeat the operation N times or
+        # until the result is not empty string
+        msg = "Impossible to retrieve HOME directory"
+        try:
+            (out, err), proc = self.execute("echo ${HOME}",
+                    blocking = True,
+                    with_lock = True)
+    
+            if out.strip() != "":
+                self._home_dir =  out.strip()
+        except:
+            trace = traceback.format_exc()
+            msg = "Impossible to retrieve HOME directory" % trace
+
+        if not self._home_dir:
+            self.error(msg, out, err)
+            raise RuntimeError, msg
+
+    def filter_existing_files(self, src, dst):
+        """ Removes files that already exist in the Linux host from src list
+        """
+        # construct a dictionary with { dst: src }
+        dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
+            src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
+
+        command = []
+        for d in dests.keys():
+            command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
+
+        command = ";".join(command)
+
+        (out, err), proc = self.execute(command, retry = 1, with_lock = True)
+    
+        for d in dests.keys():
+            if out.find(d) > -1:
+                del dests[d]
+
+        if not dests:
+            return ""
+
+        return " ".join(dests.values())