Changing reschedule_delay internals
[nepi.git] / src / nepi / resources / linux / node.py
index 60fad69..96c9acd 100644 (file)
@@ -19,7 +19,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay
+        ResourceState
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
 from nepi.resources.linux import rpmfuncs, debfuncs 
 from nepi.util import sshfuncs, execfuncs
 from nepi.util.sshfuncs import ProcStatus
@@ -28,6 +28,7 @@ import collections
 import os
 import random
 import re
 import os
 import random
 import re
+import socket
 import tempfile
 import time
 import threading
 import tempfile
 import time
 import threading
@@ -149,50 +150,54 @@ class LinuxNode(ResourceManager):
     @classmethod
     def _register_attributes(cls):
         hostname = Attribute("hostname", "Hostname of the machine",
     @classmethod
     def _register_attributes(cls):
         hostname = Attribute("hostname", "Hostname of the machine",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
 
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
 
 
         username = Attribute("username", "Local account username", 
                 flags = Flags.Credential)
 
-        port = Attribute("port", "SSH port", flags = Flags.ExecReadOnly)
+        port = Attribute("port", "SSH port", flags = Flags.Design)
         
         home = Attribute("home",
                 "Experiment home directory to store all experiment related files",
         
         home = Attribute("home",
                 "Experiment home directory to store all experiment related files",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         
         identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
         
         server_key = Attribute("serverKey", "Server public key", 
         
         identity = Attribute("identity", "SSH identity file",
                 flags = Flags.Credential)
         
         server_key = Attribute("serverKey", "Server public key", 
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
                 type = Types.Bool,
                 default = False,
         
         clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
                 " from node home folder before starting experiment", 
                 type = Types.Bool,
                 default = False,
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
                 type = Types.Bool,
                 default = False,
 
         clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
                 " from a previous same experiment, before the new experiment starts", 
                 type = Types.Bool,
                 default = False,
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
                 type = Types.Bool,
                 default = False,
         
         clean_processes = Attribute("cleanProcesses", 
                 "Kill all running processes before starting experiment",
                 type = Types.Bool,
                 default = False,
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
                 "releasing the resource",
         
         tear_down = Attribute("tearDown", "Bash script to be executed before " + \
                 "releasing the resource",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
 
         gateway_user = Attribute("gatewayUser", "Gateway account username",
 
         gateway_user = Attribute("gatewayUser", "Gateway account username",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
 
         gateway = Attribute("gateway", "Hostname of the gateway machine",
 
         gateway = Attribute("gateway", "Hostname of the gateway machine",
-                flags = Flags.ExecReadOnly)
+                flags = Flags.Design)
+
+        ip = Attribute("ip", "Linux host public IP address. "
+                   "Must not be modified by the user unless hostname is 'localhost'",
+                    flags = Flags.Design)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
 
         cls._register_attribute(hostname)
         cls._register_attribute(username)
@@ -206,6 +211,7 @@ class LinuxNode(ResourceManager):
         cls._register_attribute(tear_down)
         cls._register_attribute(gateway_user)
         cls._register_attribute(gateway)
         cls._register_attribute(tear_down)
         cls._register_attribute(gateway_user)
         cls._register_attribute(gateway)
+        cls._register_attribute(ip)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
@@ -213,9 +219,6 @@ class LinuxNode(ResourceManager):
         # home directory at Linux host
         self._home_dir = ""
 
         # home directory at Linux host
         self._home_dir = ""
 
-        # list of pids before running the app if the user is root
-        self._pids = []
-        
         # lock to prevent concurrent applications on the same node,
         # to execute commands at the same time. There are potential
         # concurrency issues when using SSH to a same host from 
         # lock to prevent concurrent applications on the same node,
         # to execute commands at the same time. There are potential
         # concurrency issues when using SSH to a same host from 
@@ -236,9 +239,13 @@ class LinuxNode(ResourceManager):
            home = os.path.join(self._home_dir, home) 
         return home
 
            home = os.path.join(self._home_dir, home) 
         return home
 
+    @property
+    def nepi_home(self):
+        return os.path.join(self.home_dir, ".nepi")
+
     @property
     def usr_dir(self):
     @property
     def usr_dir(self):
-        return os.path.join(self.home_dir, "nepi-usr")
+        return os.path.join(self.nepi_home, "nepi-usr")
 
     @property
     def lib_dir(self):
 
     @property
     def lib_dir(self):
@@ -258,7 +265,7 @@ class LinuxNode(ResourceManager):
 
     @property
     def exp_dir(self):
 
     @property
     def exp_dir(self):
-        return os.path.join(self.home_dir, "nepi-exp")
+        return os.path.join(self.nepi_home, "nepi-exp")
 
     @property
     def exp_home(self):
 
     @property
     def exp_home(self):
@@ -277,7 +284,7 @@ class LinuxNode(ResourceManager):
         if self._os:
             return self._os
 
         if self._os:
             return self._os
 
-        if (not self.get("hostname") or not self.get("username")):
+        if not self.localhost and not self.get("username"):
             msg = "Can't resolve OS, insufficient data "
             self.error(msg)
             raise RuntimeError, msg
             msg = "Can't resolve OS, insufficient data "
             self.error(msg)
             raise RuntimeError, msg
@@ -331,7 +338,7 @@ class LinuxNode(ResourceManager):
 
     @property
     def localhost(self):
 
     @property
     def localhost(self):
-        return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
+        return self.get("hostname") in ['localhost', '127.0.0.1', '::1']
 
     def do_provision(self):
         # check if host is alive
 
     def do_provision(self):
         # check if host is alive
@@ -351,14 +358,29 @@ class LinuxNode(ResourceManager):
         if self.get("cleanExperiment"):
             self.clean_experiment()
     
         if self.get("cleanExperiment"):
             self.clean_experiment()
     
-        # Create shared directory structure
-        self.mkdir(self.lib_dir)
-        self.mkdir(self.bin_dir)
-        self.mkdir(self.src_dir)
-        self.mkdir(self.share_dir)
+        # Create shared directory structure and node home directory
+        paths = [self.lib_dir, 
+            self.bin_dir, 
+            self.src_dir, 
+            self.share_dir, 
+            self.node_home]
 
 
-        # Create experiment node home directory
-        self.mkdir(self.node_home)
+        self.mkdir(paths)
+
+        # Get Public IP address if possible
+        if not self.get("ip"):
+            ip = None
+
+            if self.localhost:
+                ip = socket.gethostbyname(socket.gethostname())
+            else:
+                try:
+                    ip = socket.gethostbyname(self.get("hostname"))
+                except:
+                    msg = "DNS can not resolve hostname %s" % self.get("hostname") 
+                    self.debug(msg)
+
+            self.set("ip", ip)
 
         super(LinuxNode, self).do_provision()
 
 
         super(LinuxNode, self).do_provision()
 
@@ -374,7 +396,7 @@ class LinuxNode(ResourceManager):
         ifaces = self.get_connected(LinuxInterface.get_rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
         ifaces = self.get_connected(LinuxInterface.get_rtype())
         for iface in ifaces:
             if iface.state < ResourceState.READY:
-                self.ec.schedule(reschedule_delay, self.deploy)
+                self.ec.schedule(self.reschedule_delay, self.deploy)
                 return 
 
         super(LinuxNode, self).do_deploy()
                 return 
 
         super(LinuxNode, self).do_deploy()
@@ -385,7 +407,7 @@ class LinuxNode(ResourceManager):
             # Node needs to wait until all associated RMs are released
             # before it can be released
             if rm.state != ResourceState.RELEASED:
             # Node needs to wait until all associated RMs are released
             # before it can be released
             if rm.state != ResourceState.RELEASED:
-                self.ec.schedule(reschedule_delay, self.release)
+                self.ec.schedule(self.reschedule_delay, self.release)
                 return 
 
         tear_down = self.get("tearDown")
                 return 
 
         tear_down = self.get("tearDown")
@@ -402,28 +424,37 @@ class LinuxNode(ResourceManager):
 
     def clean_processes(self):
         self.info("Cleaning up processes")
 
     def clean_processes(self):
         self.info("Cleaning up processes")
+
+        if self.localhost:
+            return 
+        
         if self.get("username") != 'root':
             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
         if self.get("username") != 'root':
             cmd = ("sudo -S killall tcpdump || /bin/true ; " +
-                "sudo -S kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
         else:
                 "sudo -S killall -u %s || /bin/true ; " % self.get("username"))
         else:
-            pids_temp = []
             if self.state >= ResourceState.READY:
             if self.state >= ResourceState.READY:
-                ps_aux = "ps aux |awk '{print $2}' |sort -u"
+                import pickle
+                pids = pickle.load(open("/tmp/save.proc", "rb"))
+                pids_temp = dict()
+                ps_aux = "ps aux |awk '{print $2,$11}'"
                 (out, err), proc = self.execute(ps_aux)
                 (out, err), proc = self.execute(ps_aux)
-                pids_temp = out.split()
-                kill_pids = list(set(pids_temp) - set(self._pids))
-                kill_pids = ' '.join(kill_pids)
-
-                cmd = ("killall tcpdump || /bin/true ; " +
-                    "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
-                    "kill %s || /bin/true ; " % kill_pids)
+                if len(out) != 0:
+                    for line in out.strip().split("\n"):
+                        parts = line.strip().split(" ")
+                        pids_temp[parts[0]] = parts[1]
+                    kill_pids = set(pids_temp.items()) - set(pids.items())
+                    kill_pids = ' '.join(dict(kill_pids).keys())
+
+                    cmd = ("killall tcpdump || /bin/true ; " +
+                        "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; " +
+                        "kill %s || /bin/true ; " % kill_pids)
+                else:
+                    cmd = ("killall tcpdump || /bin/true ; " +
+                        "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
             else:
                 cmd = ("killall tcpdump || /bin/true ; " +
                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
 
             else:
                 cmd = ("killall tcpdump || /bin/true ; " +
                     "kill $(ps aux | grep '[n]epi' | awk '{print $2}') || /bin/true ; ")
 
-        out = err = ""
         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
 
     def clean_home(self):
         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True)
 
     def clean_home(self):
@@ -431,7 +462,7 @@ class LinuxNode(ResourceManager):
         """
         self.info("Cleaning up home")
         
         """
         self.info("Cleaning up home")
         
-        cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
+        cmd = "cd %s ; find . -maxdepth 1 -name \.nepi -execdir rm -rf {} + " % (
                 self.home_dir )
 
         return self.execute(cmd, with_lock = True)
                 self.home_dir )
 
         return self.execute(cmd, with_lock = True)
@@ -451,13 +482,10 @@ class LinuxNode(ResourceManager):
 
     def execute(self, command,
             sudo = False,
 
     def execute(self, command,
             sudo = False,
-            stdin = None, 
             env = None,
             tty = False,
             forward_x11 = False,
             env = None,
             tty = False,
             forward_x11 = False,
-            timeout = None,
             retry = 3,
             retry = 3,
-            err_on_timeout = True,
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
@@ -472,10 +500,13 @@ class LinuxNode(ResourceManager):
             (out, err), proc = execfuncs.lexec(command, 
                     user = self.get("username"), # still problem with localhost
                     sudo = sudo,
             (out, err), proc = execfuncs.lexec(command, 
                     user = self.get("username"), # still problem with localhost
                     sudo = sudo,
-                    stdin = stdin,
                     env = env)
         else:
             if with_lock:
                     env = env)
         else:
             if with_lock:
+                # If the execute command is blocking, we don't want to keep
+                # the node lock. This lock is used to avoid race conditions
+                # when creating the ControlMaster sockets. A more elegant
+                # solution is needed.
                 with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
                 with self._node_lock:
                     (out, err), proc = sshfuncs.rexec(
                         command, 
@@ -486,15 +517,12 @@ class LinuxNode(ResourceManager):
                         gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
                         gw = self.get("gateway"),
                         agent = True,
                         sudo = sudo,
-                        stdin = stdin,
                         identity = self.get("identity"),
                         server_key = self.get("serverKey"),
                         env = env,
                         tty = tty,
                         forward_x11 = forward_x11,
                         identity = self.get("identity"),
                         server_key = self.get("serverKey"),
                         env = env,
                         tty = tty,
                         forward_x11 = forward_x11,
-                        timeout = timeout,
                         retry = retry,
                         retry = retry,
-                        err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
                         blocking = blocking, 
                         connect_timeout = connect_timeout,
                         persistent = persistent,
                         blocking = blocking, 
@@ -510,15 +538,12 @@ class LinuxNode(ResourceManager):
                     gw = self.get("gateway"),
                     agent = True,
                     sudo = sudo,
                     gw = self.get("gateway"),
                     agent = True,
                     sudo = sudo,
-                    stdin = stdin,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     env = env,
                     tty = tty,
                     forward_x11 = forward_x11,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     env = env,
                     tty = tty,
                     forward_x11 = forward_x11,
-                    timeout = timeout,
                     retry = retry,
                     retry = retry,
-                    err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
                     persistent = persistent,
                     blocking = blocking, 
                     connect_timeout = connect_timeout,
                     persistent = persistent,
                     blocking = blocking, 
@@ -534,19 +559,19 @@ class LinuxNode(ResourceManager):
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
             stdout = 'stdout', 
             stderr = 'stderr', 
             sudo = False,
-            tty = False):
+            tty = False,
+            strict_host_checking = False):
         
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
         
         self.debug("Running command '%s'" % command)
         
         if self.localhost:
-            (out, err), proc = execfuncs.lspawn(command, pidfile, 
-                    stdout = stdout, 
-                    stderr = stderr, 
-                    stdin = stdin, 
+            (out, err), proc = execfuncs.lspawn(command, pidfile,
                     home = home, 
                     create_home = create_home, 
                     home = home, 
                     create_home = create_home, 
-                    sudo = sudo,
-                    user = user) 
+                    stdin = stdin or '/dev/null',
+                    stdout = stdout or '/dev/null',
+                    stderr = stderr or '/dev/null',
+                    sudo = sudo) 
         else:
             with self._node_lock:
                 (out, err), proc = sshfuncs.rspawn(
         else:
             with self._node_lock:
                 (out, err), proc = sshfuncs.rspawn(
@@ -554,9 +579,9 @@ class LinuxNode(ResourceManager):
                     pidfile = pidfile,
                     home = home,
                     create_home = create_home,
                     pidfile = pidfile,
                     home = home,
                     create_home = create_home,
-                    stdin = stdin if stdin is not None else '/dev/null',
-                    stdout = stdout if stdout else '/dev/null',
-                    stderr = stderr if stderr else '/dev/null',
+                    stdin = stdin or '/dev/null',
+                    stdout = stdout or '/dev/null',
+                    stderr = stderr or '/dev/null',
                     sudo = sudo,
                     host = self.get("hostname"),
                     user = self.get("username"),
                     sudo = sudo,
                     host = self.get("hostname"),
                     user = self.get("username"),
@@ -566,7 +591,8 @@ class LinuxNode(ResourceManager):
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
                     agent = True,
                     identity = self.get("identity"),
                     server_key = self.get("serverKey"),
-                    tty = tty
+                    tty = tty,
+                    strict_host_checking = strict_host_checking
                     )
 
         return (out, err), proc
                     )
 
         return (out, err), proc
@@ -585,7 +611,8 @@ class LinuxNode(ResourceManager):
                     gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
                     gw = self.get("gateway"),
                     agent = True,
                     identity = self.get("identity"),
-                    server_key = self.get("serverKey")
+                    server_key = self.get("serverKey"),
+                    strict_host_checking = False
                     )
         
         return pidtuple
                     )
         
         return pidtuple
@@ -604,7 +631,8 @@ class LinuxNode(ResourceManager):
                         gw = self.get("gateway"),
                         agent = True,
                         identity = self.get("identity"),
                         gw = self.get("gateway"),
                         agent = True,
                         identity = self.get("identity"),
-                        server_key = self.get("serverKey")
+                        server_key = self.get("serverKey"),
+                        strict_host_checking = False
                         )
            
         return status
                         )
            
         return status
@@ -629,16 +657,16 @@ class LinuxNode(ResourceManager):
                         agent = True,
                         sudo = sudo,
                         identity = self.get("identity"),
                         agent = True,
                         sudo = sudo,
                         identity = self.get("identity"),
-                        server_key = self.get("serverKey")
+                        server_key = self.get("serverKey"),
+                        strict_host_checking = False
                         )
 
         return (out, err), proc
 
     def copy(self, src, dst):
         if self.localhost:
                         )
 
         return (out, err), proc
 
     def copy(self, src, dst):
         if self.localhost:
-            (out, err), proc = execfuncs.lcopy(source, dest, 
-                    recursive = True,
-                    strict_host_checking = False)
+            (out, err), proc = execfuncs.lcopy(src, dst, 
+                    recursive = True)
         else:
             with self._node_lock:
                 (out, err), proc = sshfuncs.rcopy(
         else:
             with self._node_lock:
                 (out, err), proc = sshfuncs.rcopy(
@@ -653,14 +681,21 @@ class LinuxNode(ResourceManager):
 
         return (out, err), proc
 
 
         return (out, err), proc
 
-    def upload(self, src, dst, text = False, overwrite = True):
+    def upload(self, src, dst, text = False, overwrite = True,
+            raise_on_error = True):
         """ Copy content to destination
 
         """ Copy content to destination
 
-           src  content to copy. Can be a local file, directory or a list of files
+        src  string with the content to copy. Can be:
+            - plain text
+            - a string with the path to a local file
+            - a string with a semi-colon separeted list of local files
+            - a string with a local directory
 
 
-           dst  destination path on the remote host (remote is always self.host)
+        dst  string with destination path on the remote host (remote is 
+            always self.host)
 
 
-           text src is text input, it must be stored into a temp file before uploading
+        text src is text input, it must be stored into a temp file before 
+        uploading
         """
         # If source is a string input 
         f = None
         """
         # If source is a string input 
         f = None
@@ -673,29 +708,50 @@ class LinuxNode(ResourceManager):
             src = f.name
 
         # If dst files should not be overwritten, check that the files do not
             src = f.name
 
         # If dst files should not be overwritten, check that the files do not
-        # exits already 
+        # exits already
+        if isinstance(src, str):
+            src = map(str.strip, src.split(";"))
+    
         if overwrite == False:
             src = self.filter_existing_files(src, dst)
             if not src:
         if overwrite == False:
             src = self.filter_existing_files(src, dst)
             if not src:
-                return ("", ""), None 
+                return ("", ""), None
 
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
 
 
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
 
-        result = self.copy(src, dst)
+        ((out, err), proc) = self.copy(src, dst)
 
         # clean up temp file
         if f:
             os.remove(f.name)
 
 
         # clean up temp file
         if f:
             os.remove(f.name)
 
-        return result
+        if err:
+            msg = " Failed to upload files - src: %s dst: %s" %  (";".join(src), dst) 
+            self.error(msg, out, err)
+            
+            msg = "%s out: %s err: %s" % (msg, out, err)
+            if raise_on_error:
+                raise RuntimeError, msg
 
 
-    def download(self, src, dst):
+        return ((out, err), proc)
+
+    def download(self, src, dst, raise_on_error = True):
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
-        return self.copy(src, dst)
+
+        ((out, err), proc) = self.copy(src, dst)
+
+        if err:
+            msg = " Failed to download files - src: %s dst: %s" %  (";".join(src), dst) 
+            self.error(msg, out, err)
+
+            if raise_on_error:
+                raise RuntimeError, msg
+
+        return ((out, err), proc)
 
     def install_packages_command(self, packages):
         command = ""
 
     def install_packages_command(self, packages):
         command = ""
@@ -710,7 +766,8 @@ class LinuxNode(ResourceManager):
 
         return command
 
 
         return command
 
-    def install_packages(self, packages, home, run_home = None):
+    def install_packages(self, packages, home, run_home = None,
+            raise_on_error = True):
         """ Install packages in the Linux host.
 
         'home' is the directory to upload the package installation script.
         """ Install packages in the Linux host.
 
         'home' is the directory to upload the package installation script.
@@ -727,11 +784,12 @@ class LinuxNode(ResourceManager):
             stdout = "instpkg_stdout", 
             stderr = "instpkg_stderr",
             overwrite = False,
             stdout = "instpkg_stdout", 
             stderr = "instpkg_stderr",
             overwrite = False,
-            raise_on_error = True)
+            raise_on_error = raise_on_error)
 
         return (out, err), proc 
 
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home, run_home = None):
+    def remove_packages(self, packages, home, run_home = None,
+            raise_on_error = True):
         """ Uninstall packages from the Linux host.
 
         'home' is the directory to upload the package un-installation script.
         """ Uninstall packages from the Linux host.
 
         'home' is the directory to upload the package un-installation script.
@@ -755,18 +813,35 @@ class LinuxNode(ResourceManager):
             stdout = "rmpkg_stdout", 
             stderr = "rmpkg_stderr",
             overwrite = False,
             stdout = "rmpkg_stdout", 
             stderr = "rmpkg_stderr",
             overwrite = False,
-            raise_on_error = True)
+            raise_on_error = raise_on_error)
          
         return (out, err), proc 
 
          
         return (out, err), proc 
 
-    def mkdir(self, path, clean = False):
+    def mkdir(self, paths, clean = False):
+        """ Paths is either a single remote directory path to create,
+        or a list of directories to create.
+        """
         if clean:
         if clean:
-            self.rmdir(path)
+            self.rmdir(paths)
+
+        if isinstance(paths, str):
+            paths = [paths]
+
+        cmd = " ; ".join(map(lambda path: "mkdir -p %s" % path, paths))
 
 
-        return self.execute("mkdir -p %s" % path, with_lock = True)
+        return self.execute(cmd, with_lock = True)
+
+    def rmdir(self, paths):
+        """ Paths is either a single remote directory path to delete,
+        or a list of directories to delete.
+        """
 
 
-    def rmdir(self, path):
-        return self.execute("rm -rf %s" % path, with_lock = True)
+        if isinstance(paths, str):
+            paths = [paths]
+
+        cmd = " ; ".join(map(lambda path: "rm -rf %s" % path, paths))
+
+        return self.execute(cmd, with_lock = True)
         
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
         
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
@@ -779,7 +854,7 @@ class LinuxNode(ResourceManager):
             stderr = "stderr", 
             sudo = False,
             tty = False,
             stderr = "stderr", 
             sudo = False,
             tty = False,
-            raise_on_error = False):
+            raise_on_error = True):
         """
         Uploads the 'command' to a bash script in the host.
         Then runs the script detached in background in the host, and
         """
         Uploads the 'command' to a bash script in the host.
         Then runs the script detached in background in the host, and
@@ -1033,8 +1108,8 @@ class LinuxNode(ResourceManager):
         """ Removes files that already exist in the Linux host from src list
         """
         # construct a dictionary with { dst: src }
         """ Removes files that already exist in the Linux host from src list
         """
         # construct a dictionary with { dst: src }
-        dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
-            src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
+        dests = dict(map(lambda s: (os.path.join(dst, os.path.basename(s)), s), src)) \
+                    if len(src) > 1 else dict({dst: src[0]})
 
         command = []
         for d in dests.keys():
 
         command = []
         for d in dests.keys():
@@ -1049,7 +1124,7 @@ class LinuxNode(ResourceManager):
                 del dests[d]
 
         if not dests:
                 del dests[d]
 
         if not dests:
-            return ""
+            return []
 
 
-        return " ".join(dests.values())
+        return dests.values()