Adding Linux Application scalability tests
[nepi.git] / src / neco / resources / linux / application.py
index 4f2b989..e7a34d5 100644 (file)
@@ -3,11 +3,13 @@ from neco.execution.trace import Trace, TraceAttr
 from neco.execution.resource import ResourceManager, clsinit, ResourceState
 from neco.resources.linux.node import LinuxNode
 from neco.util import sshfuncs 
+from neco.util.timefuncs import strfnow, strfdiff
 
 import logging
 import os
 
 reschedule_delay = "0.5s"
+state_check_delay = 1
 
 # TODO: Resolve wildcards in commands!! 
 
@@ -96,6 +98,9 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
+        # timestamp of last state check of the application
+        self._last_state_check = strfnow()
+
         self._logger = logging.getLogger("LinuxApplication")
     
     def log_message(self, msg):
@@ -181,6 +186,9 @@ class LinuxApplication(ResourceManager):
         # upload code
         self.upload_code()
 
+        # upload stdin
+        self.upload_stdin()
+
         # install dependencies
         self.install_dependencies()
 
@@ -194,8 +202,14 @@ class LinuxApplication(ResourceManager):
         x11 = self.get("forwardX11")
         if not x11 and command:
             self.info("Uploading command '%s'" % command)
-        
-            # TODO: missing set PATH and PYTHONPATH!!
+
+            # Export environment
+            environ = ""
+            env = self.get("env") or ""
+            for var in env.split(" "):
+                environ += 'export %s\n' % var
+
+            command = environ + command
 
             # If the command runs asynchronous, pre upload the command 
             # to the app.sh file in the remote host
@@ -255,6 +269,15 @@ class LinuxApplication(ResourceManager):
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
 
+    def upload_stdin(self):
+        stdin = self.get("stdin")
+        if stdin:
+            # create dir for sources
+            self.info(" Uploading stdin ")
+
+            dst = os.path.join(self.app_home, "stdin")
+            self.node.upload(stdin, dst, text = True)
+
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
@@ -303,13 +326,13 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.get("command")
-        env = self.get("env")
-        stdin = 'stdin' if self.get("stdin") else None
-        stdout = 'stdout' if self.get("stdout") else 'stdout'
-        stderr = 'stderr' if self.get("stderr") else 'stderr'
+        command = self.get('command')
+        env = self.get('env')
+        stdin = 'stdin' if self.get('stdin') else None
+        stdout = 'stdout' if self.get('stdout') else 'stdout'
+        stderr = 'stderr' if self.get('stderr') else 'stderr'
         sudo = self.get('sudo') or False
-        x11 = self.get("forwardX11") or False
+        x11 = self.get('forwardX11') or False
         failed = False
 
         super(LinuxApplication, self).start()
@@ -322,24 +345,29 @@ class LinuxApplication(ResourceManager):
         self.info("Starting command '%s'" % command)
 
         if x11:
+            if env:
+                # Export environment
+                environ = ""
+                for var in env.split(" "):
+                    environ += ' %s ' % var
+
+                command = "(" + environ + " ; " + command + ")"
+                command = self.replace_paths(command)
+
             # If the command requires X11 forwarding, we
             # can't run it asynchronously
             (out, err), proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
-                    stdout = stdout,
-                    stderr = stderr,
-                    env = env,
                     forward_x11 = x11)
 
+            self._state = ResourceState.FINISHED
+
             if proc.poll() and err:
                 failed = True
         else:
             # Command was  previously uploaded, now run the remote
             # bash file asynchronously
-            if env:
-                env = self.replace_paths(env)
-
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
@@ -378,9 +406,11 @@ class LinuxApplication(ResourceManager):
             raise RuntimeError, msg
 
     def stop(self):
+        command = self.get('command') or ''
         state = self.state
+        
         if state == ResourceState.STARTED:
-            self.info("Stopping command %s" % command)
+            self.info("Stopping command '%s'" % command)
 
             (out, err), proc = self.node.kill(self.pid, self.ppid)
 
@@ -407,24 +437,31 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+            # To avoid overwhelming the remote hosts and the local processor
+            # with too many ssh queries, the state is only requested
+            # every 'state_check_delay' .
+            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                # check if execution errors occurred
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
 
-            if out or err:
-                if err.find("No such file or directory") >= 0 :
-                    # The resource is marked as started, but the
-                    # command was not yet executed
-                    return ResourceState.READY
+                if out or err:
+                    if err.find("No such file or directory") >= 0 :
+                        # The resource is marked as started, but the
+                        # command was not yet executed
+                        return ResourceState.READY
 
-                # check if execution errors occurred
-                msg = " Failed to execute command '%s'" % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
+                    msg = " Failed to execute command '%s'" % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+
+                elif self.pid and self.ppid:
+                    status = self.node.status(self.pid, self.ppid)
+
+                    if status == sshfuncs.FINISHED:
+                        self._state = ResourceState.FINISHED
 
-            elif self.pid and self.ppid:
-                status = self.node.status(self.pid, self.ppid)
 
-                if status == sshfuncs.FINISHED:
-                    self._state = ResourceState.FINISHED
+                self._last_state_check = strfnow()
 
         return self._state
 
@@ -452,7 +489,8 @@ class LinuxApplication(ResourceManager):
             .replace("${BUILD}", absolute_dir(self.build_dir))
             .replace("${APP_HOME}", absolute_dir(self.app_home))
             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
-            .replace("${EXP_HOME}", self.node.exp_home) )
+            .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
+            )
         
     def valid_connection(self, guid):
         # TODO: Validate!
@@ -462,25 +500,3 @@ class LinuxApplication(ResourceManager):
         self._node = resources[0] if len(resources) == 1 else None
         return self._node
 
-    def hash_app(self):
-        """ Generates a hash representing univokely the application.
-        Is used to determine whether the home directory should be cleaned
-        or not.
-
-        """
-        command = self.get("command")
-        forwards_x11 = self.get("forwardX11")
-        env = self.get("env")
-        sudo = self.get("sudo")
-        depends = self.get("depends")
-        sources = self.get("sources")
-        cls._register_attribute(sources)
-        cls._register_attribute(build)
-        cls._register_attribute(install)
-        cls._register_attribute(stdin)
-        cls._register_attribute(stdout)
-        cls._register_attribute(stderr)
-        cls._register_attribute(tear_down)
-        skey = "".join(map(str, args))
-        return hashlib.md5(skey).hexdigest()
-