Minor typos
[nepi.git] / src / neco / resources / linux / application.py
index b7b3f4e..882fb4c 100644 (file)
@@ -3,11 +3,13 @@ from neco.execution.trace import Trace, TraceAttr
 from neco.execution.resource import ResourceManager, clsinit, ResourceState
 from neco.resources.linux.node import LinuxNode
 from neco.util import sshfuncs 
+from neco.util.timefuncs import strfnow, strfdiff
 
 import logging
 import os
 
 reschedule_delay = "0.5s"
+state_check_delay = 1
 
 # TODO: Resolve wildcards in commands!! 
 
@@ -96,6 +98,9 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
+        # timestamp of last state check of the application
+        self._last_state_check = strfnow()
+
         self._logger = logging.getLogger("LinuxApplication")
     
     def log_message(self, msg):
@@ -110,7 +115,7 @@ class LinuxApplication(ResourceManager):
 
     @property
     def app_home(self):
-        return os.path.join(self.node.exp_dir, self._home)
+        return os.path.join(self.node.exp_home, self._home)
 
     @property
     def src_dir(self):
@@ -133,8 +138,8 @@ class LinuxApplication(ResourceManager):
 
         path = os.path.join(self.app_home, name)
         
-        cmd = "(test -f %s && echo 'success') || echo 'error'" % path
-        (out, err), proc = self.node.execute(cmd)
+        command = "(test -f %s && echo 'success') || echo 'error'" % path
+        (out, err), proc = self.node.execute(command)
 
         if (err and proc.poll()) or out.find("error") != -1:
             msg = " Couldn't find trace %s " % name
@@ -181,6 +186,9 @@ class LinuxApplication(ResourceManager):
         # upload code
         self.upload_code()
 
+        # upload stdin
+        self.upload_stdin()
+
         # install dependencies
         self.install_dependencies()
 
@@ -190,14 +198,23 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
-        command = self.replace_paths(self.get("command"))
-        x11 = self.get("forwardX11") or False
-        if not x11:
+        command = self.get("command")
+        x11 = self.get("forwardX11")
+        if not x11 and command:
             self.info("Uploading command '%s'" % command)
-            
+
+            # Export environment
+            environ = ""
+            env = self.get("env") or ""
+            for var in env.split(" "):
+                environ += 'export %s\n' % var
+
+            command = environ + command
+
             # If the command runs asynchronous, pre upload the command 
             # to the app.sh file in the remote host
             dst = os.path.join(self.app_home, "app.sh")
+            command = self.replace_paths(command)
             self.node.upload(command, dst, text = True)
 
         super(LinuxApplication, self).provision()
@@ -220,14 +237,26 @@ class LinuxApplication(ResourceManager):
                     sources.remove(source)
 
             # Download http sources
-            for source in http_sources:
-                dst = os.path.join(self.src_dir, source.split("/")[-1])
-                # TODO: Check if the tar.gz is already downloaded using a hash
-                # and don't download twice !!
-                command = "wget -o %s %s" % (dst, source)
-                self.node.execute(command)
-
-            self.node.upload(sources, self.src_dir)
+            if http_sources:
+                cmd = " wget -c --directory-prefix=${SOURCES} "
+                verif = ""
+
+                for source in http_sources:
+                    cmd += " %s " % (source)
+                    verif += " ls ${SOURCES}/%s ;" % os.path.basename(source)
+                
+                # Wget output goes to stderr :S
+                cmd += " 2> /dev/null ; "
+
+                # Add verification
+                cmd += " %s " % verif
+
+                # Upload the command to a file, and execute asynchronously
+                self.upload_and_run(cmd, 
+                        "http_sources.sh", "http_sources_pid", 
+                        "http_sources_out", "http_sources_err")
+            if sources:
+                self.node.upload(sources, self.src_dir)
 
     def upload_code(self):
         code = self.get("code")
@@ -240,6 +269,15 @@ class LinuxApplication(ResourceManager):
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
 
+    def upload_stdin(self):
+        stdin = self.get("stdin")
+        if stdin:
+            # create dir for sources
+            self.info(" Uploading stdin ")
+
+            dst = os.path.join(self.app_home, "stdin")
+            self.node.upload(stdin, dst, text = True)
+
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
@@ -254,32 +292,22 @@ class LinuxApplication(ResourceManager):
             # create dir for build
             self.node.mkdir(self.build_dir)
 
-            cmd = self.replace_paths(build)
-
-            (out, err), proc = self.run_and_wait(cmd, self.app_home,
-                pidfile = "build_pid",
-                stdout = "build_out", 
-                stderr = "build_err", 
-                raise_on_error = True)
+            # Upload the command to a file, and execute asynchronously
+            self.upload_and_run(build, 
+                    "build.sh", "build_pid", 
+                    "build_out", "build_err")
  
     def install(self):
         install = self.get("install")
         if install:
             self.info(" Installing sources ")
 
-            cmd = self.replace_paths(install)
-
-            (out, err), proc = self.run_and_wait(cmd, self.app_home, 
-                pidfile = "install_pid",
-                stdout = "install_out", 
-                stderr = "install_err", 
-                raise_on_error = True)
+            # Upload the command to a file, and execute asynchronously
+            self.upload_and_run(install, 
+                    "install.sh", "install_pid", 
+                    "install_out", "install_err")
 
     def deploy(self):
-        command = self.replace_paths(self.get("command"))
-        
-        self.info(" Deploying command '%s' " % command)
-
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
@@ -287,6 +315,8 @@ class LinuxApplication(ResourceManager):
             self.ec.schedule(reschedule_delay, self.deploy)
         else:
             try:
+                command = self.get("command") or ""
+                self.info(" Deploying command '%s' " % command)
                 self.discover()
                 self.provision()
             except:
@@ -296,33 +326,53 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.replace_paths(self.get("command"))
-        env = self.get("env")
-        stdin = 'stdin' if self.get("stdin") else None
+        command = self.get('command')
+        env = self.get('env')
+        stdin = 'stdin' if self.get('stdin') else None
+        stdout = 'stdout' if self.get('stdout') else 'stdout'
+        stderr = 'stderr' if self.get('stderr') else 'stderr'
         sudo = self.get('sudo') or False
-        x11 = self.get("forwardX11") or False
+        x11 = self.get('forwardX11') or False
         failed = False
 
         super(LinuxApplication, self).start()
 
+        if not command:
+            self.info("No command to start ")
+            self._state = ResourceState.FINISHED
+            return 
+    
         self.info("Starting command '%s'" % command)
 
         if x11:
+            if env:
+                # Export environment
+                environ = ""
+                for var in env.split(" "):
+                    environ += ' %s ' % var
+
+                command = "(" + environ + " ; " + command + ")"
+                command = self.replace_paths(command)
+
+            # If the command requires X11 forwarding, we
+            # can't run it asynchronously
             (out, err), proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
-                    stdout = 'stdout',
-                    stderr = 'stderr',
-                    env = env,
                     forward_x11 = x11)
 
+            self._state = ResourceState.FINISHED
+
             if proc.poll() and err:
                 failed = True
         else:
-            # Run the command asynchronously
-            command = "bash ./app.sh"
-            (out, err), proc = self.node.run(command, self.app_home, 
+            # Command was  previously uploaded, now run the remote
+            # bash file asynchronously
+            cmd = "bash ./app.sh"
+            (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
+                stdout = stdout,
+                stderr = stderr,
                 sudo = sudo)
 
             if proc.poll() and err:
@@ -336,29 +386,31 @@ class LinuxApplication(ResourceManager):
             if not self.pid or not self.ppid:
                 failed = True
  
-        (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
+            (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
 
-        if failed or out or chkerr:
-            # check if execution errors occurred
-            msg = " Failed to start command '%s' " % command
-            out = out
-            if err:
-                err = err
-            elif chkerr:
-                err = chkerr
+            if failed or out or chkerr:
+                # check if execution errors occurred
+                msg = " Failed to start command '%s' " % command
+                out = out
+                if err:
+                    err = err
+                elif chkerr:
+                    err = chkerr
 
-            self.error(msg, out, err)
+                self.error(msg, out, err)
 
-            msg2 = " Setting state to Failed"
-            self.debug(msg2)
-            self._state = ResourceState.FAILED
+                msg2 = " Setting state to Failed"
+                self.debug(msg2)
+                self._state = ResourceState.FAILED
 
-            raise RuntimeError, msg
+                raise RuntimeError, msg
 
     def stop(self):
+        command = self.get('command') or ''
         state = self.state
+        
         if state == ResourceState.STARTED:
-            self.info("Stopping command %s" % command)
+            self.info("Stopping command '%s'" % command)
 
             (out, err), proc = self.node.kill(self.pid, self.ppid)
 
@@ -385,65 +437,66 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+            # To avoid overwhelming the remote hosts and the local processor
+            # with too many ssh queries, the state is only requested
+            # every 'state_check_delay' .
+            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                # check if execution errors occurred
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
 
-            if out or err:
-                if err.find("No such file or directory") >= 0 :
-                    # The resource is marked as started, but the
-                    # command was not yet executed
-                    return ResourceState.READY
+                if out or err:
+                    if err.find("No such file or directory") >= 0 :
+                        # The resource is marked as started, but the
+                        # command was not yet executed
+                        return ResourceState.READY
 
-                # check if execution errors occurred
-                msg = " Failed to execute command '%s'" % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                    msg = " Failed to execute command '%s'" % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
 
-            elif self.pid and self.ppid:
-                status = self.node.status(self.pid, self.ppid)
+                elif self.pid and self.ppid:
+                    status = self.node.status(self.pid, self.ppid)
 
-                if status == sshfuncs.FINISHED:
-                    self._state = ResourceState.FINISHED
+                    if status == sshfuncs.FINISHED:
+                        self._state = ResourceState.FINISHED
 
-        return self._state
 
-    def valid_connection(self, guid):
-        # TODO: Validate!
-        return True
-        # XXX: What if it is connected to more than one node?
-        resources = self.find_resources(exact_tags = [tags.NODE])
-        self._node = resources[0] if len(resources) == 1 else None
-        return self._node
+                self._last_state_check = strfnow()
 
-    def hash_app(self):
-        """ Generates a hash representing univokely the application.
-        Is used to determine whether the home directory should be cleaned
-        or not.
+        return self._state
 
-        """
-        command = self.get("command")
-        forwards_x11 = self.get("forwardX11")
-        env = self.get("env")
-        sudo = self.get("sudo")
-        depends = self.get("depends")
-        sources = self.get("sources")
-        cls._register_attribute(sources)
-        cls._register_attribute(build)
-        cls._register_attribute(install)
-        cls._register_attribute(stdin)
-        cls._register_attribute(stdout)
-        cls._register_attribute(stderr)
-        cls._register_attribute(tear_down)
-        skey = "".join(map(str, args))
-        return hashlib.md5(skey).hexdigest()
+    def upload_and_run(self, cmd, fname, pidfile, outfile, errfile):
+        dst = os.path.join(self.app_home, fname)
+        cmd = self.replace_paths(cmd)
+        self.node.upload(cmd, dst, text = True)
+
+        cmd = "bash ./%s" % fname
+        (out, err), proc = self.node.run_and_wait(cmd, self.app_home,
+            pidfile = pidfile,
+            stdout = outfile, 
+            stderr = errfile, 
+            raise_on_error = True)
 
     def replace_paths(self, command):
         """
         Replace all special path tags with shell-escaped actual paths.
         """
-        return ( command
-            .replace("${SOURCES}", self.src_dir)
-            .replace("${BUILD}", self.build_dir) 
-            .replace("${APPHOME}", self.app_home) 
-            .replace("${NODEHOME}", self.node.home) )
+        def absolute_dir(d):
+            return d if d.startswith("/") else os.path.join("${HOME}", d)
 
+        return ( command
+            .replace("${SOURCES}", absolute_dir(self.src_dir))
+            .replace("${BUILD}", absolute_dir(self.build_dir))
+            .replace("${APP_HOME}", absolute_dir(self.app_home))
+            .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
+            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
+            )
+        
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+        # XXX: What if it is connected to more than one node?
+        resources = self.find_resources(exact_tags = [tags.NODE])
+        self._node = resources[0] if len(resources) == 1 else None
+        return self._node