Fixing issues in LinuxNode whith high concurrency
[nepi.git] / src / nepi / resources / linux / application.py
index 453010e..3519138 100644 (file)
@@ -171,8 +171,13 @@ class LinuxApplication(ResourceManager):
         self._pid = None
         self._ppid = None
         self._home = "app-%s" % self.guid
+        # whether the command should run in foreground attached
+        # to a terminal
         self._in_foreground = False
 
+        # whether to use sudo to kill the application process
+        self._sudo_kill = False
+
         # keep a reference to the running process handler when 
         # the command is not executed as remote daemon in background
         self._proc = None
@@ -238,7 +243,7 @@ class LinuxApplication(ResourceManager):
         if attr == TraceAttr.ALL:
             (out, err), proc = self.node.check_output(self.run_home, name)
             
-            if err and proc.poll():
+            if proc.poll():
                 msg = " Couldn't read trace %s " % name
                 self.error(msg, out, err)
                 return None
@@ -252,7 +257,7 @@ class LinuxApplication(ResourceManager):
 
         (out, err), proc = self.node.execute(cmd)
 
-        if err and proc.poll():
+        if proc.poll():
             msg = " Couldn't find trace %s " % name
             self.error(msg, out, err)
             return None
@@ -329,7 +334,8 @@ class LinuxApplication(ResourceManager):
 
             self.node.upload_command(command, 
                     shfile = shfile,
-                    env = env)
+                    env = env,
+                    overwrite = False)
 
     def execute_deploy_command(self, command):
         if command:
@@ -430,7 +436,9 @@ class LinuxApplication(ResourceManager):
             self.node.upload(stdin, dst, overwrite = False, text = True)
 
             # create "stdin" symlink on ${APP_HOME} directory
-            command = "( cd %s ; ln -s %s stdin )" % ( self.app_home, dst)
+            command = "( cd %(app_home)s ; [ ! -f stdin ] &&  ln -s %(stdin)s stdin )" % ({
+                "app_home": self.app_home, 
+                "stdin": dst })
 
             return command
 
@@ -438,7 +446,7 @@ class LinuxApplication(ResourceManager):
         depends = self.get("depends")
         if depends:
             self.info("Installing dependencies %s" % depends)
-            self.node.install_packages(depends, self.app_home, self.run_home)
+            return self.node.install_packages_command(depends)
 
     def build(self):
         build = self.get("build")
@@ -488,13 +496,13 @@ class LinuxApplication(ResourceManager):
         else:
 
             if self.in_foreground:
-                self._start_in_foreground()
+                self._run_in_foreground()
             else:
-                self._start_in_background()
+                self._run_in_background()
 
             super(LinuxApplication, self).start()
 
-    def _start_in_foreground(self):
+    def _run_in_foreground(self):
         command = self.get("command")
         sudo = self.get("sudo") or False
         x11 = self.get("forwardX11")
@@ -506,17 +514,12 @@ class LinuxApplication(ResourceManager):
         # Command will be launched in foreground and attached to the
         # terminal using the node 'execute' in non blocking mode.
 
-        # Export environment
-        env = self.get("env")
-        environ = self.node.format_environment(env, inline = True)
-        command = environ + command
-        command = self.replace_paths(command)
-
         # We save the reference to the process in self._proc 
         # to be able to kill the process from the stop method.
         # We also set blocking = False, since we don't want the
         # thread to block until the execution finishes.
-        (out, err), self._proc = self.node.execute(command,
+        (out, err), self._proc = self.execute_command(self, command, 
+                env = env,
                 sudo = sudo,
                 stdin = stdin,
                 forward_x11 = x11,
@@ -527,7 +530,7 @@ class LinuxApplication(ResourceManager):
             self.error(msg, out, err)
             raise RuntimeError, msg
 
-    def _start_in_background(self):
+    def _run_in_background(self):
         command = self.get("command")
         env = self.get("env")
         sudo = self.get("sudo") or False
@@ -581,14 +584,14 @@ class LinuxApplication(ResourceManager):
         command = self.get('command') or ''
 
         if self.state == ResourceState.STARTED:
-            stopped = True
-
+        
             self.info("Stopping command '%s'" % command)
         
             # If the command is running in foreground (it was launched using
             # the node 'execute' method), then we use the handler to the Popen
             # process to kill it. Else we send a kill signal using the pid and ppid
             # retrieved after running the command with the node 'run' method
+            stopped = True
 
             if self._proc:
                 self._proc.kill()
@@ -596,17 +599,17 @@ class LinuxApplication(ResourceManager):
                 # Only try to kill the process if the pid and ppid
                 # were retrieved
                 if self.pid and self.ppid:
-                    (out, err), proc = self.node.kill(self.pid, self.ppid)
+                    (out, err), proc = self.node.kill(self.pid, self.ppid,
+                            sudo = self._sudo_kill)
 
-                    if out or err:
+                    if proc.poll() or err:
                         # check if execution errors occurred
                         msg = " Failed to STOP command '%s' " % self.get("command")
                         self.error(msg, out, err)
                         self.fail()
-                        stopped = False
 
-            if stopped:
-                super(LinuxApplication, self).stop()
+        if self.state == ResourceState.STARTED:
+            super(LinuxApplication, self).stop()
 
     def release(self):
         self.info("Releasing resource")
@@ -618,6 +621,8 @@ class LinuxApplication(ResourceManager):
         self.stop()
 
         if self.state == ResourceState.STOPPED:
+            self.info("Resource released")
+
             super(LinuxApplication, self).release()
     
     @property
@@ -644,31 +649,52 @@ class LinuxApplication(ResourceManager):
 
             else:
                 # We need to query the status of the command we launched in 
-                # background. In oredr to avoid overwhelming the remote host and
+                # background. In order to avoid overwhelming the remote host and
                 # the local processor with too many ssh queries, the state is only
                 # requested every 'state_check_delay' seconds.
                 state_check_delay = 0.5
                 if tdiffsec(tnow(), self._last_state_check) > state_check_delay:
-                    # check if execution errors occurred
-                    (out, err), proc = self.node.check_errors(self.run_home)
-
-                    if err:
-                        msg = " Failed to execute command '%s'" % self.get("command")
-                        self.error(msg, out, err)
-                        self.fail()
-
-                    elif self.pid and self.ppid:
-                        # No execution errors occurred. Make sure the background
-                        # process with the recorded pid is still running.
+                    if self.pid and self.ppid:
+                        # Make sure the process is still running in background
                         status = self.node.status(self.pid, self.ppid)
 
                         if status == ProcStatus.FINISHED:
-                            self._state = ResourceState.FINISHED
+                            # If the program finished, check if execution
+                            # errors occurred
+                            (out, err), proc = self.node.check_errors(
+                                    self.run_home)
+
+                            if err:
+                                msg = " Failed to execute command '%s'" % \
+                                        self.get("command")
+                                self.error(msg, out, err)
+                                self.fail()
+                            else:
+                               self._state = ResourceState.FINISHED
 
                     self._last_state_check = tnow()
 
         return self._state
 
+    def execute_command(self, command, 
+            env = None,
+            sudo = False,
+            stdin = None,
+            forward_x11 = False,
+            blocking = False):
+
+        environ = ""
+        if env:
+            environ = self.node.format_environment(env, inline = True)
+        command = environ + command
+        command = self.replace_paths(command)
+
+        return self.node.execute(command,
+                sudo = sudo,
+                stdin = stdin,
+                forward_x11 = forward_x11,
+                blocking = blocking)
+
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.