Added unit tests for linux application
[nepi.git] / src / neco / resources / linux / application.py
index 321c696..b7b3f4e 100644 (file)
@@ -7,7 +7,7 @@ from neco.util import sshfuncs
 import logging
 import os
 
-DELAY ="1s"
+reschedule_delay = "0.5s"
 
 # TODO: Resolve wildcards in commands!! 
 
@@ -62,12 +62,6 @@ class LinuxApplication(ResourceManager):
         stdin = Attribute("stdin", "Standard input", flags = Flags.ExecReadOnly)
         stdout = Attribute("stdout", "Standard output", flags = Flags.ExecReadOnly)
         stderr = Attribute("stderr", "Standard error", flags = Flags.ExecReadOnly)
-        update_home = Attribute("updateHome", "If application hash has changed remove old directory and"
-                "re-upload before starting experiment. If not keep the same directory", 
-                default = True,
-                type = Types.Bool, 
-                flags = Flags.ExecReadOnly)
-
         tear_down = Attribute("tearDown", "Bash script to be executed before "
                 "releasing the resource", 
                 flags = Flags.ReadOnly)
@@ -84,7 +78,6 @@ class LinuxApplication(ResourceManager):
         cls._register_attribute(stdin)
         cls._register_attribute(stdout)
         cls._register_attribute(stderr)
-        cls._register_attribute(update_home)
         cls._register_attribute(tear_down)
 
     @classmethod
@@ -103,7 +96,11 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
-        self._logger = logging.getLogger("neco.linux.Application.%d" % guid)
+        self._logger = logging.getLogger("LinuxApplication")
+    
+    def log_message(self, msg):
+        return " guid %d - host %s - %s " % (self.guid, 
+                self.node.get("hostname"), msg)
 
     @property
     def node(self):
@@ -112,16 +109,16 @@ class LinuxApplication(ResourceManager):
         return None
 
     @property
-    def home(self):
+    def app_home(self):
         return os.path.join(self.node.exp_dir, self._home)
 
     @property
     def src_dir(self):
-        return os.path.join(self.home, 'src')
+        return os.path.join(self.app_home, 'src')
 
     @property
     def build_dir(self):
-        return os.path.join(self.home, 'build')
+        return os.path.join(self.app_home, 'build')
 
     @property
     def pid(self):
@@ -132,27 +129,27 @@ class LinuxApplication(ResourceManager):
         return self._ppid
 
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
-        path = os.path.join(self.home, name)
+        self.info("Retrieving '%s' trace %s " % (name, attr))
+
+        path = os.path.join(self.app_home, name)
         
         cmd = "(test -f %s && echo 'success') || echo 'error'" % path
         (out, err), proc = self.node.execute(cmd)
 
         if (err and proc.poll()) or out.find("error") != -1:
-            err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
-                    name, self.node.get("hostname"), err)
-            self.logger.error(err_msg)
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
             return None
     
         if attr == TraceAttr.PATH:
             return path
 
         if attr == TraceAttr.ALL:
-            (out, err), proc = self.node.check_output(self.home, name)
+            (out, err), proc = self.node.check_output(self.app_home, name)
             
             if err and proc.poll():
-                err_msg = " Couldn't read trace %s on host %s. Error: %s" % (
-                            name, self.node.get("hostname"), err)
-                self.logger.error(err_msg)
+                msg = " Couldn't read trace %s " % name
+                self.error(msg, out, err)
                 return None
 
             return out
@@ -165,9 +162,8 @@ class LinuxApplication(ResourceManager):
         (out, err), proc = self.node.execute(cmd)
 
         if err and proc.poll():
-            err_msg = " Couldn't find trace %s on host %s. Error: %s" % (
-                    name, self.node.get("hostname"), err)
-            self.logger.error(err_msg)
+            msg = " Couldn't find trace %s " % name
+            self.error(msg, out, err)
             return None
         
         if attr == TraceAttr.SIZE:
@@ -176,10 +172,8 @@ class LinuxApplication(ResourceManager):
         return out
             
     def provision(self, filters = None):
-        # TODO: verify home hash or clean home
-
         # create home dir for application
-        self.node.mkdir(self.home)
+        self.node.mkdir(self.app_home)
 
         # upload sources
         self.upload_sources()
@@ -196,18 +190,28 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
+        command = self.replace_paths(self.get("command"))
+        x11 = self.get("forwardX11") or False
+        if not x11:
+            self.info("Uploading command '%s'" % command)
+            
+            # If the command runs asynchronous, pre upload the command 
+            # to the app.sh file in the remote host
+            dst = os.path.join(self.app_home, "app.sh")
+            self.node.upload(command, dst, text = True)
+
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
-        # check if sources need to be uploaded and upload them
+        # TODO: check if sources need to be uploaded and upload them
         sources = self.get("sources")
         if sources:
-            self.logger.debug(" Uploading sources %s" % sources)
+            self.info(" Uploading sources ")
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            sources = self.sources.split(' ')
+            sources = sources.split(' ')
 
             http_sources = list()
             for source in list(sources):
@@ -218,6 +222,8 @@ class LinuxApplication(ResourceManager):
             # Download http sources
             for source in http_sources:
                 dst = os.path.join(self.src_dir, source.split("/")[-1])
+                # TODO: Check if the tar.gz is already downloaded using a hash
+                # and don't download twice !!
                 command = "wget -o %s %s" % (dst, source)
                 self.node.execute(command)
 
@@ -229,7 +235,7 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            self.logger.debug(" Uploading code '%s'" % code)
+            self.info(" Uploading code ")
 
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
@@ -237,46 +243,55 @@ class LinuxApplication(ResourceManager):
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
-            self.logger.debug(" Installing dependencies %s" % depends)
-            self.node.install_packages(depends, home = self.home)
+            self.info(" Installing dependencies %s" % depends)
+            self.node.install_packages(depends, home = self.app_home)
 
     def build(self):
         build = self.get("build")
         if build:
-            self.logger.debug(" Building sources '%s'" % build)
+            self.info(" Building sources ")
             
             # create dir for build
             self.node.mkdir(self.build_dir)
 
             cmd = self.replace_paths(build)
 
-            (out, err), proc = self.run_and_wait(cmd, self.home,
+            (out, err), proc = self.run_and_wait(cmd, self.app_home,
                 pidfile = "build_pid",
-                stdout = "build_log", 
+                stdout = "build_out", 
                 stderr = "build_err", 
                 raise_on_error = True)
  
     def install(self):
         install = self.get("install")
         if install:
-            self.logger.debug(" Installing sources '%s'" % install)
+            self.info(" Installing sources ")
 
             cmd = self.replace_paths(install)
 
-            (out, err), proc = self.run_and_wait(cmd, self.home, 
+            (out, err), proc = self.run_and_wait(cmd, self.app_home, 
                 pidfile = "install_pid",
-                stdout = "install_log", 
+                stdout = "install_out", 
                 stderr = "install_err", 
                 raise_on_error = True)
 
     def deploy(self):
+        command = self.replace_paths(self.get("command"))
+        
+        self.info(" Deploying command '%s' " % command)
+
         # Wait until node is associated and deployed
         node = self.node
         if not node or node.state < ResourceState.READY:
-            self.ec.schedule(DELAY, self.deploy)
+            self.debug("---- RESCHEDULING DEPLOY ---- node state %s " % self.node.state )
+            self.ec.schedule(reschedule_delay, self.deploy)
         else:
-            self.discover()
-            self.provision()
+            try:
+                self.discover()
+                self.provision()
+            except:
+                self._state = ResourceState.FAILED
+                raise
 
             super(LinuxApplication, self).deploy()
 
@@ -286,12 +301,12 @@ class LinuxApplication(ResourceManager):
         stdin = 'stdin' if self.get("stdin") else None
         sudo = self.get('sudo') or False
         x11 = self.get("forwardX11") or False
-        err_msg = "Failed to run command %s on host %s" % (
-                     command, self.node.get("hostname"))
         failed = False
 
         super(LinuxApplication, self).start()
 
+        self.info("Starting command '%s'" % command)
+
         if x11:
             (out, err), proc = self.node.execute(command,
                     sudo = sudo,
@@ -304,7 +319,9 @@ class LinuxApplication(ResourceManager):
             if proc.poll() and err:
                 failed = True
         else:
-            (out, err), proc = self.node.run(command, self.home, 
+            # Run the command asynchronously
+            command = "bash ./app.sh"
+            (out, err), proc = self.node.run(command, self.app_home, 
                 stdin = stdin, 
                 sudo = sudo)
 
@@ -312,44 +329,51 @@ class LinuxApplication(ResourceManager):
                 failed = True
         
             if not failed:
-                pid, ppid = self.node.wait_pid(home = self.home)
+                pid, ppid = self.node.wait_pid(home = self.app_home)
                 if pid: self._pid = int(pid)
                 if ppid: self._ppid = int(ppid)
 
             if not self.pid or not self.ppid:
                 failed = True
  
-        (out, chkerr), proc = self.node.check_output(self.home, 'stderr')
+        (out, chkerr), proc = self.node.check_output(self.app_home, 'stderr')
 
         if failed or out or chkerr:
             # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            out = out
             if err:
-                err_msg = "%s. Proc error: %s" % (err_msg, err)
+                err = err
+            elif chkerr:
+                err = chkerr
 
-            err_msg = "%s. Run error: %s " % (err_msg, out)
+            self.error(msg, out, err)
 
-            if chkerr:
-                err_msg = "%s. Failed to check error: %s" % (err_msg, chkerr)
+            msg2 = " Setting state to Failed"
+            self.debug(msg2)
+            self._state = ResourceState.FAILED
 
-            self.logger.error(err_msg)
-            self.state = ResourceState.FAILED
+            raise RuntimeError, msg
 
     def stop(self):
         state = self.state
         if state == ResourceState.STARTED:
+            self.info("Stopping command %s" % command)
+
             (out, err), proc = self.node.kill(self.pid, self.ppid)
 
             if out or err:
                 # check if execution errors occurred
-                err_msg = " Failed to STOP command '%s' on host %s. Check error: %s. Run error: %s" % (
-                     self.get("command"), self.node.get("hostname"), err, out)
-                self.logger.error(err_msg)
+                msg = " Failed to STOP command '%s' " % self.get("command")
+                self.error(msg, out, err)
                 self._state = ResourceState.FAILED
                 stopped = False
             else:
                 super(LinuxApplication, self).stop()
 
     def release(self):
+        self.info("Releasing resource")
+
         tear_down = self.get("tearDown")
         if tear_down:
             self.node.execute(tear_down)
@@ -361,13 +385,17 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            (out, err), proc = self.node.check_output(self.home, 'stderr')
+            (out, err), proc = self.node.check_output(self.app_home, 'stderr')
 
             if out or err:
+                if err.find("No such file or directory") >= 0 :
+                    # The resource is marked as started, but the
+                    # command was not yet executed
+                    return ResourceState.READY
+
                 # check if execution errors occurred
-                err_msg = " Failed to execute command '%s' on host %s. Check error: %s. Run error: %s" % (
-                        self.get("command"), self.node.get("hostname"), err, out)
-                self.logger.error(err_msg)
+                msg = " Failed to execute command '%s'" % self.get("command")
+                self.error(msg, out, err)
                 self._state = ResourceState.FAILED
 
             elif self.pid and self.ppid:
@@ -415,7 +443,7 @@ class LinuxApplication(ResourceManager):
         return ( command
             .replace("${SOURCES}", self.src_dir)
             .replace("${BUILD}", self.build_dir) 
-            .replace("${APPHOME}", self.home) 
+            .replace("${APPHOME}", self.app_home) 
             .replace("${NODEHOME}", self.node.home) )