Adding comments to Linux CCN examples
[nepi.git] / src / nepi / resources / linux / application.py
index e975cd2..ea4b0a4 100644 (file)
@@ -25,8 +25,10 @@ from nepi.util.sshfuncs import ProcStatus
 from nepi.util.timefuncs import strfnow, strfdiff
 
 import os
+import subprocess
 
 # TODO: Resolve wildcards in commands!!
+# TODO: compare_hash for all files that are uploaded!
 
 
 @clsinit
@@ -37,7 +39,7 @@ class LinuxApplication(ResourceManager):
     def _register_attributes(cls):
         command = Attribute("command", "Command to execute", 
                 flags = Flags.ExecReadOnly)
-        forward_x11 = Attribute("forwardX11", " Enables X11 forwarding for SSH connections", 
+        forward_x11 = Attribute("forwardX11", "Enables X11 forwarding for SSH connections", 
                 flags = Flags.ExecReadOnly)
         env = Attribute("env", "Environment variables string for command execution",
                 flags = Flags.ExecReadOnly)
@@ -111,6 +113,11 @@ class LinuxApplication(ResourceManager):
         self._pid = None
         self._ppid = None
         self._home = "app-%s" % self.guid
+        self._in_foreground = False
+
+        # keep a reference to the running process handler when 
+        # the command is not executed as remote daemon in background
+        self._proc = None
 
         # timestamp of last state check of the application
         self._last_state_check = strfnow()
@@ -145,6 +152,19 @@ class LinuxApplication(ResourceManager):
     def ppid(self):
         return self._ppid
 
+    @property
+    def in_foreground(self):
+        """ Returns True if the command needs to be executed in foreground.
+        This means that command will be executed using 'execute' instead of
+        'run' ('run' executes a command in background and detached from the 
+        terminal)
+        
+        When using X11 forwarding option, the command can not run in background
+        and detached from a terminal, since we need to keep the terminal attached 
+        to interact with it.
+        """
+        return self.get("forwardX11") or self._in_foreground
+
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
 
@@ -210,29 +230,32 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
-        # Upload command
+        # Upload command to remote bash script
+        # - only if command can be executed in background and detached
         command = self.get("command")
-        x11 = self.get("forwardX11")
-        env = self.get("env")
-        
-        if command and not x11:
+
+        if command and not self.in_foreground:
             self.info("Uploading command '%s'" % command)
 
             # replace application specific paths in the command
             command = self.replace_paths(command)
+            
+            # replace application specific paths in the environment
+            env = self.get("env")
             env = env and self.replace_paths(env)
 
             self.node.upload_command(command, self.app_home, 
                     shfile = "app.sh",
                     env = env)
        
+        self.info("Provisioning finished")
+
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
-        # TODO: check if sources need to be uploaded and upload them
         sources = self.get("sources")
         if sources:
-            self.info(" Uploading sources ")
+            self.info("Uploading sources ")
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
@@ -263,7 +286,9 @@ class LinuxApplication(ResourceManager):
                 # replace application specific paths in the command
                 command = self.replace_paths(command)
                 
-                # Upload the command to a file, and execute asynchronously
+                # Upload the command to a bash script and run it
+                # in background ( but wait until the command has
+                # finished to continue )
                 self.node.run_and_wait(command, self.app_home,
                         shfile = "http_sources.sh",
                         pidfile = "http_sources_pidfile", 
@@ -280,7 +305,7 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            self.info(" Uploading code ")
+            self.info("Uploading code ")
 
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
@@ -290,20 +315,26 @@ class LinuxApplication(ResourceManager):
         if stdin:
             # create dir for sources
             self.info(" Uploading stdin ")
-
+            
             dst = os.path.join(self.app_home, "stdin")
+
+            # If what we are uploading is a file, check whether
+            # the same file already exists (using md5sum)
+            if self.compare_hash(stdin, dst):
+                return
+
             self.node.upload(stdin, dst, text = True)
 
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
-            self.info(" Installing dependencies %s" % depends)
+            self.info("Installing dependencies %s" % depends)
             self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
         if build:
-            self.info(" Building sources ")
+            self.info("Building sources ")
             
             # create dir for build
             self.node.mkdir(self.build_dir)
@@ -311,7 +342,9 @@ class LinuxApplication(ResourceManager):
             # replace application specific paths in the command
             command = self.replace_paths(build)
 
-            # Upload the command to a file, and execute asynchronously
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
             self.node.run_and_wait(command, self.app_home,
                     shfile = "build.sh",
                     pidfile = "build_pidfile", 
@@ -322,12 +355,14 @@ class LinuxApplication(ResourceManager):
     def install(self):
         install = self.get("install")
         if install:
-            self.info(" Installing sources ")
+            self.info("Installing sources ")
 
             # replace application specific paths in the command
             command = self.replace_paths(install)
 
-            # Upload the command to a file, and execute asynchronously
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
             self.node.run_and_wait(command, self.app_home,
                     shfile = "install.sh",
                     pidfile = "install_pidfile", 
@@ -356,104 +391,129 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.get('command')
-        env = self.get('env')
-        stdin = 'stdin' if self.get('stdin') else None
-        stdout = 'stdout' if self.get('stdout') else 'stdout'
-        stderr = 'stderr' if self.get('stderr') else 'stderr'
-        sudo = self.get('sudo') or False
-        x11 = self.get('forwardX11') or False
-        failed = False
+        command = self.get("command")
+
+        self.info("Starting command '%s'" % command)
 
         if not command:
-            # If no command was given, then the application 
-            # is directly marked as FINISHED
+            # If no command was given (i.e. Application was used for dependency
+            # installation), then the application is directly marked as FINISHED
             self._state = ResourceState.FINISHED
         else:
-            super(LinuxApplication, self).start()
-    
-        self.info("Starting command '%s'" % command)
 
-        if x11:
-            # If X11 forwarding was specified, then the application
-            # can not run detached, so instead of invoking asynchronous
-            # 'run' we invoke synchronous 'execute'.
-            if not command:
-                msg = "No command is defined but X11 forwarding has been set"
-                self.error(msg)
-                self._state = ResourceState.FAILED
-                raise RuntimeError, msg
+            if self.in_foreground:
+                self._start_in_foreground()
+            else:
+                self._start_in_background()
 
-            if env:
-                # Export environment
-                environ = ""
-                for var in env.split(" "):
-                    environ += ' %s ' % var
+            super(LinuxApplication, self).start()
 
-                command = "{" + environ + " ; " + command + " ; }"
-                command = self.replace_paths(command)
+    def _start_in_foreground(self):
+        command = self.get("command")
+        stdin = "stdin" if self.get("stdin") else None
+        sudo = self.get("sudo") or False
+        x11 = self.get("forwardX11")
 
-            # If the command requires X11 forwarding, we
-            # can't run it asynchronously
-            (out, err), proc = self.node.execute(command,
-                    sudo = sudo,
-                    stdin = stdin,
-                    forward_x11 = x11)
+        # Command will be launched in foreground and attached to the
+        # terminal using the node 'execute' in non blocking mode.
 
-            self._state = ResourceState.FINISHED
+        # Export environment
+        env = self.get("env")
+        environ = self.node.format_environment(env, inline = True)
+        command = environ + command
+        command = self.replace_paths(command)
+
+        # We save the reference to the process in self._proc 
+        # to be able to kill the process from the stop method.
+        # We also set blocking = False, since we don't want the
+        # thread to block until the execution finishes.
+        (out, err), self._proc = self.node.execute(command,
+                sudo = sudo,
+                stdin = stdin,
+                forward_x11 = x11,
+                blocking = False)
+
+        if self._proc.poll():
+            self._state = ResourceState.FAILED
+            self.error(msg, out, err)
+            raise RuntimeError, msg
 
-            if proc.poll() and err:
-                failed = True
-        else:
-            # Command was  previously uploaded, now run the remote
-            # bash file asynchronously
-            cmd = "bash ./app.sh"
-            (out, err), proc = self.node.run(cmd, self.app_home, 
-                stdin = stdin, 
-                stdout = stdout,
-                stderr = stderr,
-                sudo = sudo)
-
-            # check if execution errors occurred
-            msg = " Failed to start command '%s' " % command
-            
-            if proc.poll() and err:
-                self.error(msg, out, err)
-                raise RuntimeError, msg
+    def _start_in_background(self):
+        command = self.get("command")
+        env = self.get("env")
+        stdin = "stdin" if self.get("stdin") else None
+        stdout = "stdout" if self.get("stdout") else "stdout"
+        stderr = "stderr" if self.get("stderr") else "stderr"
+        sudo = self.get("sudo") or False
+
+        # Command will be as a daemon in baground and detached from any terminal.
+        # The real command to run was previously uploaded to a bash script
+        # during deployment, now launch the remote script using 'run'
+        # method from the node
+        cmd = "bash ./app.sh"
+        (out, err), proc = self.node.run(cmd, self.app_home, 
+            stdin = stdin, 
+            stdout = stdout,
+            stderr = stderr,
+            sudo = sudo)
+
+        # check if execution errors occurred
+        msg = " Failed to start command '%s' " % command
         
-            # Check status of process running in background
-            pid, ppid = self.node.wait_pid(self.app_home)
-            if pid: self._pid = int(pid)
-            if ppid: self._ppid = int(ppid)
-
-            # If the process is not running, check for error information
-            # on the remote machine
-            if not self.pid or not self.ppid:
-                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
-                self.error(msg, out, err)
-
-                msg2 = " Setting state to Failed"
-                self.debug(msg2)
+        if proc.poll():
+            self._state = ResourceState.FAILED
+            self.error(msg, out, err)
+            raise RuntimeError, msg
+    
+        # Wait for pid file to be generated
+        pid, ppid = self.node.wait_pid(self.app_home)
+        if pid: self._pid = int(pid)
+        if ppid: self._ppid = int(ppid)
+
+        # If the process is not running, check for error information
+        # on the remote machine
+        if not self.pid or not self.ppid:
+            (out, err), proc = self.node.check_errors(self.app_home,
+                    stderr = stderr) 
+
+            # Out is what was written in the stderr file
+            if err:
                 self._state = ResourceState.FAILED
-
+                msg = " Failed to start command '%s' " % command
+                self.error(msg, out, err)
                 raise RuntimeError, msg
-
+        
     def stop(self):
+        """ Stops application execution
+        """
         command = self.get('command') or ''
-        state = self.state
-        
-        if state == ResourceState.STARTED:
-            self.info("Stopping command '%s'" % command)
 
-            (out, err), proc = self.node.kill(self.pid, self.ppid)
+        if self.state == ResourceState.STARTED:
+            stopped = True
 
-            if out or err:
-                # check if execution errors occurred
-                msg = " Failed to STOP command '%s' " % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
-                stopped = False
+            self.info("Stopping command '%s'" % command)
+        
+            # If the command is running in foreground (it was launched using
+            # the node 'execute' method), then we use the handler to the Popen
+            # process to kill it. Else we send a kill signal using the pid and ppid
+            # retrieved after running the command with the node 'run' method
+
+            if self._proc:
+                self._proc.kill()
             else:
+                # Only try to kill the process if the pid and ppid
+                # were retrieved
+                if self.pid and self.ppid:
+                    (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+                    if out or err:
+                        # check if execution errors occurred
+                        msg = " Failed to STOP command '%s' " % self.get("command")
+                        self.error(msg, out, err)
+                        self._state = ResourceState.FAILED
+                        stopped = False
+
+            if stopped:
                 super(LinuxApplication, self).stop()
 
     def release(self):
@@ -464,38 +524,56 @@ class LinuxApplication(ResourceManager):
             self.node.execute(tear_down)
 
         self.stop()
+
         if self.state == ResourceState.STOPPED:
             super(LinuxApplication, self).release()
     
     @property
     def state(self):
+        """ Returns the state of the application
+        """
         if self._state == ResourceState.STARTED:
-            # To avoid overwhelming the remote hosts and the local processor
-            # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' seconds.
-            state_check_delay = 0.5
-            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
-                # check if execution errors occurred
-                (out, err), proc = self.node.check_errors(self.app_home)
-
-                if out or err:
-                    if err.find("No such file or directory") >= 0 :
-                        # The resource is marked as started, but the
-                        # command was not yet executed
-                        return ResourceState.READY
-
+            if self.in_foreground:
+                # Check if the process we used to execute the command
+                # is still running ...
+                retcode = self._proc.poll()
+
+                # retcode == None -> running
+                # retcode > 0 -> error
+                # retcode == 0 -> finished
+                if retcode:
+                    out = ""
                     msg = " Failed to execute command '%s'" % self.get("command")
+                    err = self._proc.stderr.read()
                     self.error(msg, out, err)
                     self._state = ResourceState.FAILED
+                elif retcode == 0:
+                    self._state = ResourceState.FINISHED
 
-                elif self.pid and self.ppid:
-                    status = self.node.status(self.pid, self.ppid)
-
-                    if status == ProcStatus.FINISHED:
-                        self._state = ResourceState.FINISHED
-
-
-                self._last_state_check = strfnow()
+            else:
+                # We need to query the status of the command we launched in 
+                # background. In oredr to avoid overwhelming the remote host and
+                # the local processor with too many ssh queries, the state is only
+                # requested every 'state_check_delay' seconds.
+                state_check_delay = 0.5
+                if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                    # check if execution errors occurred
+                    (out, err), proc = self.node.check_errors(self.app_home)
+
+                    if err:
+                        msg = " Failed to execute command '%s'" % self.get("command")
+                        self.error(msg, out, err)
+                        self._state = ResourceState.FAILED
+
+                    elif self.pid and self.ppid:
+                        # No execution errors occurred. Make sure the background
+                        # process with the recorded pid is still running.
+                        status = self.node.status(self.pid, self.ppid)
+
+                        if status == ProcStatus.FINISHED:
+                            self._state = ResourceState.FINISHED
+
+                    self._last_state_check = strfnow()
 
         return self._state
 
@@ -513,7 +591,39 @@ class LinuxApplication(ResourceManager):
             .replace("${NODE_HOME}", absolute_dir(self.node.node_home))
             .replace("${EXP_HOME}", absolute_dir(self.node.exp_home) )
             )
-        
+
+    def compare_hash(self, local, remote):
+        # getting md5sum from remote file
+        (out, err), proc = self.node.execute("md5sum %s " % remote)
+
+        if proc.poll() == 0: #OK
+            if not os.path.isfile(local):
+                # store to a tmp file
+                f = tempfile.NamedTemporaryFile()
+                f.write(local)
+                f.flush()
+                local = f.name
+
+            lproc = subprocess.Popen(["md5sum", local],
+                stdout = subprocess.PIPE,
+                stderr = subprocess.PIPE) 
+
+            # getting md5sum from local file
+            (lout, lerr) = lproc.communicate()
+
+            # files are the same, no need to upload
+            lchk = lout.strip().split(" ")[0]
+            rchk = out.strip().split(" ")[0]
+
+            msg = " Comparing files: LOCAL %s md5sum %s - REMOTE %s md5sum %s" % (
+                    local, lchk, remote, rchk)
+            self.debug(msg)
+
+            if lchk == rchk:
+                return True
+
+        return False
+
     def valid_connection(self, guid):
         # TODO: Validate!
         return True