Adding CCN RMs for Linux Backend
[nepi.git] / src / nepi / resources / linux / application.py
index 0364939..727375d 100644 (file)
@@ -1,17 +1,33 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux.node import LinuxNode
-from nepi.util import sshfuncs 
+from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
 
-import logging
 import os
 
-reschedule_delay = "0.5s"
-state_check_delay = 1
+# TODO: Resolve wildcards in commands!!
 
-# TODO: Resolve wildcards in commands!! 
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -86,11 +102,9 @@ class LinuxApplication(ResourceManager):
     def _register_traces(cls):
         stdout = Trace("stdout", "Standard output stream")
         stderr = Trace("stderr", "Standard error stream")
-        buildlog = Trace("buildlog", "Output of the build process")
 
         cls._register_trace(stdout)
         cls._register_trace(stderr)
-        cls._register_trace(buildlog)
 
     def __init__(self, ec, guid):
         super(LinuxApplication, self).__init__(ec, guid)
@@ -98,10 +112,12 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
+        # keep a reference to the running process handler when 
+        # the command is not executed as remote daemon in background
+        self._proc = None
+
         # timestamp of last state check of the application
         self._last_state_check = strfnow()
-
-        self._logger = logging.getLogger("LinuxApplication")
     
     def log_message(self, msg):
         return " guid %d - host %s - %s " % (self.guid, 
@@ -133,6 +149,19 @@ class LinuxApplication(ResourceManager):
     def ppid(self):
         return self._ppid
 
+    @property
+    def in_foreground(self):
+        """ Returns True if the command needs to be executed in foreground.
+        This means that command will be executed using 'execute' instead of
+        'run' ('run' executes a command in background and detached from the 
+        terminal)
+
+        When using X11 forwarding option, the command can not run in background
+        and detached from a terminal, since we need to keep the terminal attached 
+        to interact with it.
+        """
+        return self.get("forwardX11") or False
+
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
 
@@ -176,7 +205,7 @@ class LinuxApplication(ResourceManager):
 
         return out
             
-    def provision(self, filters = None):
+    def provision(self):
         # create home dir for application
         self.node.mkdir(self.app_home)
 
@@ -198,32 +227,32 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
+        # Upload command to remote bash script
+        # - only if command can be executed in background and detached
         command = self.get("command")
-        x11 = self.get("forwardX11")
-        if not x11 and command:
-            self.info("Uploading command '%s'" % command)
 
-            # Export environment
-            environ = ""
-            env = self.get("env") or ""
-            for var in env.split(" "):
-                environ += 'export %s\n' % var
-
-            command = environ + command
+        if command and not self.in_foreground:
+            self.info("Uploading command '%s'" % command)
 
-            # If the command runs asynchronous, pre upload the command 
-            # to the app.sh file in the remote host
-            dst = os.path.join(self.app_home, "app.sh")
+            # replace application specific paths in the command
             command = self.replace_paths(command)
-            self.node.upload(command, dst, text = True)
+            
+            # replace application specific paths in the environment
+            env = self.get("env")
+            env = env and self.replace_paths(env)
+
+            self.node.upload_command(command, self.app_home, 
+                    shfile = "app.sh",
+                    env = env)
+       
+        self.info("Provisioning finished")
 
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
-        # TODO: check if sources need to be uploaded and upload them
         sources = self.get("sources")
         if sources:
-            self.info(" Uploading sources ")
+            self.info("Uploading sources ")
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
@@ -236,25 +265,34 @@ class LinuxApplication(ResourceManager):
                     http_sources.append(source)
                     sources.remove(source)
 
-            # Download http sources
+            # Download http sources remotely
             if http_sources:
-                cmd = " wget -c --directory-prefix=${SOURCES} "
-                verif = ""
+                command = [" wget -c --directory-prefix=${SOURCES} "]
+                check = []
 
                 for source in http_sources:
-                    cmd += " %s " % (source)
-                    verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+                    command.append(" %s " % (source))
+                    check.append(" ls ${SOURCES}/%s " % os.path.basename(source))
                 
-                # Wget output goes to stderr :S
-                cmd += " 2> /dev/null ; "
+                command = " ".join(command)
+                check = " ; ".join(check)
 
-                # Add verification
-                cmd += " %s " % verif
+                # Append the command to check that the sources were downloaded
+                command += " ; %s " % check
+
+                # replace application specific paths in the command
+                command = self.replace_paths(command)
+                
+                # Upload the command to a bash script and run it
+                # in background ( but wait until the command has
+                # finished to continue )
+                self.node.run_and_wait(command, self.app_home,
+                        shfile = "http_sources.sh",
+                        pidfile = "http_sources_pidfile", 
+                        ecodefile = "http_sources_exitcode", 
+                        stdout = "http_sources_stdout", 
+                        stderr = "http_sources_stderr")
 
-                # Upload the command to a file, and execute asynchronously
-                self.upload_and_run(cmd, 
-                        "http_sources.sh", "http_sources_pid", 
-                        "http_sources_out", "http_sources_err")
             if sources:
                 self.node.upload(sources, self.src_dir)
 
@@ -264,7 +302,7 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            self.info(" Uploading code ")
+            self.info("Uploading code ")
 
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
@@ -281,42 +319,60 @@ class LinuxApplication(ResourceManager):
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
-            self.info(" Installing dependencies %s" % depends)
-            self.node.install_packages(depends, home = self.app_home)
+            self.info("Installing dependencies %s" % depends)
+            self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
         if build:
-            self.info(" Building sources ")
+            self.info("Building sources ")
             
             # create dir for build
             self.node.mkdir(self.build_dir)
 
-            # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(build, 
-                    "build.sh", "build_pid", 
-                    "build_out", "build_err")
+            # replace application specific paths in the command
+            command = self.replace_paths(build)
+
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "build.sh",
+                    pidfile = "build_pidfile", 
+                    ecodefile = "build_exitcode", 
+                    stdout = "build_stdout", 
+                    stderr = "build_stderr")
  
     def install(self):
         install = self.get("install")
         if install:
-            self.info(" Installing sources ")
+            self.info("Installing sources ")
+
+            # replace application specific paths in the command
+            command = self.replace_paths(install)
 
-            # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(install, 
-                    "install.sh", "install_pid", 
-                    "install_out", "install_err")
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "install.sh",
+                    pidfile = "install_pidfile", 
+                    ecodefile = "install_exitcode", 
+                    stdout = "install_stdout", 
+                    stderr = "install_stderr")
 
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+            
+            reschedule_delay = "0.5s"
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
                 command = self.get("command") or ""
-                self.info(" Deploying command '%s' " % command)
+                self.info("Deploying command '%s' " % command)
                 self.discover()
                 self.provision()
             except:
@@ -326,48 +382,61 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.get('command')
-        env = self.get('env')
-        stdin = 'stdin' if self.get('stdin') else None
-        stdout = 'stdout' if self.get('stdout') else 'stdout'
-        stderr = 'stderr' if self.get('stderr') else 'stderr'
-        sudo = self.get('sudo') or False
-        x11 = self.get('forwardX11') or False
+        command = self.get("command")
+        env = self.get("env")
+        stdin = "stdin" if self.get("stdin") else None
+        stdout = "stdout" if self.get("stdout") else "stdout"
+        stderr = "stderr" if self.get("stderr") else "stderr"
+        sudo = self.get("sudo") or False
         failed = False
 
-        super(LinuxApplication, self).start()
-
-        if not command:
-            self.info("No command to start ")
-            self._state = ResourceState.FINISHED
-            return 
-    
         self.info("Starting command '%s'" % command)
 
-        if x11:
-            if env:
-                # Export environment
-                environ = ""
-                for var in env.split(" "):
-                    environ += ' %s ' % var
+        if self.in_foreground:
+            # If command should run in foreground, we invoke 'execute' method
+            # of the node
+            if not command:
+                msg = "No command is defined but X11 forwarding has been set"
+                self.error(msg)
+                self._state = ResourceState.FAILED
+                raise RuntimeError, msg
 
-                command = "(" + environ + " ; " + command + ")"
-                command = self.replace_paths(command)
+            # Export environment
+            environ = self.node.format_environment(env, inline = True)
 
-            # If the command requires X11 forwarding, we
-            # can't run it asynchronously
-            (out, err), proc = self.node.execute(command,
+            command = environ + command
+            command = self.replace_paths(command)
+            
+            x11 = self.get("forwardX11")
+
+            # We save the reference to the process in self._proc 
+            # to be able to kill the process from the stop method.
+            # We also set blocking = False, since we don't want the
+            # thread to block until the execution finishes.
+            (out, err), self._proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
-                    forward_x11 = x11)
+                    forward_x11 = x11,
+                    blocking = False)
 
-            self._state = ResourceState.FINISHED
+            if self._proc.poll():
+                out = ""
+                err = self._proc.stderr.read()
+                self._state = ResourceState.FAILED
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+            
+            super(LinuxApplication, self).start()
 
-            if proc.poll() and err:
-                failed = True
-        else:
-            # Command was  previously uploaded, now run the remote
-            # bash file asynchronously
+        elif command:
+            # If command is set (i.e. application is not used only for dependency
+            # installation), and it does not need to run in foreground, then we  
+            # invoke the 'run' method of the node to launch the application as a 
+            # daemon in background
+
+            # The real command to execute was previously uploaded to a remote bash
+            # script during deployment, now launch the remote script using 'run'
+            # method from the node
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
@@ -375,52 +444,65 @@ class LinuxApplication(ResourceManager):
                 stderr = stderr,
                 sudo = sudo)
 
-            if proc.poll() and err:
-                failed = True
+            # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            
+            if proc.poll():
+                self.error(msg, out, err)
+                raise RuntimeError, msg
         
-            if not failed:
-                pid, ppid = self.node.wait_pid(home = self.app_home)
-                if pid: self._pid = int(pid)
-                if ppid: self._ppid = int(ppid)
-
+            # Wait for pid file to be generated
+            pid, ppid = self.node.wait_pid(self.app_home)
+            if pid: self._pid = int(pid)
+            if ppid: self._ppid = int(ppid)
+  
+            # If the process is not running, check for error information
+            # on the remote machine
             if not self.pid or not self.ppid:
-                failed = True
-            (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
+                (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
-            if failed or out or chkerr:
-                # check if execution errors occurred
-                msg = " Failed to start command '%s' " % command
-                out = out
+                # Out is what was written in the stderr file
                 if err:
-                    err = err
-                elif chkerr:
-                    err = chkerr
-
-                self.error(msg, out, err)
-
-                msg2 = " Setting state to Failed"
-                self.debug(msg2)
-                self._state = ResourceState.FAILED
+                    msg = " Failed to start command '%s' " % command
+                    self.error(msg, out, err)
+                    raise RuntimeError, msg
 
-                raise RuntimeError, msg
+            super(LinuxApplication, self).start()
 
+        else:
+            # If no command was given (i.e. Application was used for dependency
+            # installation), then the application is directly marked as FINISHED
+            self._state = ResourceState.FINISHED
     def stop(self):
+        """ Stops application execution
+        """
         command = self.get('command') or ''
         state = self.state
-        
+
         if state == ResourceState.STARTED:
-            self.info("Stopping command '%s'" % command)
+            stopped = True
 
-            (out, err), proc = self.node.kill(self.pid, self.ppid)
+            self.info("Stopping command '%s'" % command)
+        
+            # If the command is running in foreground (it was launched using
+            # the node 'execute' method), then we use the handler to the Popen
+            # process to kill it. Else we send a kill signal using the pid and ppid
+            # retrieved after running the command with the node 'run' method
 
-            if out or err:
-                # check if execution errors occurred
-                msg = " Failed to STOP command '%s' " % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
-                stopped = False
+            if self._proc:
+                self._proc.kill()
             else:
+                (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+                if out or err:
+                    # check if execution errors occurred
+                    msg = " Failed to STOP command '%s' " % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+                    stopped = False
+
+            if stopped:
                 super(LinuxApplication, self).stop()
 
     def release(self):
@@ -436,47 +518,53 @@ class LinuxApplication(ResourceManager):
     
     @property
     def state(self):
+        """ Returns the state of the application
+        """
         if self._state == ResourceState.STARTED:
-            # To avoid overwhelming the remote hosts and the local processor
-            # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' .
-            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
-                # check if execution errors occurred
-                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
-
-                if out or err:
-                    if err.find("No such file or directory") >= 0 :
-                        # The resource is marked as started, but the
-                        # command was not yet executed
-                        return ResourceState.READY
-
+            if self.in_foreground:
+                # Check if the process we used to execute the command
+                # is still running ...
+                retcode = self._proc.poll()
+                
+                # retcode == None -> running
+                # retcode > 0 -> error
+                # retcode == 0 -> finished
+                if retcode:
+                    out = ""
                     msg = " Failed to execute command '%s'" % self.get("command")
+                    err = self._proc.stderr.read()
                     self.error(msg, out, err)
                     self._state = ResourceState.FAILED
+                elif retcode == 0:
+                    self._state = ResourceState.FINISHED
 
-                elif self.pid and self.ppid:
-                    status = self.node.status(self.pid, self.ppid)
-
-                    if status == sshfuncs.FINISHED:
-                        self._state = ResourceState.FINISHED
-
-
-                self._last_state_check = strfnow()
+            else:
+                # We need to query the status of the command we launched in 
+                # background. In oredr to avoid overwhelming the remote host and
+                # the local processor with too many ssh queries, the state is only
+                # requested every 'state_check_delay' seconds.
+                state_check_delay = 0.5
+                if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                    # check if execution errors occurred
+                    (out, err), proc = self.node.check_errors(self.app_home)
+
+                    if err:
+                        msg = " Failed to execute command '%s'" % self.get("command")
+                        self.error(msg, out, err)
+                        self._state = ResourceState.FAILED
+
+                    elif self.pid and self.ppid:
+                        # No execution errors occurred. Make sure the background
+                        # process with the recorded pid is still running.
+                        status = self.node.status(self.pid, self.ppid)
+
+                        if status == ProcStatus.FINISHED:
+                            self._state = ResourceState.FINISHED
+
+                    self._last_state_check = strfnow()
 
         return self._state
 
-    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
-        dst = os.path.join(self.app_home, fname)
-        cmd = self.replace_paths(cmd)
-        self.node.upload(cmd, dst, text = True)
-
-        cmd = "bash ./%s" % fname
-        (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
-            pidfile = pidfile,
-            stdout = outfile, 
-            stderr = errfile, 
-            raise_on_error = True)
-
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.
@@ -495,8 +583,4 @@ class LinuxApplication(ResourceManager):
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
-        # XXX: What if it is connected to more than one node?
-        resources = self.find_resources(exact_tags = [tags.NODE])
-        self._node = resources[0] if len(resources) == 1 else None
-        return self._node