rename src/nepi/ into just nepi/
[nepi.git] / nepi / resources / linux / application.py
diff --git a/nepi/resources/linux/application.py b/nepi/resources/linux/application.py
new file mode 100644 (file)
index 0000000..9fd605d
--- /dev/null
@@ -0,0 +1,770 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License version 2 as
+#    published by the Free Software Foundation;
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.trace import Trace, TraceAttr
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState
+from nepi.resources.linux.node import LinuxNode
+from nepi.util.sshfuncs import ProcStatus
+from nepi.util.timefuncs import tnow, tdiffsec
+
+import os
+import subprocess
+
+# TODO: Resolve wildcards in commands!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
+
+@clsinit_copy
+class LinuxApplication(ResourceManager):
+    """
+    .. class:: Class Args :
+      
+        :param ec: The Experiment controller
+        :type ec: ExperimentController
+        :param guid: guid of the RM
+        :type guid: int
+
+    .. note::
+
+        A LinuxApplication RM represents a process that can be executed in
+        a remote Linux host using SSH.
+
+        The LinuxApplication RM takes care of uploadin sources and any files
+        needed to run the experiment, to the remote host. 
+        It also allows to provide source compilation (build) and installation 
+        instructions, and takes care of automating the sources build and 
+        installation tasks for the user.
+
+        It is important to note that files uploaded to the remote host have
+        two possible scopes: single-experiment or multi-experiment.
+        Single experiment files are those that will not be re-used by other 
+        experiments. Multi-experiment files are those that will.
+        Sources and shared files are always made available to all experiments.
+
+        Directory structure:
+
+        The directory structure used by LinuxApplication RM at the Linux
+        host is the following:
+
+        ${HOME}/.nepi/nepi-usr --> Base directory for multi-experiment files
+                      |
+        ${LIB}        |- /lib --> Base directory for libraries
+        ${BIN}        |- /bin --> Base directory for binary files
+        ${SRC}        |- /src --> Base directory for sources
+        ${SHARE}      |- /share --> Base directory for other files
+
+        ${HOME}/.nepi/nepi-exp --> Base directory for single-experiment files
+                      |
+        ${EXP_HOME}   |- /<exp-id>  --> Base directory for experiment exp-id
+                          |
+        ${APP_HOME}       |- /<app-guid> --> Base directory for application 
+                               |     specific files (e.g. command.sh, input)
+                               | 
+        ${RUN_HOME}            |- /<run-id> --> Base directory for run specific
+
+    """
+
+    _rtype = "linux::Application"
+    _help = "Runs an application on a Linux host with a BASH command "
+    _platform = "linux"
+
+    @classmethod
+    def _register_attributes(cls):
+        command = Attribute("command", "Command to execute at application start. "
+                "Note that commands will be executed in the ${RUN_HOME} directory, "
+                "make sure to take this into account when using relative paths. ", 
+                flags = Flags.Design)
+        forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
+                flags = Flags.Design)
+        env = Attribute("env", "Environment variables string for command execution",
+                flags = Flags.Design)
+        sudo = Attribute("sudo", "Run with root privileges", 
+                flags = Flags.Design)
+        depends = Attribute("depends", 
+                "Space-separated list of packages required to run the application",
+                flags = Flags.Design)
+        sources = Attribute("sources", 
+                "semi-colon separated list of regular files to be uploaded to ${SRC} "
+                "directory prior to building. Archives won't be expanded automatically. "
+                "Sources are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all sources). ",
+                flags = Flags.Design)
+        files = Attribute("files", 
+                "semi-colon separated list of regular miscellaneous files to be uploaded "
+                "to ${SHARE} directory. "
+                "Files are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all files). ",
+                flags = Flags.Design)
+        libs = Attribute("libs", 
+                "semi-colon separated list of libraries (e.g. .so files) to be uploaded "
+                "to ${LIB} directory. "
+                "Libraries are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all files). ",
+                flags = Flags.Design)
+        bins = Attribute("bins", 
+                "semi-colon separated list of binary files to be uploaded "
+                "to ${BIN} directory. "
+                "Binaries are globally available for all experiments unless "
+                "cleanHome is set to True (This will delete all files). ",
+                flags = Flags.Design)
+        code = Attribute("code", 
+                "Plain text source code to be uploaded to the ${APP_HOME} directory. ",
+                flags = Flags.Design)
+        build = Attribute("build", 
+                "Build commands to execute after deploying the sources. "
+                "Sources are uploaded to the ${SRC} directory and code "
+                "is uploaded to the ${APP_HOME} directory. \n"
+                "Usage example: tar xzf ${SRC}/my-app.tgz && cd my-app && "
+                "./configure && make && make clean.\n"
+                "Make sure to make the build commands return with a nonzero exit "
+                "code on error.",
+                flags = Flags.Design)
+        install = Attribute("install", 
+                "Commands to transfer built files to their final destinations. "
+                "Install commands are executed after build commands. ",
+                flags = Flags.Design)
+        stdin = Attribute("stdin", "Standard input for the 'command'", 
+                flags = Flags.Design)
+        tear_down = Attribute("tearDown", "Command to be executed just before " 
+                "releasing the resource", 
+                flags = Flags.Design)
+
+        cls._register_attribute(command)
+        cls._register_attribute(forward_x11)
+        cls._register_attribute(env)
+        cls._register_attribute(sudo)
+        cls._register_attribute(depends)
+        cls._register_attribute(sources)
+        cls._register_attribute(code)
+        cls._register_attribute(files)
+        cls._register_attribute(bins)
+        cls._register_attribute(libs)
+        cls._register_attribute(build)
+        cls._register_attribute(install)
+        cls._register_attribute(stdin)
+        cls._register_attribute(tear_down)
+
+    @classmethod
+    def _register_traces(cls):
+        stdout = Trace("stdout", "Standard output stream", enabled = True)
+        stderr = Trace("stderr", "Standard error stream", enabled = True)
+
+        cls._register_trace(stdout)
+        cls._register_trace(stderr)
+
+    def __init__(self, ec, guid):
+        super(LinuxApplication, self).__init__(ec, guid)
+        self._pid = None
+        self._ppid = None
+        self._node = None
+        self._home = "app-%s" % self.guid
+
+        # whether the command should run in foreground attached
+        # to a terminal
+        self._in_foreground = False
+
+        # whether to use sudo to kill the application process
+        self._sudo_kill = False
+
+        # keep a reference to the running process handler when 
+        # the command is not executed as remote daemon in background
+        self._proc = None
+
+        # timestamp of last state check of the application
+        self._last_state_check = tnow()
+        
+    def log_message(self, msg):
+        return " guid %d - host %s - %s " % (self.guid, 
+                self.node.get("hostname"), msg)
+
+    @property
+    def node(self):
+        if not self._node:
+            node = self.get_connected(LinuxNode.get_rtype())
+            if not node: 
+                msg = "Application %s guid %d NOT connected to Node" % (
+                        self._rtype, self.guid)
+                raise RuntimeError(msg)
+
+            self._node = node[0]
+
+        return self._node
+
+    @property
+    def app_home(self):
+        return os.path.join(self.node.exp_home, self._home)
+
+    @property
+    def run_home(self):
+        return os.path.join(self.app_home, self.ec.run_id)
+
+    @property
+    def pid(self):
+        return self._pid
+
+    @property
+    def ppid(self):
+        return self._ppid
+
+    @property
+    def in_foreground(self):
+        """ Returns True if the command needs to be executed in foreground.
+        This means that command will be executed using 'execute' instead of
+        'run' ('run' executes a command in background and detached from the 
+        terminal)
+        
+        When using X11 forwarding option, the command can not run in background
+        and detached from a terminal, since we need to keep the terminal attached 
+        to interact with it.
+        """
+        return self.get("forwardX11") or self._in_foreground
+
+    def trace_filepath(self, filename):
+        return os.path.join(self.run_home, filename)
+
+    def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        self.info("Retrieving '%s' trace %s " % (name, attr))
+
+        path = self.trace_filepath(name)
+        
+        command = "(test -f %s && echo 'success') || echo 'error'" % path
+        (out, err), proc = self.node.execute(command)
+
+        if (err and proc.poll()) or out.find("error") != -1:
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
+            return None
+    
+        if attr == TraceAttr.PATH:
+            return path
+
+        if attr == TraceAttr.ALL:
+            (out, err), proc = self.node.check_output(self.run_home, name)
+            
+            if proc.poll():
+                msg = " Couldn't read trace %s " % name
+                self.error(msg, out, err)
+                return None
+
+            return out
+
+        if attr == TraceAttr.STREAM:
+            cmd = "dd if=%s bs=%d count=1 skip=%d" % (path, block, offset)
+        elif attr == TraceAttr.SIZE:
+            cmd = "stat -c%%s %s " % path
+
+        (out, err), proc = self.node.execute(cmd)
+
+        if proc.poll():
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
+            return None
+        
+        if attr == TraceAttr.SIZE:
+            out = int(out.strip())
+
+        return out
+
+    def do_provision(self):
+        # take a snapshot of the system if user is root
+        # to ensure that cleanProcess will not kill
+        # pre-existent processes
+        if self.node.get("username") == 'root':
+            import pickle
+            procs = dict()
+            ps_aux = "ps aux |awk '{print $2,$11}'"
+            (out, err), proc = self.node.execute(ps_aux)
+            if len(out) != 0:
+                for line in out.strip().split("\n"):
+                    parts = line.strip().split(" ")
+                    procs[parts[0]] = parts[1]
+                with open("/tmp/save.proc", "wb") as pickle_file:
+                    pickle.dump(procs, pickle_file)
+            
+        # create run dir for application
+        self.node.mkdir(self.run_home)
+   
+        # List of all the provision methods to invoke
+        steps = [
+            # upload sources
+            self.upload_sources,
+            # upload files
+            self.upload_files,
+            # upload binaries
+            self.upload_binaries,
+            # upload libraries
+            self.upload_libraries,
+            # upload code
+            self.upload_code,
+            # upload stdin
+            self.upload_stdin,
+            # install dependencies
+            self.install_dependencies,
+            # build
+            self.build,
+            # Install
+            self.install]
+
+        command = []
+
+        # Since provisioning takes a long time, before
+        # each step we check that the EC is still 
+        for step in steps:
+            if self.ec.abort:
+                self.debug("Interrupting provisioning. EC says 'ABORT")
+                return
+            
+            ret = step()
+            if ret:
+                command.append(ret)
+
+        # upload deploy script
+        deploy_command = ";".join(command)
+        self.execute_deploy_command(deploy_command)
+
+        # upload start script
+        self.upload_start_command()
+       
+        self.info("Provisioning finished")
+
+        super(LinuxApplication, self).do_provision()
+
+    def upload_start_command(self, overwrite = False):
+        # Upload command to remote bash script
+        # - only if command can be executed in background and detached
+        command = self.get("command")
+
+        if command and not self.in_foreground:
+            self.info("Uploading command '%s'" % command)
+
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+            # replace application specific paths in the environment
+            env = self.get("env")
+            env = env and self.replace_paths(env)
+
+            shfile = os.path.join(self.app_home, "start.sh")
+
+            self.node.upload_command(command, 
+                    shfile = shfile,
+                    env = env,
+                    overwrite = overwrite)
+
+    def execute_deploy_command(self, command, prefix="deploy"):
+        if command:
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+            
+            # replace application specific paths in the environment
+            env = self.get("env")
+            env = env and self.replace_paths(env)
+
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
+            shfile = os.path.join(self.app_home, "%s.sh" % prefix)
+            self.node.run_and_wait(command, self.run_home,
+                    shfile = shfile, 
+                    overwrite = False,
+                    pidfile = "%s_pidfile" % prefix, 
+                    ecodefile = "%s_exitcode" % prefix, 
+                    stdout = "%s_stdout" % prefix, 
+                    stderr = "%s_stderr" % prefix)
+
+    def upload_sources(self, sources = None, src_dir = None):
+        if not sources:
+            sources = self.get("sources")
+   
+        command = ""
+
+        if not src_dir:
+            src_dir = self.node.src_dir
+
+        if sources:
+            self.info("Uploading sources ")
+
+            sources = [str.strip(source) for source in sources.split(";")]
+
+            # Separate sources that should be downloaded from 
+            # the web, from sources that should be uploaded from
+            # the local machine
+            command = []
+            for source in list(sources):
+                if source.startswith("http") or source.startswith("https"):
+                    # remove the hhtp source from the sources list
+                    sources.remove(source)
+
+                    command.append( " ( " 
+                            # Check if the source already exists
+                            " ls %(src_dir)s/%(basename)s "
+                            " || ( "
+                            # If source doesn't exist, download it and check
+                            # that it it downloaded ok
+                            "   wget -c --directory-prefix=%(src_dir)s %(source)s && "
+                            "   ls %(src_dir)s/%(basename)s "
+                            " ) ) " % {
+                                "basename": os.path.basename(source),
+                                "source": source,
+                                "src_dir": src_dir
+                                })
+
+            command = " && ".join(command)
+
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+       
+            if sources:
+                sources = ';'.join(sources)
+                self.node.upload(sources, src_dir, overwrite = False)
+
+        return command
+
+    def upload_files(self, files = None):
+        if not files:
+            files = self.get("files")
+
+        if files:
+            self.info("Uploading files %s " % files)
+            self.node.upload(files, self.node.share_dir, overwrite = False)
+
+    def upload_libraries(self, libs = None):
+        if not libs:
+            libs = self.get("libs")
+
+        if libs:
+            self.info("Uploading libraries %s " % libaries)
+            self.node.upload(libs, self.node.lib_dir, overwrite = False)
+
+    def upload_binaries(self, bins = None):
+        if not bins:
+            bins = self.get("bins")
+
+        if bins:
+            self.info("Uploading binaries %s " % binaries)
+            self.node.upload(bins, self.node.bin_dir, overwrite = False)
+
+    def upload_code(self, code = None):
+        if not code:
+            code = self.get("code")
+
+        if code:
+            self.info("Uploading code")
+
+            dst = os.path.join(self.app_home, "code")
+            self.node.upload(code, dst, overwrite = False, text = True)
+
+    def upload_stdin(self, stdin = None):
+        if not stdin:
+           stdin = self.get("stdin")
+
+        if stdin:
+            # create dir for sources
+            self.info("Uploading stdin")
+            
+            # upload stdin file to ${SHARE_DIR} directory
+            if os.path.isfile(stdin):
+                basename = os.path.basename(stdin)
+                dst = os.path.join(self.node.share_dir, basename)
+            else:
+                dst = os.path.join(self.app_home, "stdin")
+
+            self.node.upload(stdin, dst, overwrite = False, text = True)
+
+            # create "stdin" symlink on ${APP_HOME} directory
+            command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
+                "app_home": self.app_home, 
+                "stdin": dst })
+
+            return command
+
+    def install_dependencies(self, depends = None):
+        if not depends:
+            depends = self.get("depends")
+
+        if depends:
+            self.info("Installing dependencies %s" % depends)
+            return self.node.install_packages_command(depends)
+
+    def build(self, build = None):
+        if not build:
+            build = self.get("build")
+
+        if build:
+            self.info("Building sources ")
+            
+            # replace application specific paths in the command
+            return self.replace_paths(build)
+
+    def install(self, install = None):
+        if not install:
+            install = self.get("install")
+
+        if install:
+            self.info("Installing sources ")
+
+            # replace application specific paths in the command
+            return self.replace_paths(install)
+
+    def do_deploy(self):
+        # Wait until node is associated and deployed
+        node = self.node
+        if not node or node.state < ResourceState.READY:
+            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state)
+            self.ec.schedule(self.reschedule_delay, self.deploy)
+        else:
+            command = self.get("command") or ""
+            self.info("Deploying command '%s' " % command)
+            self.do_discover()
+            self.do_provision()
+
+            super(LinuxApplication, self).do_deploy()
+   
+    def do_start(self):
+        command = self.get("command")
+
+        self.info("Starting command '%s'" % command)
+
+        if not command:
+            # If no command was given (i.e. Application was used for dependency
+            # installation), then the application is directly marked as STOPPED
+            super(LinuxApplication, self).set_stopped()
+        else:
+            if self.in_foreground:
+                self._run_in_foreground()
+            else:
+                self._run_in_background()
+
+            super(LinuxApplication, self).do_start()
+
+    def _run_in_foreground(self):
+        command = self.get("command")
+        sudo = self.get("sudo") or False
+        x11 = self.get("forwardX11")
+        env = self.get("env")
+
+        # Command will be launched in foreground and attached to the
+        # terminal using the node 'execute' in non blocking mode.
+
+        # We save the reference to the process in self._proc 
+        # to be able to kill the process from the stop method.
+        # We also set blocking = False, since we don't want the
+        # thread to block until the execution finishes.
+        (out, err), self._proc = self.execute_command(command, 
+                env = env,
+                sudo = sudo,
+                forward_x11 = x11,
+                blocking = False)
+
+        if self._proc.poll():
+            self.error(msg, out, err)
+            raise RuntimeError(msg)
+
+    def _run_in_background(self):
+        command = self.get("command")
+        env = self.get("env")
+        sudo = self.get("sudo") or False
+
+        stdout = "stdout"
+        stderr = "stderr"
+        stdin = os.path.join(self.app_home, "stdin") if self.get("stdin") \
+                else None
+
+        # Command will be run as a daemon in baground and detached from any
+        # terminal.
+        # The command to run was previously uploaded to a bash script
+        # during deployment, now we launch the remote script using 'run'
+        # method from the node.
+        cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
+        (out, err), proc = self.node.run(cmd, self.run_home, 
+            stdin = stdin, 
+            stdout = stdout,
+            stderr = stderr,
+            sudo = sudo)
+
+        # check if execution errors occurred
+        msg = " Failed to start command '%s' " % command
+        
+        if proc.poll():
+            self.error(msg, out, err)
+            raise RuntimeError(msg)
+    
+        # Wait for pid file to be generated
+        pid, ppid = self.node.wait_pid(self.run_home)
+        if pid: self._pid = int(pid)
+        if ppid: self._ppid = int(ppid)
+
+        # If the process is not running, check for error information
+        # on the remote machine
+        if not self.pid or not self.ppid:
+            (out, err), proc = self.node.check_errors(self.run_home,
+                    stderr = stderr) 
+
+            # Out is what was written in the stderr file
+            if err:
+                msg = " Failed to start command '%s' " % command
+                self.error(msg, out, err)
+                raise RuntimeError(msg)
+    
+    def do_stop(self):
+        """ Stops application execution
+        """
+        command = self.get('command') or ''
+
+        if self.state == ResourceState.STARTED:
+        
+            self.info("Stopping command '%s' " % command)
+        
+            # If the command is running in foreground (it was launched using
+            # the node 'execute' method), then we use the handler to the Popen
+            # process to kill it. Else we send a kill signal using the pid and ppid
+            # retrieved after running the command with the node 'run' method
+            if self._proc:
+                self._proc.kill()
+            else:
+                # Only try to kill the process if the pid and ppid
+                # were retrieved
+                if self.pid and self.ppid:
+                    (out, err), proc = self.node.kill(self.pid, self.ppid,
+                            sudo = self._sudo_kill)
+
+                    """
+                    # TODO: check if execution errors occurred
+                    if (proc and proc.poll()) or err:
+                        msg = " Failed to STOP command '%s' " % self.get("command")
+                        self.error(msg, out, err)
+                    """
+
+            super(LinuxApplication, self).do_stop()
+
+    def do_release(self):
+        self.info("Releasing resource")
+
+        self.do_stop()
+        
+        tear_down = self.get("tearDown")
+        if tear_down:
+            self.node.execute(tear_down)
+
+        hard_release = self.get("hardRelease")
+        if hard_release:
+            self.node.rmdir(self.app_home)
+
+        super(LinuxApplication, self).do_release()
+        
+    @property
+    def state(self):
+        """ Returns the state of the application
+        """
+        if self._state == ResourceState.STARTED:
+            if self.in_foreground:
+                # Check if the process we used to execute the command
+                # is still running ...
+                retcode = self._proc.poll()
+
+                # retcode == None -> running
+                # retcode > 0 -> error
+                # retcode == 0 -> finished
+                if retcode:
+                    out = ""
+                    msg = " Failed to execute command '%s'" % self.get("command")
+                    err = self._proc.stderr.read()
+                    self.error(msg, out, err)
+                    self.do_fail()
+
+                elif retcode == 0:
+                    self.set_stopped()
+            else:
+                # We need to query the status of the command we launched in 
+                # background. In order to avoid overwhelming the remote host and
+                # the local processor with too many ssh queries, the state is only
+                # requested every 'state_check_delay' seconds.
+                state_check_delay = 0.5
+                if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
+                    if self.pid and self.ppid:
+                        # Make sure the process is still running in background
+                        status = self.node.status(self.pid, self.ppid)
+
+                        if status == ProcStatus.FINISHED:
+                            # If the program finished, check if execution
+                            # errors occurred
+                            (out, err), proc = self.node.check_errors(
+                                    self.run_home)
+
+                            if err:
+                                msg = "Failed to execute command '%s'" % \
+                                        self.get("command")
+                                self.error(msg, out, err)
+                                self.do_fail()
+                            else:
+                                self.set_stopped()
+
+                    self._last_state_check = tnow()
+
+        return self._state
+
+    def execute_command(self, command, 
+            env=None,
+            sudo=False,
+            tty=False,
+            forward_x11=False,
+            blocking=False):
+
+        environ = ""
+        if env:
+            environ = self.node.format_environment(env, inline=True)
+        command = environ + command
+        command = self.replace_paths(command)
+
+        return self.node.execute(command,
+                sudo=sudo,
+                tty=tty,
+                forward_x11=forward_x11,
+                blocking=blocking)
+
+    def replace_paths(self, command, node=None, app_home=None, run_home=None):
+        """
+        Replace all special path tags with shell-escaped actual paths.
+        """
+        if not node:
+            node=self.node
+
+        if not app_home:
+            app_home=self.app_home
+
+        if not run_home:
+            run_home = self.run_home
+
+        return ( command
+            .replace("${USR}", node.usr_dir)
+            .replace("${LIB}", node.lib_dir)
+            .replace("${BIN}", node.bin_dir)
+            .replace("${SRC}", node.src_dir)
+            .replace("${SHARE}", node.share_dir)
+            .replace("${EXP}", node.exp_dir)
+            .replace("${EXP_HOME}", node.exp_home)
+            .replace("${APP_HOME}", app_home)
+            .replace("${RUN_HOME}", run_home)
+            .replace("${NODE_HOME}", node.node_home)
+            .replace("${HOME}", node.home_dir)
+            )
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+