LinuxApplication: Changed directory structure to store experiment files in the Linux...
[nepi.git] / src / nepi / resources / linux / node.py
index 0d2597f..5080a6c 100644 (file)
@@ -31,13 +31,11 @@ import re
 import tempfile
 import time
 import threading
+import traceback
 
-# TODO: Verify files and dirs exists already
-# TODO: Blacklist nodes!
 # TODO: Unify delays!!
 # TODO: Validate outcome of uploads!! 
 
-
 class ExitCode:
     """
     Error codes that the rexitcode function can return if unable to
@@ -165,8 +163,12 @@ class LinuxNode(ResourceManager):
         server_key = Attribute("serverKey", "Server public key", 
                 flags = Flags.ExecReadOnly)
         
-        clean_home = Attribute("cleanHome", "Remove all files and directories " + \
-                " from home folder before starting experiment", 
+        clean_home = Attribute("cleanHome", "Remove all nepi files and directories "
+                " from node home folder before starting experiment", 
+                flags = Flags.ExecReadOnly)
+
+        clean_experiment = Attribute("cleanExperiment", "Remove all files and directories " 
+                " from a previous same experiment, before the new experiment starts", 
                 flags = Flags.ExecReadOnly)
         
         clean_processes = Attribute("cleanProcesses", 
@@ -184,12 +186,15 @@ class LinuxNode(ResourceManager):
         cls._register_attribute(identity)
         cls._register_attribute(server_key)
         cls._register_attribute(clean_home)
+        cls._register_attribute(clean_experiment)
         cls._register_attribute(clean_processes)
         cls._register_attribute(tear_down)
 
     def __init__(self, ec, guid):
         super(LinuxNode, self).__init__(ec, guid)
         self._os = None
+        # home directory at Linux host
+        self._home_dir = ""
         
         # lock to avoid concurrency issues on methods used by applications 
         self._lock = threading.Lock()
@@ -199,17 +204,47 @@ class LinuxNode(ResourceManager):
                 self.get("hostname"), msg)
 
     @property
-    def home(self):
-        return self.get("home") or ""
+    def home_dir(self):
+        home = self.get("home") or ""
+        if not home.startswith("/"):
+           home = os.path.join(self._home_dir, home) 
+        return home
+
+    @property
+    def usr_dir(self):
+        return os.path.join(self.home_dir, "nepi-usr")
+
+    @property
+    def lib_dir(self):
+        return os.path.join(self.usr_dir, "lib")
+
+    @property
+    def bin_dir(self):
+        return os.path.join(self.usr_dir, "bin")
+
+    @property
+    def src_dir(self):
+        return os.path.join(self.usr_dir, "src")
+
+    @property
+    def share_dir(self):
+        return os.path.join(self.usr_dir, "share")
+
+    @property
+    def exp_dir(self):
+        return os.path.join(self.home_dir, "nepi-exp")
 
     @property
     def exp_home(self):
-        return os.path.join(self.home, self.ec.exp_id)
+        return os.path.join(self.exp_dir, self.ec.exp_id)
 
     @property
     def node_home(self):
-        node_home = "node-%d" % self.guid
-        return os.path.join(self.exp_home, node_home)
+        return os.path.join(self.exp_home, "node-%d" % self.guid)
+
+    @property
+    def run_home(self):
+        return os.path.join(self.node_home, self.ec.run_id)
 
     @property
     def os(self):
@@ -259,18 +294,32 @@ class LinuxNode(ResourceManager):
         return self.get("hostname") in ['localhost', '127.0.0.7', '::1']
 
     def provision(self):
+        # check if host is alive
         if not self.is_alive():
-            self._state = ResourceState.FAILED
+            self.fail()
+            
             msg = "Deploy failed. Unresponsive node %s" % self.get("hostname")
             self.error(msg)
             raise RuntimeError, msg
 
+        self.find_home()
+
         if self.get("cleanProcesses"):
             self.clean_processes()
 
         if self.get("cleanHome"):
             self.clean_home()
-       
+        if self.get("cleanExperiment"):
+            self.clean_experiment()
+    
+        # Create shared directory structure
+        self.mkdir(self.lib_dir)
+        self.mkdir(self.bin_dir)
+        self.mkdir(self.src_dir)
+        self.mkdir(self.share_dir)
+
+        # Create experiment node home directory
         self.mkdir(self.node_home)
 
         super(LinuxNode, self).provision()
@@ -300,6 +349,8 @@ class LinuxNode(ResourceManager):
         if tear_down:
             self.execute(tear_down)
 
+        self.clean_processes()
+
         super(LinuxNode, self).release()
 
     def valid_connection(self, guid):
@@ -327,21 +378,220 @@ class LinuxNode(ResourceManager):
         (out, err), proc = self.execute(cmd, retry = 1, with_lock = True) 
             
     def clean_home(self):
+        """ Cleans all NEPI related folders in the Linux host
+        """
         self.info("Cleaning up home")
         
-        cmd = (
-            # "find . -maxdepth 1  \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \)" +
-            "find . -maxdepth 1 -name 'nepi-*' " +
-            " -execdir rm -rf {} + "
-            )
+        cmd = "cd %s ; find . -maxdepth 1 \( -name 'nepi-usr' -o -name 'nepi-exp' \) -execdir rm -rf {} + " % (
+                self.home_dir )
+
+        return self.execute(cmd, with_lock = True)
+
+    def clean_experiment(self):
+        """ Cleans all experiment related files in the Linux host.
+        It preserves NEPI files and folders that have a multi experiment
+        scope.
+        """
+        self.info("Cleaning up experiment files")
+        
+        cmd = "cd %s ; find . -maxdepth 1 -name '%s' -execdir rm -rf {} + " % (
+                self.exp_dir,
+                self.ec.exp_id )
             
-        if self.home:
-            cmd = "cd %s ; " % self.home + cmd
+        return self.execute(cmd, with_lock = True)
+
+    def execute(self, command,
+            sudo = False,
+            stdin = None, 
+            env = None,
+            tty = False,
+            forward_x11 = False,
+            timeout = None,
+            retry = 3,
+            err_on_timeout = True,
+            connect_timeout = 30,
+            strict_host_checking = False,
+            persistent = True,
+            blocking = True,
+            with_lock = False
+            ):
+        """ Notice that this invocation will block until the
+        execution finishes. If this is not the desired behavior,
+        use 'run' instead."""
 
+        if self.localhost:
+            (out, err), proc = execfuncs.lexec(command, 
+                    user = user,
+                    sudo = sudo,
+                    stdin = stdin,
+                    env = env)
+        else:
+            if with_lock:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rexec(
+                        command, 
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        stdin = stdin,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey"),
+                        env = env,
+                        tty = tty,
+                        forward_x11 = forward_x11,
+                        timeout = timeout,
+                        retry = retry,
+                        err_on_timeout = err_on_timeout,
+                        connect_timeout = connect_timeout,
+                        persistent = persistent,
+                        blocking = blocking, 
+                        strict_host_checking = strict_host_checking
+                        )
+            else:
+                (out, err), proc = sshfuncs.rexec(
+                    command, 
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    sudo = sudo,
+                    stdin = stdin,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    env = env,
+                    tty = tty,
+                    forward_x11 = forward_x11,
+                    timeout = timeout,
+                    retry = retry,
+                    err_on_timeout = err_on_timeout,
+                    connect_timeout = connect_timeout,
+                    persistent = persistent,
+                    blocking = blocking, 
+                    strict_host_checking = strict_host_checking
+                    )
+
+        return (out, err), proc
+
+    def run(self, command, home,
+            create_home = False,
+            pidfile = 'pidfile',
+            stdin = None, 
+            stdout = 'stdout', 
+            stderr = 'stderr', 
+            sudo = False,
+            tty = False):
+        
+        self.debug("Running command '%s'" % command)
+        
+        if self.localhost:
+            (out, err), proc = execfuncs.lspawn(command, pidfile, 
+                    stdout = stdout, 
+                    stderr = stderr, 
+                    stdin = stdin, 
+                    home = home, 
+                    create_home = create_home, 
+                    sudo = sudo,
+                    user = user) 
+        else:
+            with self._lock:
+                (out, err), proc = sshfuncs.rspawn(
+                    command,
+                    pidfile = pidfile,
+                    home = home,
+                    create_home = create_home,
+                    stdin = stdin if stdin is not None else '/dev/null',
+                    stdout = stdout if stdout else '/dev/null',
+                    stderr = stderr if stderr else '/dev/null',
+                    sudo = sudo,
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    tty = tty
+                    )
+
+        return (out, err), proc
+
+    def getpid(self, home, pidfile = "pidfile"):
+        if self.localhost:
+            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
+        else:
+            with self._lock:
+                pidtuple = sshfuncs.rgetpid(
+                    os.path.join(home, pidfile),
+                    host = self.get("hostname"),
+                    user = self.get("username"),
+                    port = self.get("port"),
+                    agent = True,
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey")
+                    )
+        
+        return pidtuple
+
+    def status(self, pid, ppid):
+        if self.localhost:
+            status = execfuncs.lstatus(pid, ppid)
+        else:
+            with self._lock:
+                status = sshfuncs.rstatus(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
+           
+        return status
+    
+    def kill(self, pid, ppid, sudo = False):
         out = err = ""
-        (out, err), proc = self.execute(cmd, with_lock = True)
+        proc = None
+        status = self.status(pid, ppid)
+
+        if status == sshfuncs.ProcStatus.RUNNING:
+            if self.localhost:
+                (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
+            else:
+                with self._lock:
+                    (out, err), proc = sshfuncs.rkill(
+                        pid, ppid,
+                        host = self.get("hostname"),
+                        user = self.get("username"),
+                        port = self.get("port"),
+                        agent = True,
+                        sudo = sudo,
+                        identity = self.get("identity"),
+                        server_key = self.get("serverKey")
+                        )
+
+        return (out, err), proc
 
-    def upload(self, src, dst, text = False):
+    def copy(self, src, dst):
+        if self.localhost:
+            (out, err), proc = execfuncs.lcopy(source, dest, 
+                    recursive = True,
+                    strict_host_checking = False)
+        else:
+            with self._lock:
+                (out, err), proc = sshfuncs.rcopy(
+                    src, dst, 
+                    port = self.get("port"),
+                    identity = self.get("identity"),
+                    server_key = self.get("serverKey"),
+                    recursive = True,
+                    strict_host_checking = False)
+
+        return (out, err), proc
+
+
+    def upload(self, src, dst, text = False, overwrite = True):
         """ Copy content to destination
 
            src  content to copy. Can be a local file, directory or a list of files
@@ -360,9 +610,17 @@ class LinuxNode(ResourceManager):
             f.close()
             src = f.name
 
+        # If dst files should not be overwritten, check that the files do not
+        # exits already 
+        if overwrite == False:
+            src = self.filter_existing_files(src, dst)
+            if not src:
+                return ("", ""), None 
+
         if not self.localhost:
             # Build destination as <user>@<server>:<path>
             dst = "%s@%s:%s" % (self.get("username"), self.get("hostname"), dst)
+
         result = self.copy(src, dst)
 
         # clean up temp file
@@ -377,7 +635,12 @@ class LinuxNode(ResourceManager):
             src = "%s@%s:%s" % (self.get("username"), self.get("hostname"), src)
         return self.copy(src, dst)
 
-    def install_packages(self, packages, home):
+    def install_packages(self, packages, home, run_home = None):
+        """ Install packages in the Linux host.
+
+        'home' is the directory to upload the package installation script.
+        'run_home' is the directory from where to execute the script.
+        """
         command = ""
         if self.use_rpm:
             command = rpmfuncs.install_packages_command(self.os, packages)
@@ -388,19 +651,25 @@ class LinuxNode(ResourceManager):
             self.error(msg, self.os)
             raise RuntimeError, msg
 
-        out = err = ""
-        (out, err), proc = self.run_and_wait(command, home, 
-            shfile = "instpkg.sh",
+        run_home = run_home or home
+
+        (out, err), proc = self.run_and_wait(command, run_home, 
+            shfile = os.path.join(home, "instpkg.sh"),
             pidfile = "instpkg_pidfile",
             ecodefile = "instpkg_exitcode",
             stdout = "instpkg_stdout", 
             stderr = "instpkg_stderr",
+            overwrite = False,
             raise_on_error = True)
 
         return (out, err), proc 
 
-    def remove_packages(self, packages, home):
-        command = ""
+    def remove_packages(self, packages, home, run_home = None):
+        """ Uninstall packages from the Linux host.
+
+        'home' is the directory to upload the package un-installation script.
+        'run_home' is the directory from where to execute the script.
+        """
         if self.use_rpm:
             command = rpmfuncs.remove_packages_command(self.os, packages)
         elif self.use_deb:
@@ -410,13 +679,15 @@ class LinuxNode(ResourceManager):
             self.error(msg)
             raise RuntimeError, msg
 
-        out = err = ""
-        (out, err), proc = self.run_and_wait(command, home, 
-            shfile = "rmpkg.sh",
+        run_home = run_home or home
+
+        (out, err), proc = self.run_and_wait(command, run_home, 
+            shfile = os.path.join(home, "rmpkg.sh"),
             pidfile = "rmpkg_pidfile",
             ecodefile = "rmpkg_exitcode",
             stdout = "rmpkg_stdout", 
             stderr = "rmpkg_stderr",
+            overwrite = False,
             raise_on_error = True)
          
         return (out, err), proc 
@@ -433,6 +704,7 @@ class LinuxNode(ResourceManager):
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
             env = None,
+            overwrite = True,
             pidfile = "pidfile", 
             ecodefile = "exitcode", 
             stdin = None, 
@@ -446,12 +718,17 @@ class LinuxNode(ResourceManager):
         Then runs the script detached in background in the host, and
         busy-waites until the script finishes executing.
         """
-        self.upload_command(command, home, 
+
+        if not shfile.startswith("/"):
+            shfile = os.path.join(home, shfile)
+
+        self.upload_command(command, 
             shfile = shfile, 
             ecodefile = ecodefile, 
-            env = env)
+            env = env,
+            overwrite = overwrite)
 
-        command = "bash ./%s" % shfile
+        command = "bash %s" % shfile
         # run command in background in remote host
         (out, err), proc = self.run(command, home, 
                 pidfile = pidfile,
@@ -514,9 +791,10 @@ class LinuxNode(ResourceManager):
         # Other error from 'cat'
         return ExitCode.ERROR
 
-    def upload_command(self, command, home, 
+    def upload_command(self, command, 
             shfile = "cmd.sh",
             ecodefile = "exitcode",
+            overwrite = True,
             env = None):
         """ Saves the command as a bash script file in the remote host, and
         forces to save the exit code of the command execution to the ecodefile
@@ -537,8 +815,7 @@ class LinuxNode(ResourceManager):
         # Add environ to command
         command = environ + command
 
-        dst = os.path.join(home, shfile)
-        return self.upload(command, dst, text = True)
+        return self.upload(command, shfile, text = True, overwrite = overwrite)
 
     def format_environment(self, env, inline = False):
         """Format environmental variables for command to be executed either
@@ -639,215 +916,63 @@ class LinuxNode(ResourceManager):
         return (out, err), proc
 
     def is_alive(self):
+        """ Checks if host is responsive
+        """
         if self.localhost:
             return True
 
         out = err = ""
         try:
-            # TODO: FIX NOT ALIVE!!!!
-            (out, err), proc = self.execute("echo 'ALIVE' || (echo 'NOTALIVE') >&2", retry = 5, 
+            (out, err), proc = self.execute("echo 'ALIVE'",
+                    retry = 5, 
                     with_lock = True)
         except:
-            import traceback
             trace = traceback.format_exc()
             msg = "Unresponsive host  %s " % err
             self.error(msg, out, trace)
             return False
 
-        if out.strip().startswith('ALIVE'):
+        if out.strip() == "ALIVE":
             return True
         else:
             msg = "Unresponsive host "
             self.error(msg, out, err)
             return False
 
-    def copy(self, src, dst):
-        if self.localhost:
-            (out, err), proc = execfuncs.lcopy(source, dest, 
-                    recursive = True,
-                    strict_host_checking = False)
-        else:
-            with self._lock:
-                (out, err), proc = sshfuncs.rcopy(
-                    src, dst, 
-                    port = self.get("port"),
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey"),
-                    recursive = True,
-                    strict_host_checking = False)
-
-        return (out, err), proc
-
-    def execute(self, command,
-            sudo = False,
-            stdin = None, 
-            env = None,
-            tty = False,
-            forward_x11 = False,
-            timeout = None,
-            retry = 3,
-            err_on_timeout = True,
-            connect_timeout = 30,
-            strict_host_checking = False,
-            persistent = True,
-            blocking = True,
-            with_lock = False
-            ):
-        """ Notice that this invocation will block until the
-        execution finishes. If this is not the desired behavior,
-        use 'run' instead."""
+    def find_home(self):
+        """ Retrieves host home directory
+        """
+        (out, err), proc = self.execute("echo ${HOME}", retry = 5, 
+                    with_lock = True)
 
-        if self.localhost:
-            (out, err), proc = execfuncs.lexec(command, 
-                    user = user,
-                    sudo = sudo,
-                    stdin = stdin,
-                    env = env)
-        else:
-            if with_lock:
-                with self._lock:
-                    (out, err), proc = sshfuncs.rexec(
-                        command, 
-                        host = self.get("hostname"),
-                        user = self.get("username"),
-                        port = self.get("port"),
-                        agent = True,
-                        sudo = sudo,
-                        stdin = stdin,
-                        identity = self.get("identity"),
-                        server_key = self.get("serverKey"),
-                        env = env,
-                        tty = tty,
-                        forward_x11 = forward_x11,
-                        timeout = timeout,
-                        retry = retry,
-                        err_on_timeout = err_on_timeout,
-                        connect_timeout = connect_timeout,
-                        persistent = persistent,
-                        blocking = blocking, 
-                        strict_host_checking = strict_host_checking
-                        )
-            else:
-                (out, err), proc = sshfuncs.rexec(
-                    command, 
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    sudo = sudo,
-                    stdin = stdin,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey"),
-                    env = env,
-                    tty = tty,
-                    forward_x11 = forward_x11,
-                    timeout = timeout,
-                    retry = retry,
-                    err_on_timeout = err_on_timeout,
-                    connect_timeout = connect_timeout,
-                    persistent = persistent,
-                    blocking = blocking, 
-                    strict_host_checking = strict_host_checking
-                    )
+        if proc.poll():
+            msg = "Imposible to retrieve HOME directory"
+            self.error(msg, out, err)
+            raise RuntimeError, msg
 
-        return (out, err), proc
+        self._home_dir =  out.strip()
 
-    def run(self, command, home,
-            create_home = False,
-            pidfile = 'pidfile',
-            stdin = None, 
-            stdout = 'stdout', 
-            stderr = 'stderr', 
-            sudo = False,
-            tty = False):
-        
-        self.debug("Running command '%s'" % command)
-        
-        if self.localhost:
-            (out, err), proc = execfuncs.lspawn(command, pidfile, 
-                    stdout = stdout, 
-                    stderr = stderr, 
-                    stdin = stdin, 
-                    home = home, 
-                    create_home = create_home, 
-                    sudo = sudo,
-                    user = user) 
-        else:
-            with self._lock:
-                (out, err), proc = sshfuncs.rspawn(
-                    command,
-                    pidfile = pidfile,
-                    home = home,
-                    create_home = create_home,
-                    stdin = stdin if stdin is not None else '/dev/null',
-                    stdout = stdout if stdout else '/dev/null',
-                    stderr = stderr if stderr else '/dev/null',
-                    sudo = sudo,
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey"),
-                    tty = tty
-                    )
+    def filter_existing_files(self, src, dst):
+        """ Removes files that already exist in the Linux host from src list
+        """
+        # construct a dictionary with { dst: src }
+        dests = dict(map(lambda x: ( os.path.join(dst, os.path.basename(x) ),  x ), 
+            src.strip().split(" ") ) ) if src.strip().find(" ") != -1 else dict({dst: src})
 
-        return (out, err), proc
+        command = []
+        for d in dests.keys():
+            command.append(" [ -f %(dst)s ] && echo '%(dst)s' " % {'dst' : d} )
 
-    def getpid(self, home, pidfile = "pidfile"):
-        if self.localhost:
-            pidtuple =  execfuncs.lgetpid(os.path.join(home, pidfile))
-        else:
-            with self._lock:
-                pidtuple = sshfuncs.rgetpid(
-                    os.path.join(home, pidfile),
-                    host = self.get("hostname"),
-                    user = self.get("username"),
-                    port = self.get("port"),
-                    agent = True,
-                    identity = self.get("identity"),
-                    server_key = self.get("serverKey")
-                    )
-        
-        return pidtuple
+        command = ";".join(command)
 
-    def status(self, pid, ppid):
-        if self.localhost:
-            status = execfuncs.lstatus(pid, ppid)
-        else:
-            with self._lock:
-                status = sshfuncs.rstatus(
-                        pid, ppid,
-                        host = self.get("hostname"),
-                        user = self.get("username"),
-                        port = self.get("port"),
-                        agent = True,
-                        identity = self.get("identity"),
-                        server_key = self.get("serverKey")
-                        )
-           
-        return status
+        (out, err), proc = self.execute(command, retry = 1, with_lock = True)
     
-    def kill(self, pid, ppid, sudo = False):
-        out = err = ""
-        proc = None
-        status = self.status(pid, ppid)
+        for d in dests.keys():
+            if out.find(d) > -1:
+                del dests[d]
 
-        if status == sshfuncs.ProcStatus.RUNNING:
-            if self.localhost:
-                (out, err), proc = execfuncs.lkill(pid, ppid, sudo)
-            else:
-                with self._lock:
-                    (out, err), proc = sshfuncs.rkill(
-                        pid, ppid,
-                        host = self.get("hostname"),
-                        user = self.get("username"),
-                        port = self.get("port"),
-                        agent = True,
-                        sudo = sudo,
-                        identity = self.get("identity"),
-                        server_key = self.get("serverKey")
-                        )
+        if not dests:
+            return ""
 
-        return (out, err), proc
+        return " ".join(dests.values())