Fixing tickets http://newyans.pl.sophia.inria.fr/trac/ticket/37 and http://newyans...
[nepi.git] / src / nepi / resources / linux / application.py
index d0a8c32..c1de543 100644 (file)
@@ -19,8 +19,8 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
-from nepi.execution.resource import ResourceManager, clsinit, ResourceState, \
-    reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay
 from nepi.resources.linux.node import LinuxNode
 from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import tnow, tdiffsec
@@ -29,12 +29,9 @@ import os
 import subprocess
 
 # TODO: Resolve wildcards in commands!!
-# TODO: During provisioning, everything that is not scp could be
-#       uploaded to a same script, http_sources download, etc...
-#       and like that require performing less ssh connections!!!
+# TODO: When a failure occurs during deployment, scp and ssh processes are left behind, still running!!
 
-
-@clsinit
+@clsinit_copy
 class LinuxApplication(ResourceManager):
     """
     .. class:: Class Args :
@@ -85,6 +82,8 @@ class LinuxApplication(ResourceManager):
     """
 
     _rtype = "LinuxApplication"
+    _help = "Runs an application on a Linux host with a BASH command "
+    _backend_type = "linux"
 
     @classmethod
     def _register_attributes(cls):
@@ -175,8 +174,13 @@ class LinuxApplication(ResourceManager):
         self._pid = None
         self._ppid = None
         self._home = "app-%s" % self.guid
+        # whether the command should run in foreground attached
+        # to a terminal
         self._in_foreground = False
 
+        # whether to use sudo to kill the application process
+        self._sudo_kill = False
+
         # keep a reference to the running process handler when 
         # the command is not executed as remote daemon in background
         self._proc = None
@@ -242,7 +246,7 @@ class LinuxApplication(ResourceManager):
         if attr == TraceAttr.ALL:
             (out, err), proc = self.node.check_output(self.run_home, name)
             
-            if err and proc.poll():
+            if proc.poll():
                 msg = " Couldn't read trace %s " % name
                 self.error(msg, out, err)
                 return None
@@ -256,7 +260,7 @@ class LinuxApplication(ResourceManager):
 
         (out, err), proc = self.node.execute(cmd)
 
-        if err and proc.poll():
+        if proc.poll():
             msg = " Couldn't find trace %s " % name
             self.error(msg, out, err)
             return None
@@ -265,11 +269,12 @@ class LinuxApplication(ResourceManager):
             out = int(out.strip())
 
         return out
-            
-    def provision(self):
+
+    def do_provision(self):
         # create run dir for application
         self.node.mkdir(self.run_home)
-    
+   
+        # List of all the provision methods to invoke
         steps = [
             # upload sources
             self.upload_sources,
@@ -290,14 +295,31 @@ class LinuxApplication(ResourceManager):
             # Install
             self.install]
 
+        command = []
+
         # Since provisioning takes a long time, before
         # each step we check that the EC is still 
         for step in steps:
-            if self.ec.finished:
-                raise RuntimeError, "EC finished"
+            if self.ec.abort:
+                self.debug("Interrupting provisioning. EC says 'ABORT")
+                return
+            
+            ret = step()
+            if ret:
+                command.append(ret)
 
-            step()
+        # upload the deploy script and run it (waits until it finishes)
+        deploy_command = ";".join(command)
+        self.execute_deploy_command(deploy_command)
 
+        # upload start script
+        self.upload_start_command()
+       
+        self.info("Provisioning finished")
+
+        super(LinuxApplication, self).do_provision()
+
+    def upload_start_command(self):
         # Upload command to remote bash script
         # - only if command can be executed in background and detached
         command = self.get("command")
@@ -312,18 +334,31 @@ class LinuxApplication(ResourceManager):
             env = self.get("env")
             env = env and self.replace_paths(env)
 
-            shfile = os.path.join(self.app_home, "app.sh")
+            shfile = os.path.join(self.app_home, "start.sh")
 
             self.node.upload_command(command, 
                     shfile = shfile,
-                    env = env)
-       
-        self.info("Provisioning finished")
+                    env = env,
+                    overwrite = False)
 
-        super(LinuxApplication, self).provision()
+    def execute_deploy_command(self, command):
+        if command:
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
+            shfile = os.path.join(self.app_home, "deploy.sh")
+            self.node.run_and_wait(command, self.run_home,
+                    shfile = shfile, 
+                    overwrite = False,
+                    pidfile = "deploy_pidfile", 
+                    ecodefile = "deploy_exitcode", 
+                    stdout = "deploy_stdout", 
+                    stderr = "deploy_stderr")
 
     def upload_sources(self):
         sources = self.get("sources")
+   
+        command = ""
 
         if sources:
             self.info("Uploading sources ")
@@ -352,27 +387,17 @@ class LinuxApplication(ResourceManager):
                                 "source": source
                                 })
 
-            if command:
-                command = " && ".join(command)
-
-                # replace application specific paths in the command
-                command = self.replace_paths(command)
-                
-                # Upload the command to a bash script and run it
-                # in background ( but wait until the command has
-                # finished to continue )
-                self.node.run_and_wait(command, self.run_home,
-                        shfile = os.path.join(self.app_home, "http_sources.sh"),
-                        overwrite = False,
-                        pidfile = "http_sources_pidfile", 
-                        ecodefile = "http_sources_exitcode", 
-                        stdout = "http_sources_stdout", 
-                        stderr = "http_sources_stderr")
+            command = " && ".join(command)
 
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+       
             if sources:
                 sources = ' '.join(sources)
                 self.node.upload(sources, self.node.src_dir, overwrite = False)
 
+        return command
+
     def upload_files(self):
         files = self.get("files")
 
@@ -409,14 +434,23 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.info("Uploading stdin")
             
-            dst = os.path.join(self.app_home, "stdin")
+            # upload stdin file to ${SHARE_DIR} directory
+            basename = os.path.basename(stdin)
+            dst = os.path.join(self.node.share_dir, basename)
             self.node.upload(stdin, dst, overwrite = False, text = True)
 
+            # create the "stdin" symlink in the ${APP_HOME} directory, unless a "stdin" file already exists there
+            command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
+                "app_home": self.app_home, 
+                "stdin": dst })
+
+            return command
+
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
             self.info("Installing dependencies %s" % depends)
-            self.node.install_packages(depends, self.app_home, self.run_home)
+            return self.node.install_packages_command(depends)
 
     def build(self):
         build = self.get("build")
@@ -425,19 +459,8 @@ class LinuxApplication(ResourceManager):
             self.info("Building sources ")
             
             # replace application specific paths in the command
-            command = self.replace_paths(build)
+            return self.replace_paths(build)
 
-            # Upload the command to a bash script and run it
-            # in background ( but wait until the command has
-            # finished to continue )
-            self.node.run_and_wait(command, self.run_home,
-                    shfile = os.path.join(self.app_home, "build.sh"),
-                    overwrite = False,
-                    pidfile = "build_pidfile", 
-                    ecodefile = "build_exitcode", 
-                    stdout = "build_stdout", 
-                    stderr = "build_stderr")
     def install(self):
         install = self.get("install")
 
@@ -445,38 +468,23 @@ class LinuxApplication(ResourceManager):
             self.info("Installing sources ")
 
             # replace application specific paths in the command
-            command = self.replace_paths(install)
+            return self.replace_paths(install)
 
-            # Upload the command to a bash script and run it
-            # in background ( but wait until the command has
-            # finished to continue )
-            self.node.run_and_wait(command, self.run_home,
-                    shfile = os.path.join(self.app_home, "install.sh"),
-                    overwrite = False,
-                    pidfile = "install_pidfile", 
-                    ecodefile = "install_exitcode", 
-                    stdout = "install_stdout", 
-                    stderr = "install_stderr")
-
-    def deploy(self):
+    def do_deploy(self):
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            try:
-                command = self.get("command") or ""
-                self.info("Deploying command '%s' " % command)
-                self.discover()
-                self.provision()
-            except:
-                self._state = ResourceState.FAILED
-                raise
-
-            super(LinuxApplication, self).deploy()
-
-    def start(self):
+            command = self.get("command") or ""
+            self.info("Deploying command '%s' " % command)
+            self.do_discover()
+            self.do_provision()
+
+            super(LinuxApplication, self).do_deploy()
+   
+    def do_start(self):
         command = self.get("command")
 
         self.info("Starting command '%s'" % command)
@@ -484,20 +492,20 @@ class LinuxApplication(ResourceManager):
         if not command:
             # If no command was given (i.e. Application was used for dependency
             # installation), then the application is directly marked as FINISHED
-            self._state = ResourceState.FINISHED
+            super(LinuxApplication, self).do_finish()
         else:
-
             if self.in_foreground:
-                self._start_in_foreground()
+                self._run_in_foreground()
             else:
-                self._start_in_background()
+                self._run_in_background()
 
-            super(LinuxApplication, self).start()
+            super(LinuxApplication, self).do_start()
 
-    def _start_in_foreground(self):
+    def _run_in_foreground(self):
         command = self.get("command")
         sudo = self.get("sudo") or False
         x11 = self.get("forwardX11")
+        env = self.get("env")
 
         # For a command being executed in foreground, if there is stdin,
         # it is expected to be text string not a file or pipe
@@ -506,28 +514,22 @@ class LinuxApplication(ResourceManager):
         # Command will be launched in foreground and attached to the
         # terminal using the node 'execute' in non blocking mode.
 
-        # Export environment
-        env = self.get("env")
-        environ = self.node.format_environment(env, inline = True)
-        command = environ + command
-        command = self.replace_paths(command)
-
         # We save the reference to the process in self._proc 
         # to be able to kill the process from the stop method.
         # We also set blocking = False, since we don't want the
         # thread to block until the execution finishes.
-        (out, err), self._proc = self.node.execute(command,
+        (out, err), self._proc = self.execute_command(command, 
+                env = env,
                 sudo = sudo,
                 stdin = stdin,
                 forward_x11 = x11,
                 blocking = False)
 
         if self._proc.poll():
-            self._state = ResourceState.FAILED
             self.error(msg, out, err)
             raise RuntimeError, msg
 
-    def _start_in_background(self):
+    def _run_in_background(self):
         command = self.get("command")
         env = self.get("env")
         sudo = self.get("sudo") or False
@@ -542,7 +544,7 @@ class LinuxApplication(ResourceManager):
         # The command to run was previously uploaded to a bash script
         # during deployment, now we launch the remote script using 'run'
         # method from the node.
-        cmd = "bash %s" % os.path.join(self.app_home, "app.sh")
+        cmd = "bash %s" % os.path.join(self.app_home, "start.sh")
         (out, err), proc = self.node.run(cmd, self.run_home, 
             stdin = stdin, 
             stdout = stdout,
@@ -553,7 +555,6 @@ class LinuxApplication(ResourceManager):
         msg = " Failed to start command '%s' " % command
         
         if proc.poll():
-            self._state = ResourceState.FAILED
             self.error(msg, out, err)
             raise RuntimeError, msg
     
@@ -570,56 +571,50 @@ class LinuxApplication(ResourceManager):
 
             # Out is what was written in the stderr file
             if err:
-                self._state = ResourceState.FAILED
                 msg = " Failed to start command '%s' " % command
                 self.error(msg, out, err)
                 raise RuntimeError, msg
-        
-    def stop(self):
+    
+    def do_stop(self):
         """ Stops application execution
         """
         command = self.get('command') or ''
 
         if self.state == ResourceState.STARTED:
-            stopped = True
-
-            self.info("Stopping command '%s'" % command)
+        
+            self.info("Stopping command '%s' " % command)
         
             # If the command is running in foreground (it was launched using
             # the node 'execute' method), then we use the handler to the Popen
             # process to kill it. Else we send a kill signal using the pid and ppid
             # retrieved after running the command with the node 'run' method
-
             if self._proc:
                 self._proc.kill()
             else:
                 # Only try to kill the process if the pid and ppid
                 # were retrieved
                 if self.pid and self.ppid:
-                    (out, err), proc = self.node.kill(self.pid, self.ppid)
+                    (out, err), proc = self.node.kill(self.pid, self.ppid,
+                            sudo = self._sudo_kill)
 
-                    if out or err:
-                        # check if execution errors occurred
+                    # TODO: check if execution errors occurred
+                    if proc.poll() or err:
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
-                        self._state = ResourceState.FAILED
-                        stopped = False
-
-            if stopped:
-                super(LinuxApplication, self).stop()
+        
+            super(LinuxApplication, self).do_stop()
 
-    def release(self):
+    def do_release(self):
         self.info("Releasing resource")
 
         tear_down = self.get("tearDown")
         if tear_down:
             self.node.execute(tear_down)
 
-        self.stop()
+        self.do_stop()
 
-        if self.state == ResourceState.STOPPED:
-            super(LinuxApplication, self).release()
-    
+        super(LinuxApplication, self).do_release()
+        
     @property
     def state(self):
         """ Returns the state of the application
@@ -638,37 +633,58 @@ class LinuxApplication(ResourceManager):
                     msg = " Failed to execute command '%s'" % self.get("command")
                     err = self._proc.stderr.read()
                     self.error(msg, out, err)
-                    self._state = ResourceState.FAILED
-                elif retcode == 0:
-                    self._state = ResourceState.FINISHED
+                    self.do_fail()
 
+                elif retcode == 0:
+                    self.do_finish()
             else:
                 # We need to query the status of the command we launched in 
-                # background. In oredr to avoid overwhelming the remote host and
+                # background. In order to avoid overwhelming the remote host and
                 # the local processor with too many ssh queries, the state is only
                 # requested every 'state_check_delay' seconds.
                 state_check_delay = 0.5
                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
-                    # check if execution errors occurred
-                    (out, err), proc = self.node.check_errors(self.run_home)
-
-                    if err:
-                        msg = " Failed to execute command '%s'" % self.get("command")
-                        self.error(msg, out, err)
-                        self._state = ResourceState.FAILED
-
-                    elif self.pid and self.ppid:
-                        # No execution errors occurred. Make sure the background
-                        # process with the recorded pid is still running.
+                    if self.pid and self.ppid:
+                        # Make sure the process is still running in background
                         status = self.node.status(self.pid, self.ppid)
 
                         if status == ProcStatus.FINISHED:
-                            self._state = ResourceState.FINISHED
+                            # If the program finished, check if execution
+                            # errors occurred
+                            (out, err), proc = self.node.check_errors(
+                                    self.run_home)
+
+                            if err:
+                                msg = "Failed to execute command '%s'" % \
+                                        self.get("command")
+                                self.error(msg, out, err)
+                                self.do_fail()
+                            else:
+                                self.do_finish()
 
                     self._last_state_check = tnow()
 
         return self._state
 
+    def execute_command(self, command, 
+            env = None,
+            sudo = False,
+            stdin = None,
+            forward_x11 = False,
+            blocking = False):
+
+        environ = ""
+        if env:
+            environ = self.node.format_environment(env, inline = True)
+        command = environ + command
+        command = self.replace_paths(command)
+
+        return self.node.execute(command,
+                sudo = sudo,
+                stdin = stdin,
+                forward_x11 = forward_x11,
+                blocking = blocking)
+
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.