Bugfixing LinuxNode and LinuxApplication
[nepi.git] / src / nepi / resources / linux / application.py
index 080c3a4..5f4c998 100644 (file)
@@ -21,16 +21,13 @@ from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.trace import Trace, TraceAttr
 from nepi.execution.resource import ResourceManager, clsinit, ResourceState
 from nepi.resources.linux.node import LinuxNode
-from nepi.util import sshfuncs 
+from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
 
-reschedule_delay = "0.5s"
-state_check_delay = 1
-
 # TODO: Resolve wildcards in commands!!
-# TODO: If command is not set give a warning but do not generate an error!
+
 
 @clsinit
 class LinuxApplication(ResourceManager):
@@ -215,25 +212,21 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
+        # Upload command
         command = self.get("command")
         x11 = self.get("forwardX11")
-        if not x11 and command:
+        env = self.get("env")
+        
+        if command and not x11:
             self.info("Uploading command '%s'" % command)
 
-            # Export environment
-            environ = ""
-            if self.get("env"):
-                for var in self.get("env").split(" "):
-                    environ += 'export %s\n' % var
-
-            command = environ + command
-
-            # If the command runs asynchronous, pre upload the command 
-            # to the app.sh file in the remote host
-            dst = os.path.join(self.app_home, "app.sh")
+            # replace application specific paths in the command
             command = self.replace_paths(command)
-            self.node.upload(command, dst, text = True)
 
+            self.node.upload_command(command, self.app_home, 
+                    shfile = "app.sh",
+                    env = env)
+       
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
@@ -253,25 +246,29 @@ class LinuxApplication(ResourceManager):
                     http_sources.append(source)
                     sources.remove(source)
 
-            # Download http sources
+            # Download http sources remotely
             if http_sources:
-                cmd = " wget -c --directory-prefix=${SOURCES} "
-                verif = ""
+                command = " wget -c --directory-prefix=${SOURCES} "
+                check = ""
 
                 for source in http_sources:
-                    cmd += " %s " % (source)
-                    verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+                    command += " %s " % (source)
+                    check += " ls ${SOURCES}/%s ;" % os.path.basename(source)
                 
-                # Wget output goes to stderr :S
-                cmd += " 2> /dev/null ; "
-
-                # Add verification
-                cmd += " %s " % verif
+                # Append the command to check that the sources were downloaded
+                command += " ; %s " % check
 
+                # replace application specific paths in the command
+                command = self.replace_paths(command)
+                
                 # Upload the command to a file, and execute asynchronously
-                self.upload_and_run(cmd, 
-                        "http_sources.sh", "http_sources_pid", 
-                        "http_sources_out", "http_sources_err")
+                self.node.run_and_wait(command, self.app_home,
+                        shfile = "http_sources.sh",
+                        pidfile = "http_sources_pidfile", 
+                        ecodefile = "http_sources_exitcode", 
+                        stdout = "http_sources_stdout", 
+                        stderr = "http_sources_stderr")
+
             if sources:
                 self.node.upload(sources, self.src_dir)
 
@@ -299,7 +296,7 @@ class LinuxApplication(ResourceManager):
         depends = self.get("depends")
         if depends:
             self.info(" Installing dependencies %s" % depends)
-            self.node.install_packages(depends, home = self.app_home)
+            self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
@@ -309,26 +306,40 @@ class LinuxApplication(ResourceManager):
             # create dir for build
             self.node.mkdir(self.build_dir)
 
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+
             # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(build, 
-                    "build.sh", "build_pid", 
-                    "build_out", "build_err")
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "build.sh",
+                    pidfile = "build_pidfile", 
+                    ecodefile = "build_exitcode", 
+                    stdout = "build_stdout", 
+                    stderr = "build_stderr")
  
     def install(self):
         install = self.get("install")
         if install:
             self.info(" Installing sources ")
 
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+
             # Upload the command to a file, and execute asynchronously
-            self.upload_and_run(install, 
-                    "install.sh", "install_pid", 
-                    "install_out", "install_err")
+            self.node.run_and_wait(command, self.app_home,
+                    shfile = "install.sh",
+                    pidfile = "install_pidfile", 
+                    ecodefile = "install_exitcode", 
+                    stdout = "install_stdout", 
+                    stderr = "install_stderr")
 
     def deploy(self):
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
             self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+            
+            reschedule_delay = "0.5s"
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
@@ -352,16 +363,25 @@ class LinuxApplication(ResourceManager):
         x11 = self.get('forwardX11') or False
         failed = False
 
-        super(LinuxApplication, self).start()
-
         if not command:
-            self.info("No command to start ")
+            # If no command was given, then the application 
+            # is directly marked as FINISHED
             self._state = ResourceState.FINISHED
-            return 
+        else:
+            super(LinuxApplication, self).start()
     
         self.info("Starting command '%s'" % command)
 
         if x11:
+            # If X11 forwarding was specified, then the application
+            # can not run detached, so instead of invoking asynchronous
+            # 'run' we invoke synchronous 'execute'.
+            if not command:
+                msg = "No command is defined but X11 forwarding has been set"
+                self.error(msg)
+                self._state = ResourceState.FAILED
+                raise RuntimeError, msg
+
             if env:
                 # Export environment
                 environ = ""
@@ -392,28 +412,22 @@ class LinuxApplication(ResourceManager):
                 stderr = stderr,
                 sudo = sudo)
 
+            # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            
             if proc.poll() and err:
-                failed = True
+                self.error(msg, out, err)
+                raise RuntimeError, msg
         
-            if not failed:
-                pid, ppid = self.node.wait_pid(home = self.app_home)
-                if pid: self._pid = int(pid)
-                if ppid: self._ppid = int(ppid)
+            # Check status of process running in background
+            pid, ppid = self.node.wait_pid(self.app_home)
+            if pid: self._pid = int(pid)
+            if ppid: self._ppid = int(ppid)
 
+            # If the process is not running, check for error information
+            # on the remote machine
             if not self.pid or not self.ppid:
-                failed = True
-            (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
-
-            if failed or out or chkerr:
-                # check if execution errors occurred
-                msg = " Failed to start command '%s' " % command
-                out = out
-                if err:
-                    err = err
-                elif chkerr:
-                    err = chkerr
-
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
                 self.error(msg, out, err)
 
                 msg2 = " Setting state to Failed"
@@ -456,10 +470,11 @@ class LinuxApplication(ResourceManager):
         if self._state == ResourceState.STARTED:
             # To avoid overwhelming the remote hosts and the local processor
             # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' .
+            # every 'state_check_delay' seconds.
+            state_check_delay = 0.5
             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                 # check if execution errors occurred
-                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+                (out, err), proc = self.node.check_errors(self.app_home)
 
                 if out or err:
                     if err.find("No such file or directory") >= 0 :
@@ -474,7 +489,7 @@ class LinuxApplication(ResourceManager):
                 elif self.pid and self.ppid:
                     status = self.node.status(self.pid, self.ppid)
 
-                    if status == sshfuncs.FINISHED:
+                    if status == ProcStatus.FINISHED:
                         self._state = ResourceState.FINISHED
 
 
@@ -482,18 +497,6 @@ class LinuxApplication(ResourceManager):
 
         return self._state
 
-    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
-        dst = os.path.join(self.app_home, fname)
-        cmd = self.replace_paths(cmd)
-        self.node.upload(cmd, dst, text = True)
-
-        cmd = "bash ./%s" % fname
-        (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
-            pidfile = pidfile,
-            stdout = outfile, 
-            stderr = errfile, 
-            raise_on_error = True)
-
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.