Improved LinuxApplication behavior
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 15 Jun 2013 04:55:47 +0000 (21:55 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 15 Jun 2013 04:55:47 +0000 (21:55 -0700)
src/nepi/execution/ec.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccnd.py
src/nepi/resources/linux/debfuncs.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/rpmfuncs.py
src/nepi/util/sshfuncs.py
test/execution/resource.py
test/resources/linux/application.py
test/resources/linux/test_utils.py

index c5dc10f..262aecf 100644 (file)
@@ -492,8 +492,7 @@ class ExperimentController(object):
         def wait_all_and_start(group):
             reschedule = False
             for guid in group:
-                rm = self.get_resource(guid)
-                if rm.state < ResourceState.READY:
+                if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
index e975cd2..57c5304 100644 (file)
@@ -112,6 +112,10 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
+        # keep a reference to the running process handler when 
+        # the command is not executed as remote daemon in background
+        self._proc = None
+
         # timestamp of last state check of the application
         self._last_state_check = strfnow()
     
@@ -145,6 +149,18 @@ class LinuxApplication(ResourceManager):
     def ppid(self):
         return self._ppid
 
+    @property
+    def in_foreground(self):
+        """ Returns True is the command needs to be executed in foreground.
+        This means that command will be executed using 'execute' instead of
+        'run'.
+
+        When using X11 forwarding option, the command can not run in background
+        and detached from a terminal in the remote host, since we need to keep 
+        the SSH connection to receive graphical data
+        """
+        return self.get("forwardX11") or False
+
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
 
@@ -210,29 +226,32 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
-        # Upload command
+        # Upload command to remote bash script
+        # - only if command can be executed in background and detached
         command = self.get("command")
-        x11 = self.get("forwardX11")
-        env = self.get("env")
-        
-        if command and not x11:
+
+        if command and not self.in_foreground:
             self.info("Uploading command '%s'" % command)
 
             # replace application specific paths in the command
             command = self.replace_paths(command)
+            
+            # replace application specific paths in the environment
+            env = self.get("env")
             env = env and self.replace_paths(env)
 
             self.node.upload_command(command, self.app_home, 
                     shfile = "app.sh",
                     env = env)
        
+        self.info("Provisioning finished")
+
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
-        # TODO: check if sources need to be uploaded and upload them
         sources = self.get("sources")
         if sources:
-            self.info(" Uploading sources ")
+            self.info("Uploading sources ")
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
@@ -263,7 +282,9 @@ class LinuxApplication(ResourceManager):
                 # replace application specific paths in the command
                 command = self.replace_paths(command)
                 
-                # Upload the command to a file, and execute asynchronously
+                # Upload the command to a bash script and run it
+                # in background ( but wait until the command has
+                # finished to continue )
                 self.node.run_and_wait(command, self.app_home,
                         shfile = "http_sources.sh",
                         pidfile = "http_sources_pidfile", 
@@ -280,7 +301,7 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            self.info(" Uploading code ")
+            self.info("Uploading code ")
 
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
@@ -297,13 +318,13 @@ class LinuxApplication(ResourceManager):
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
-            self.info(" Installing dependencies %s" % depends)
+            self.info("Installing dependencies %s" % depends)
             self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
         if build:
-            self.info(" Building sources ")
+            self.info("Building sources ")
             
             # create dir for build
             self.node.mkdir(self.build_dir)
@@ -311,7 +332,9 @@ class LinuxApplication(ResourceManager):
             # replace application specific paths in the command
             command = self.replace_paths(build)
 
-            # Upload the command to a file, and execute asynchronously
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
             self.node.run_and_wait(command, self.app_home,
                     shfile = "build.sh",
                     pidfile = "build_pidfile", 
@@ -322,12 +345,14 @@ class LinuxApplication(ResourceManager):
     def install(self):
         install = self.get("install")
         if install:
-            self.info(" Installing sources ")
+            self.info("Installing sources ")
 
             # replace application specific paths in the command
             command = self.replace_paths(install)
 
-            # Upload the command to a file, and execute asynchronously
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
             self.node.run_and_wait(command, self.app_home,
                     shfile = "install.sh",
                     pidfile = "install_pidfile", 
@@ -356,57 +381,59 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.get('command')
-        env = self.get('env')
-        stdin = 'stdin' if self.get('stdin') else None
-        stdout = 'stdout' if self.get('stdout') else 'stdout'
-        stderr = 'stderr' if self.get('stderr') else 'stderr'
-        sudo = self.get('sudo') or False
-        x11 = self.get('forwardX11') or False
+        command = self.get("command")
+        env = self.get("env")
+        stdin = "stdin" if self.get("stdin") else None
+        stdout = "stdout" if self.get("stdout") else "stdout"
+        stderr = "stderr" if self.get("stderr") else "stderr"
+        sudo = self.get("sudo") or False
         failed = False
 
-        if not command:
-            # If no command was given, then the application 
-            # is directly marked as FINISHED
-            self._state = ResourceState.FINISHED
-        else:
-            super(LinuxApplication, self).start()
-    
         self.info("Starting command '%s'" % command)
 
-        if x11:
-            # If X11 forwarding was specified, then the application
-            # can not run detached, so instead of invoking asynchronous
-            # 'run' we invoke synchronous 'execute'.
+        if self.in_foreground:
+            # If command should be ran in foreground, we invoke
+            # the node 'execute' method
             if not command:
                 msg = "No command is defined but X11 forwarding has been set"
                 self.error(msg)
                 self._state = ResourceState.FAILED
                 raise RuntimeError, msg
 
-            if env:
-                # Export environment
-                environ = ""
-                for var in env.split(" "):
-                    environ += ' %s ' % var
+            # Export environment
+            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
+                if env else ""
 
-                command = "{" + environ + " ; " + command + " ; }"
-                command = self.replace_paths(command)
+            command = environ + command
+            command = self.replace_paths(command)
+            
+            x11 = self.get("forwardX11")
 
-            # If the command requires X11 forwarding, we
-            # can't run it asynchronously
-            (out, err), proc = self.node.execute(command,
+            # We save the reference to the process in self._proc 
+            # to be able to kill the process from the stop method
+            (out, err), self._proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
-                    forward_x11 = x11)
+                    forward_x11 = x11,
+                    blocking = False)
 
-            self._state = ResourceState.FINISHED
+            if self._proc.poll():
+                out = ""
+                err = self._proc.stderr.read()
+                self._state = ResourceState.FAILED
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+            
+            super(LinuxApplication, self).start()
 
-            if proc.poll() and err:
-                failed = True
-        else:
-            # Command was  previously uploaded, now run the remote
-            # bash file asynchronously
+        elif command:
+            # If command is set (i.e. application not used only for dependency
+            # installation), and it does not need to run in foreground, we use 
+            # the 'run' method of the node to launch the application as a daemon 
+
+            # The real command to execute was previously uploaded to a remote bash
+            # script during deployment, now run the remote script using 'run' method 
+            # from the node
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
@@ -437,23 +464,43 @@ class LinuxApplication(ResourceManager):
                 self._state = ResourceState.FAILED
 
                 raise RuntimeError, msg
+            
+            super(LinuxApplication, self).start()
 
+        else:
+            # If no command was given (i.e. Application was used for dependency
+            # installation), then the application is directly marked as FINISHED
+            self._state = ResourceState.FINISHED
     def stop(self):
+        """ Stops application execution
+        """
         command = self.get('command') or ''
         state = self.state
-        
+
         if state == ResourceState.STARTED:
-            self.info("Stopping command '%s'" % command)
+            stopped = True
 
-            (out, err), proc = self.node.kill(self.pid, self.ppid)
+            self.info("Stopping command '%s'" % command)
+        
+            # If the command is running in foreground (it was launched using
+            # the node 'execute' method), then we use the handler to the Popen
+            # process to kill it. Else we send a kill signal using the pid and ppid
+            # retrieved after running the command with the node 'run' method
 
-            if out or err:
-                # check if execution errors occurred
-                msg = " Failed to STOP command '%s' " % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
-                stopped = False
+            if self._proc:
+                self._proc.kill()
             else:
+                (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+                if out or err:
+                    # check if execution errors occurred
+                    msg = " Failed to STOP command '%s' " % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+                    stopped = False
+
+            if stopped:
                 super(LinuxApplication, self).stop()
 
     def release(self):
@@ -470,32 +517,46 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            # To avoid overwhelming the remote hosts and the local processor
-            # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' seconds.
-            state_check_delay = 0.5
-            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
-                # check if execution errors occurred
-                (out, err), proc = self.node.check_errors(self.app_home)
-
-                if out or err:
-                    if err.find("No such file or directory") >= 0 :
-                        # The resource is marked as started, but the
-                        # command was not yet executed
-                        return ResourceState.READY
-
-                    msg = " Failed to execute command '%s'" % self.get("command")
-                    self.error(msg, out, err)
+            if self.in_foreground:
+                retcode = self._proc.poll()
+                
+                # retcode == None -> running
+                # retcode > 0 -> error
+                # retcode == 0 -> finished
+                if retcode:
+                    out = ""
+                    err = self._proc.stderr.read()
                     self._state = ResourceState.FAILED
+                    self.error(msg, out, err)
+                elif retcode == 0:
+                    self._state = ResourceState.FINISHED
 
-                elif self.pid and self.ppid:
-                    status = self.node.status(self.pid, self.ppid)
-
-                    if status == ProcStatus.FINISHED:
-                        self._state = ResourceState.FINISHED
-
-
-                self._last_state_check = strfnow()
+            else:
+                # To avoid overwhelming the remote hosts and the local processor
+                # with too many ssh queries, the state is only requested
+                # every 'state_check_delay' seconds.
+                state_check_delay = 0.5
+                if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                    # check if execution errors occurred
+                    (out, err), proc = self.node.check_errors(self.app_home)
+
+                    if out or err:
+                        if err.find("No such file or directory") >= 0 :
+                            # The resource is marked as started, but the
+                            # command was not yet executed
+                            return ResourceState.READY
+
+                        msg = " Failed to execute command '%s'" % self.get("command")
+                        self.error(msg, out, err)
+                        self._state = ResourceState.FAILED
+
+                    elif self.pid and self.ppid:
+                        status = self.node.status(self.pid, self.ppid)
+
+                        if status == ProcStatus.FINISHED:
+                            self._state = ResourceState.FINISHED
+
+                    self._last_state_check = strfnow()
 
         return self._state
 
index 06a735e..655bc62 100644 (file)
@@ -184,6 +184,92 @@ class LinuxCCND(LinuxApplication):
 
         super(LinuxCCND, self).deploy()
 
+    def start(self):
+        command = self.get("command")
+        env = self.get("env")
+        stdin = "stdin" if self.get("stdin") else None
+        stdout = "stdout" if self.get("stdout") else "stdout"
+        stderr = "stderr" if self.get("stderr") else "stderr"
+        sudo = self.get("sudo") or False
+        x11 = self.get("forwardX11") or False
+        failed = False
+
+        if not command:
+            # If no command was given, then the application 
+            # is directly marked as FINISHED
+            self._state = ResourceState.FINISHED
+    
+        self.info("Starting command '%s'" % command)
+
+        if x11:
+            # If X11 forwarding was specified, then the application
+            # can not run detached, so instead of invoking asynchronous
+            # 'run' we invoke synchronous 'execute'.
+            if not command:
+                msg = "No command is defined but X11 forwarding has been set"
+                self.error(msg)
+                self._state = ResourceState.FAILED
+                raise RuntimeError, msg
+
+            # Export environment
+            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
+                if env else ""
+
+            command = environ + command
+            command = self.replace_paths(command)
+
+            # Mark application as started before executing the command
+            # since after the thread will be blocked by the execution
+            # until it finished
+            super(LinuxApplication, self).start()
+            
+            # If the command requires X11 forwarding, we
+            # can't run it asynchronously
+            (out, err), proc = self.node.execute(command,
+                    sudo = sudo,
+                    stdin = stdin,
+                    forward_x11 = x11)
+
+            self._state = ResourceState.FINISHED
+
+            if proc.poll() and err:
+                failed = True
+        else:
+            # Command was  previously uploaded, now run the remote
+            # bash file asynchronously
+            cmd = "bash ./app.sh"
+            (out, err), proc = self.node.run(cmd, self.app_home, 
+                stdin = stdin, 
+                stdout = stdout,
+                stderr = stderr,
+                sudo = sudo)
+
+            # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            
+            if proc.poll() and err:
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+        
+            # Check status of process running in background
+            pid, ppid = self.node.wait_pid(self.app_home)
+            if pid: self._pid = int(pid)
+            if ppid: self._ppid = int(ppid)
+
+            # If the process is not running, check for error information
+            # on the remote machine
+            if not self.pid or not self.ppid:
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+                self.error(msg, out, err)
+
+                msg2 = " Setting state to Failed"
+                self.debug(msg2)
+                self._state = ResourceState.FAILED
+
+                raise RuntimeError, msg
+            
+            super(LinuxApplication, self).start()
+
     def stop(self):
         command = self.get('command') or ''
         state = self.state
@@ -191,17 +277,56 @@ class LinuxCCND(LinuxApplication):
         if state == ResourceState.STARTED:
             self.info("Stopping command '%s'" % command)
 
-            (out, err), proc = self.node.kill(self.pid, self.ppid)
+            command = "ccndstop"
+            env = self.get("env") 
+
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+            env = env and self.replace_paths(env)
+
+            # Upload the command to a file, and execute asynchronously
+            self.node.run_and_wait(command, self.app_home,
+                        shfile = "ccndstop.sh",
+                        env = env,
+                        pidfile = "ccndstop_pidfile", 
+                        ecodefile = "ccndstop_exitcode", 
+                        stdout = "ccndstop_stdout", 
+                        stderr = "ccndstop_stderr")
+
 
-            if out or err:
+            super(LinuxCCND, self).stop()
+
+    @property
+    def state(self):
+        if self._state == ResourceState.STARTED:
+            # To avoid overwhelming the remote hosts and the local processor
+            # with too many ssh queries, the state is only requested
+            # every 'state_check_delay' seconds.
+            state_check_delay = 0.5
+            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                 # check if execution errors occurred
-                msg = " Failed to STOP command '%s' " % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
-                stopped = False
-            else:
-                super(LinuxApplication, self).stop()
+                (out, err), proc = self.node.check_errors(self.app_home)
+
+                if out or err:
+                    if err.find("No such file or directory") >= 0 :
+                        # The resource is marked as started, but the
+                        # command was not yet executed
+                        return ResourceState.READY
+
+                    msg = " Failed to execute command '%s'" % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+
+                elif self.pid and self.ppid:
+                    status = self.node.status(self.pid, self.ppid)
+
+                    if status == ProcStatus.FINISHED:
+                        self._state = ResourceState.FINISHED
+
+
+                self._last_state_check = strfnow()
 
+        return self._state
 
     @property
     def _default_command(self):
index 1abe208..02d1657 100644 (file)
@@ -23,23 +23,21 @@ def install_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = ""
-    for p in packages:
-        cmd += " ( dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ) ; " % {
-                'package': p}
-   
-    #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...)
+    cmd = " && ".join(map(lambda p: 
+            " { dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ; } " % {
+                    'package': p}, packages))
+        
+    #cmd = { dpkg -s vim || sudo -S apt-get -y install vim ; } && ..
     return cmd 
 
 def remove_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = ""
-    for p in packages:
-        cmd += " ( dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ) ; " % {
-                'package': p}
-    
-    #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...)
+    cmd = " && ".join(map(lambda p: 
+            " { dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ; } " % {
+                    'package': p}, packages))
+        
+    #cmd = { dpkg -s vim && sudo -S apt-get -y purge vim ; } && ..
     return cmd 
 
index de194e5..b7e8668 100644 (file)
@@ -339,6 +339,7 @@ class LinuxNode(ResourceManager):
         
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
+            env = None,
             pidfile = "pidfile", 
             ecodefile = "exitcode", 
             stdin = None, 
@@ -354,7 +355,10 @@ class LinuxNode(ResourceManager):
         since in the remote host the command can continue to run detached
         even if network disconnections occur
         """
-        self.upload_command(command, home, shfile, ecodefile)
+        self.upload_command(command, home, 
+            shfile = shfile, 
+            ecodefile = ecodefile, 
+            env = env)
 
         command = "bash ./%s" % shfile
         # run command in background in remote host
@@ -420,18 +424,26 @@ class LinuxNode(ResourceManager):
             shfile = "cmd.sh",
             ecodefile = "exitcode",
             env = None):
+        """ Saves the command as a bash script file in the remote host, and
+        forces to save the exit code of the command execution to the ecodefile
+        """
+      
+        # Prepare command to be executed as a bash script file
+        # Make sure command ends in ';' so the curly brackets syntax is correct
+        if not command.strip()[-1] == ';':
+            command += " ; "
 
-        command = " ( %(command)s ) ; echo $? > %(ecodefile)s " % {
+        # The exit code of the command will be stored in ecodefile
+        command = " { { %(command)s } ; echo $? > %(ecodefile)s ; } " % {
                 'command': command,
                 'ecodefile': ecodefile,
                 } 
 
         # Export environment
-        environ = ""
-        if env:
-            for var in env.split(" "):
-                environ += 'export %s\n' % var
+        environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) \
+            if env else ""
 
+        # Add environ to command
         command = environ + command
 
         dst = os.path.join(home, shfile)
@@ -569,6 +581,7 @@ class LinuxNode(ResourceManager):
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
+            blocking = True,
             with_lock = False
             ):
         """ Notice that this invocation will block until the
@@ -602,6 +615,7 @@ class LinuxNode(ResourceManager):
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
+                        blocking = blocking, 
                         strict_host_checking = strict_host_checking
                         )
             else:
@@ -622,7 +636,9 @@ class LinuxNode(ResourceManager):
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
-                    persistent = persistent
+                    persistent = persistent,
+                    blocking = blocking, 
+                    strict_host_checking = strict_host_checking
                     )
 
         return (out, err), proc
index e42e7b8..5fa0592 100644 (file)
@@ -26,30 +26,29 @@ def install_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = "( %s )" % install_rpmfusion_command(os)
-    for p in packages:
-        cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
-                    'package': p}
+    cmd = install_rpmfusion_command(os) + " && "
+    cmd += " && ".join(map(lambda p: 
+            " { rpm -q %(package)s || sudo -S yum -y install %(package)s ; } " % {
+                    'package': p}, packages))
     
-    #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
-    return " ( %s )" % cmd 
+    #cmd = { rpm -q rpmfusion-free-release || sudo -s rpm -i ... ; } && { rpm -q vim || sudo yum -y install vim ; } && ..
+    return cmd 
 
 def remove_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = ""
-    for p in packages:
-        cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
-                    'package': p}
-
-    #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
+    cmd = " && ".join(map(lambda p: 
+            " { rpm -q %(package)s && sudo -S yum -y remove %(package)s ; } " % {
+                    'package': p}, packages))
+        
+    #cmd = { rpm -q vim && sudo yum -y remove vim ; } && ..
     return cmd 
 
 def install_rpmfusion_command(os):
     from nepi.resources.linux.node import OSType
 
-    cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s"
+    cmd = " { rpm -q rpmfusion-free-release ||  sudo -S rpm -i %(package)s ; } "
 
     if os in [OSType.FEDORA, OSType.FEDORA_12]:
         cmd =  cmd %  {'package': RPM_FUSION_URL_F12}
index 50f5d7c..7fe579b 100644 (file)
@@ -16,6 +16,7 @@
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Claudio Freire <claudio-daniel.freire@inria.fr>
 
 import base64
 import errno
@@ -218,6 +219,7 @@ def rexec(command, host, user,
         connect_timeout = 30,
         persistent = True,
         forward_x11 = False,
+        blocking = True,
         strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
@@ -280,6 +282,12 @@ def rexec(command, host, user,
         # alive until the process is finished with it
         proc._known_hosts = tmp_known_hosts
     
+        # by default, rexec calls _communicate which will block 
+        # until the process has exit. The argument block == False 
+        # forces to rexec to return immediately, without blocking 
+        if not blocking:
+           return (("", ""), proc)
+
         try:
             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
             msg = " rexec - host %s - command %s " % (host, " ".join(args))
@@ -648,7 +656,7 @@ def rspawn(command, pidfile,
     else:
         stderr = ' ' + stderr
     
-    daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+    daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
         'command' : command,
         'pidfile' : shell_escape(pidfile),
         'stdout' : stdout,
index bb1de3e..7122b92 100755 (executable)
@@ -219,7 +219,7 @@ class ResourceManagerTestCase(unittest.TestCase):
         node = ec.register_resource("Node")
 
         apps = list()
-        for i in xrange(5000):
+        for i in xrange(1000):
             app = ec.register_resource("Application")
             ec.register_connection(app, node)
             apps.append(app)
index 0c69f71..b1e489c 100755 (executable)
@@ -25,7 +25,7 @@ from nepi.execution.trace import TraceAttr
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.linux.application import LinuxApplication
 
-from test_utils import skipIfNotAlive
+from test_utils import skipIfNotAlive, skipInteractive
 
 import os
 import time
@@ -34,13 +34,13 @@ import unittest
 
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
-        self.fedora_host = "nepi5.pl.sophia.inria.fr"
+        self.fedora_host = "nepi2.pl.sophia.inria.fr"
         self.fedora_user = "inria_nepi"
 
         self.ubuntu_host = "roseval.pl.sophia.inria.fr"
         self.ubuntu_user = "alina"
         
-        self.target = "nepi3.pl.sophia.inria.fr"
+        self.target = "nepi5.pl.sophia.inria.fr"
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
@@ -246,6 +246,36 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
         ec.shutdown()
 
+    @skipIfNotAlive
+    def t_xterm(self, host, user):
+        from nepi.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+
+        app = ec.register_resource("LinuxApplication")
+        ec.set(app, "command", "xterm")
+        ec.set(app, "depends", "xterm")
+        ec.set(app, "forwardX11", True)
+
+        ec.register_connection(app, node)
+
+        ec.deploy()
+
+        ec.wait_finished([app])
+
+        self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+        ec.shutdown()
+
     def test_stdout_fedora(self):
         self.t_stdout(self.fedora_host, self.fedora_user)
 
@@ -258,10 +288,10 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_ping_ubuntu(self):
         self.t_ping(self.ubuntu_host, self.ubuntu_user)
 
-    def ztest_concurrency_fedora(self):
+    def test_concurrency_fedora(self):
         self.t_concurrency(self.fedora_host, self.fedora_user)
 
-    def ztest_concurrency_ubuntu(self):
+    def test_concurrency_ubuntu(self):
         self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
 
     def test_condition_fedora(self):
@@ -276,6 +306,12 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_http_sources_ubuntu(self):
         self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
 
+    @skipInteractive
+    def test_xterm_ubuntu(self):
+        """ Interactive test. Should not run automatically """
+        self.t_xterm(self.ubuntu_host, self.ubuntu_user)
+
+
 
 if __name__ == '__main__':
     unittest.main()
index 907cb0a..414b3f3 100644 (file)
@@ -55,7 +55,7 @@ def skipIfNotAlive(func):
 def skipInteractive(func):
     name = func.__name__
     def wrapped(*args, **kwargs):
-        mode = os.environ.get("NEPI_INTERACTIVE", False)
+        mode = os.environ.get("NEPI_INTERACTIVE_TEST", False)
         mode = mode and  mode.lower() in ['true', 'yes']
         if not mode:
             print "*** WARNING: Skipping test %s: Interactive mode off \n" % name