Improved LinuxApplication behavior
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 15 Jun 2013 04:55:47 +0000 (21:55 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Sat, 15 Jun 2013 04:55:47 +0000 (21:55 -0700)
src/nepi/execution/ec.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccnd.py
src/nepi/resources/linux/debfuncs.py
src/nepi/resources/linux/node.py
src/nepi/resources/linux/rpmfuncs.py
src/nepi/util/sshfuncs.py
test/execution/resource.py
test/resources/linux/application.py
test/resources/linux/test_utils.py

index c5dc10f..262aecf 100644 (file)
@@ -492,8 +492,7 @@ class ExperimentController(object):
         def wait_all_and_start(group):
             reschedule = False
             for guid in group:
         def wait_all_and_start(group):
             reschedule = False
             for guid in group:
-                rm = self.get_resource(guid)
-                if rm.state < ResourceState.READY:
+                if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
                     reschedule = True
                     break
 
index e975cd2..57c5304 100644 (file)
@@ -112,6 +112,10 @@ class LinuxApplication(ResourceManager):
         self._ppid = None
         self._home = "app-%s" % self.guid
 
         self._ppid = None
         self._home = "app-%s" % self.guid
 
+        # keep a reference to the running process handler when 
+        # the command is not executed as remote daemon in background
+        self._proc = None
+
         # timestamp of last state check of the application
         self._last_state_check = strfnow()
     
         # timestamp of last state check of the application
         self._last_state_check = strfnow()
     
@@ -145,6 +149,18 @@ class LinuxApplication(ResourceManager):
     def ppid(self):
         return self._ppid
 
     def ppid(self):
         return self._ppid
 
+    @property
+    def in_foreground(self):
+        """ Returns True is the command needs to be executed in foreground.
+        This means that command will be executed using 'execute' instead of
+        'run'.
+
+        When using X11 forwarding option, the command can not run in background
+        and detached from a terminal in the remote host, since we need to keep 
+        the SSH connection to receive graphical data
+        """
+        return self.get("forwardX11") or False
+
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
 
     def trace(self, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         self.info("Retrieving '%s' trace %s " % (name, attr))
 
@@ -210,29 +226,32 @@ class LinuxApplication(ResourceManager):
         # Install
         self.install()
 
         # Install
         self.install()
 
-        # Upload command
+        # Upload command to remote bash script
+        # - only if command can be executed in background and detached
         command = self.get("command")
         command = self.get("command")
-        x11 = self.get("forwardX11")
-        env = self.get("env")
-        
-        if command and not x11:
+
+        if command and not self.in_foreground:
             self.info("Uploading command '%s'" % command)
 
             # replace application specific paths in the command
             command = self.replace_paths(command)
             self.info("Uploading command '%s'" % command)
 
             # replace application specific paths in the command
             command = self.replace_paths(command)
+            
+            # replace application specific paths in the environment
+            env = self.get("env")
             env = env and self.replace_paths(env)
 
             self.node.upload_command(command, self.app_home, 
                     shfile = "app.sh",
                     env = env)
        
             env = env and self.replace_paths(env)
 
             self.node.upload_command(command, self.app_home, 
                     shfile = "app.sh",
                     env = env)
        
+        self.info("Provisioning finished")
+
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
         super(LinuxApplication, self).provision()
 
     def upload_sources(self):
-        # TODO: check if sources need to be uploaded and upload them
         sources = self.get("sources")
         if sources:
         sources = self.get("sources")
         if sources:
-            self.info(" Uploading sources ")
+            self.info("Uploading sources ")
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
@@ -263,7 +282,9 @@ class LinuxApplication(ResourceManager):
                 # replace application specific paths in the command
                 command = self.replace_paths(command)
                 
                 # replace application specific paths in the command
                 command = self.replace_paths(command)
                 
-                # Upload the command to a file, and execute asynchronously
+                # Upload the command to a bash script and run it
+                # in background ( but wait until the command has
+                # finished to continue )
                 self.node.run_and_wait(command, self.app_home,
                         shfile = "http_sources.sh",
                         pidfile = "http_sources_pidfile", 
                 self.node.run_and_wait(command, self.app_home,
                         shfile = "http_sources.sh",
                         pidfile = "http_sources_pidfile", 
@@ -280,7 +301,7 @@ class LinuxApplication(ResourceManager):
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
             # create dir for sources
             self.node.mkdir(self.src_dir)
 
-            self.info(" Uploading code ")
+            self.info("Uploading code ")
 
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
 
             dst = os.path.join(self.src_dir, "code")
             self.node.upload(sources, dst, text = True)
@@ -297,13 +318,13 @@ class LinuxApplication(ResourceManager):
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
     def install_dependencies(self):
         depends = self.get("depends")
         if depends:
-            self.info(" Installing dependencies %s" % depends)
+            self.info("Installing dependencies %s" % depends)
             self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
         if build:
             self.node.install_packages(depends, self.app_home)
 
     def build(self):
         build = self.get("build")
         if build:
-            self.info(" Building sources ")
+            self.info("Building sources ")
             
             # create dir for build
             self.node.mkdir(self.build_dir)
             
             # create dir for build
             self.node.mkdir(self.build_dir)
@@ -311,7 +332,9 @@ class LinuxApplication(ResourceManager):
             # replace application specific paths in the command
             command = self.replace_paths(build)
 
             # replace application specific paths in the command
             command = self.replace_paths(build)
 
-            # Upload the command to a file, and execute asynchronously
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
             self.node.run_and_wait(command, self.app_home,
                     shfile = "build.sh",
                     pidfile = "build_pidfile", 
             self.node.run_and_wait(command, self.app_home,
                     shfile = "build.sh",
                     pidfile = "build_pidfile", 
@@ -322,12 +345,14 @@ class LinuxApplication(ResourceManager):
     def install(self):
         install = self.get("install")
         if install:
     def install(self):
         install = self.get("install")
         if install:
-            self.info(" Installing sources ")
+            self.info("Installing sources ")
 
             # replace application specific paths in the command
             command = self.replace_paths(install)
 
 
             # replace application specific paths in the command
             command = self.replace_paths(install)
 
-            # Upload the command to a file, and execute asynchronously
+            # Upload the command to a bash script and run it
+            # in background ( but wait until the command has
+            # finished to continue )
             self.node.run_and_wait(command, self.app_home,
                     shfile = "install.sh",
                     pidfile = "install_pidfile", 
             self.node.run_and_wait(command, self.app_home,
                     shfile = "install.sh",
                     pidfile = "install_pidfile", 
@@ -356,57 +381,59 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).deploy()
 
     def start(self):
             super(LinuxApplication, self).deploy()
 
     def start(self):
-        command = self.get('command')
-        env = self.get('env')
-        stdin = 'stdin' if self.get('stdin') else None
-        stdout = 'stdout' if self.get('stdout') else 'stdout'
-        stderr = 'stderr' if self.get('stderr') else 'stderr'
-        sudo = self.get('sudo') or False
-        x11 = self.get('forwardX11') or False
+        command = self.get("command")
+        env = self.get("env")
+        stdin = "stdin" if self.get("stdin") else None
+        stdout = "stdout" if self.get("stdout") else "stdout"
+        stderr = "stderr" if self.get("stderr") else "stderr"
+        sudo = self.get("sudo") or False
         failed = False
 
         failed = False
 
-        if not command:
-            # If no command was given, then the application 
-            # is directly marked as FINISHED
-            self._state = ResourceState.FINISHED
-        else:
-            super(LinuxApplication, self).start()
-    
         self.info("Starting command '%s'" % command)
 
         self.info("Starting command '%s'" % command)
 
-        if x11:
-            # If X11 forwarding was specified, then the application
-            # can not run detached, so instead of invoking asynchronous
-            # 'run' we invoke synchronous 'execute'.
+        if self.in_foreground:
+            # If command should be ran in foreground, we invoke
+            # the node 'execute' method
             if not command:
                 msg = "No command is defined but X11 forwarding has been set"
                 self.error(msg)
                 self._state = ResourceState.FAILED
                 raise RuntimeError, msg
 
             if not command:
                 msg = "No command is defined but X11 forwarding has been set"
                 self.error(msg)
                 self._state = ResourceState.FAILED
                 raise RuntimeError, msg
 
-            if env:
-                # Export environment
-                environ = ""
-                for var in env.split(" "):
-                    environ += ' %s ' % var
+            # Export environment
+            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
+                if env else ""
 
 
-                command = "{" + environ + " ; " + command + " ; }"
-                command = self.replace_paths(command)
+            command = environ + command
+            command = self.replace_paths(command)
+            
+            x11 = self.get("forwardX11")
 
 
-            # If the command requires X11 forwarding, we
-            # can't run it asynchronously
-            (out, err), proc = self.node.execute(command,
+            # We save the reference to the process in self._proc 
+            # to be able to kill the process from the stop method
+            (out, err), self._proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
                     sudo = sudo,
                     stdin = stdin,
-                    forward_x11 = x11)
+                    forward_x11 = x11,
+                    blocking = False)
 
 
-            self._state = ResourceState.FINISHED
+            if self._proc.poll():
+                out = ""
+                err = self._proc.stderr.read()
+                self._state = ResourceState.FAILED
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+            
+            super(LinuxApplication, self).start()
 
 
-            if proc.poll() and err:
-                failed = True
-        else:
-            # Command was  previously uploaded, now run the remote
-            # bash file asynchronously
+        elif command:
+            # If command is set (i.e. application not used only for dependency
+            # installation), and it does not need to run in foreground, we use 
+            # the 'run' method of the node to launch the application as a daemon 
+
+            # The real command to execute was previously uploaded to a remote bash
+            # script during deployment, now run the remote script using 'run' method 
+            # from the node
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
@@ -437,23 +464,43 @@ class LinuxApplication(ResourceManager):
                 self._state = ResourceState.FAILED
 
                 raise RuntimeError, msg
                 self._state = ResourceState.FAILED
 
                 raise RuntimeError, msg
+            
+            super(LinuxApplication, self).start()
 
 
+        else:
+            # If no command was given (i.e. Application was used for dependency
+            # installation), then the application is directly marked as FINISHED
+            self._state = ResourceState.FINISHED
     def stop(self):
     def stop(self):
+        """ Stops application execution
+        """
         command = self.get('command') or ''
         state = self.state
         command = self.get('command') or ''
         state = self.state
-        
+
         if state == ResourceState.STARTED:
         if state == ResourceState.STARTED:
-            self.info("Stopping command '%s'" % command)
+            stopped = True
 
 
-            (out, err), proc = self.node.kill(self.pid, self.ppid)
+            self.info("Stopping command '%s'" % command)
+        
+            # If the command is running in foreground (it was launched using
+            # the node 'execute' method), then we use the handler to the Popen
+            # process to kill it. Else we send a kill signal using the pid and ppid
+            # retrieved after running the command with the node 'run' method
 
 
-            if out or err:
-                # check if execution errors occurred
-                msg = " Failed to STOP command '%s' " % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
-                stopped = False
+            if self._proc:
+                self._proc.kill()
             else:
             else:
+                (out, err), proc = self.node.kill(self.pid, self.ppid)
+
+                if out or err:
+                    # check if execution errors occurred
+                    msg = " Failed to STOP command '%s' " % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+                    stopped = False
+
+            if stopped:
                 super(LinuxApplication, self).stop()
 
     def release(self):
                 super(LinuxApplication, self).stop()
 
     def release(self):
@@ -470,32 +517,46 @@ class LinuxApplication(ResourceManager):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            # To avoid overwhelming the remote hosts and the local processor
-            # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' seconds.
-            state_check_delay = 0.5
-            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
-                # check if execution errors occurred
-                (out, err), proc = self.node.check_errors(self.app_home)
-
-                if out or err:
-                    if err.find("No such file or directory") >= 0 :
-                        # The resource is marked as started, but the
-                        # command was not yet executed
-                        return ResourceState.READY
-
-                    msg = " Failed to execute command '%s'" % self.get("command")
-                    self.error(msg, out, err)
+            if self.in_foreground:
+                retcode = self._proc.poll()
+                
+                # retcode == None -> running
+                # retcode > 0 -> error
+                # retcode == 0 -> finished
+                if retcode:
+                    out = ""
+                    err = self._proc.stderr.read()
                     self._state = ResourceState.FAILED
                     self._state = ResourceState.FAILED
+                    self.error(msg, out, err)
+                elif retcode == 0:
+                    self._state = ResourceState.FINISHED
 
 
-                elif self.pid and self.ppid:
-                    status = self.node.status(self.pid, self.ppid)
-
-                    if status == ProcStatus.FINISHED:
-                        self._state = ResourceState.FINISHED
-
-
-                self._last_state_check = strfnow()
+            else:
+                # To avoid overwhelming the remote hosts and the local processor
+                # with too many ssh queries, the state is only requested
+                # every 'state_check_delay' seconds.
+                state_check_delay = 0.5
+                if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
+                    # check if execution errors occurred
+                    (out, err), proc = self.node.check_errors(self.app_home)
+
+                    if out or err:
+                        if err.find("No such file or directory") >= 0 :
+                            # The resource is marked as started, but the
+                            # command was not yet executed
+                            return ResourceState.READY
+
+                        msg = " Failed to execute command '%s'" % self.get("command")
+                        self.error(msg, out, err)
+                        self._state = ResourceState.FAILED
+
+                    elif self.pid and self.ppid:
+                        status = self.node.status(self.pid, self.ppid)
+
+                        if status == ProcStatus.FINISHED:
+                            self._state = ResourceState.FINISHED
+
+                    self._last_state_check = strfnow()
 
         return self._state
 
 
         return self._state
 
index 06a735e..655bc62 100644 (file)
@@ -184,6 +184,92 @@ class LinuxCCND(LinuxApplication):
 
         super(LinuxCCND, self).deploy()
 
 
         super(LinuxCCND, self).deploy()
 
+    def start(self):
+        command = self.get("command")
+        env = self.get("env")
+        stdin = "stdin" if self.get("stdin") else None
+        stdout = "stdout" if self.get("stdout") else "stdout"
+        stderr = "stderr" if self.get("stderr") else "stderr"
+        sudo = self.get("sudo") or False
+        x11 = self.get("forwardX11") or False
+        failed = False
+
+        if not command:
+            # If no command was given, then the application 
+            # is directly marked as FINISHED
+            self._state = ResourceState.FINISHED
+    
+        self.info("Starting command '%s'" % command)
+
+        if x11:
+            # If X11 forwarding was specified, then the application
+            # can not run detached, so instead of invoking asynchronous
+            # 'run' we invoke synchronous 'execute'.
+            if not command:
+                msg = "No command is defined but X11 forwarding has been set"
+                self.error(msg)
+                self._state = ResourceState.FAILED
+                raise RuntimeError, msg
+
+            # Export environment
+            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
+                if env else ""
+
+            command = environ + command
+            command = self.replace_paths(command)
+
+            # Mark application as started before executing the command
+            # since after the thread will be blocked by the execution
+            # until it finished
+            super(LinuxApplication, self).start()
+            
+            # If the command requires X11 forwarding, we
+            # can't run it asynchronously
+            (out, err), proc = self.node.execute(command,
+                    sudo = sudo,
+                    stdin = stdin,
+                    forward_x11 = x11)
+
+            self._state = ResourceState.FINISHED
+
+            if proc.poll() and err:
+                failed = True
+        else:
+            # Command was  previously uploaded, now run the remote
+            # bash file asynchronously
+            cmd = "bash ./app.sh"
+            (out, err), proc = self.node.run(cmd, self.app_home, 
+                stdin = stdin, 
+                stdout = stdout,
+                stderr = stderr,
+                sudo = sudo)
+
+            # check if execution errors occurred
+            msg = " Failed to start command '%s' " % command
+            
+            if proc.poll() and err:
+                self.error(msg, out, err)
+                raise RuntimeError, msg
+        
+            # Check status of process running in background
+            pid, ppid = self.node.wait_pid(self.app_home)
+            if pid: self._pid = int(pid)
+            if ppid: self._ppid = int(ppid)
+
+            # If the process is not running, check for error information
+            # on the remote machine
+            if not self.pid or not self.ppid:
+                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
+                self.error(msg, out, err)
+
+                msg2 = " Setting state to Failed"
+                self.debug(msg2)
+                self._state = ResourceState.FAILED
+
+                raise RuntimeError, msg
+            
+            super(LinuxApplication, self).start()
+
     def stop(self):
         command = self.get('command') or ''
         state = self.state
     def stop(self):
         command = self.get('command') or ''
         state = self.state
@@ -191,17 +277,56 @@ class LinuxCCND(LinuxApplication):
         if state == ResourceState.STARTED:
             self.info("Stopping command '%s'" % command)
 
         if state == ResourceState.STARTED:
             self.info("Stopping command '%s'" % command)
 
-            (out, err), proc = self.node.kill(self.pid, self.ppid)
+            command = "ccndstop"
+            env = self.get("env") 
+
+            # replace application specific paths in the command
+            command = self.replace_paths(command)
+            env = env and self.replace_paths(env)
+
+            # Upload the command to a file, and execute asynchronously
+            self.node.run_and_wait(command, self.app_home,
+                        shfile = "ccndstop.sh",
+                        env = env,
+                        pidfile = "ccndstop_pidfile", 
+                        ecodefile = "ccndstop_exitcode", 
+                        stdout = "ccndstop_stdout", 
+                        stderr = "ccndstop_stderr")
+
 
 
-            if out or err:
+            super(LinuxCCND, self).stop()
+
+    @property
+    def state(self):
+        if self._state == ResourceState.STARTED:
+            # To avoid overwhelming the remote hosts and the local processor
+            # with too many ssh queries, the state is only requested
+            # every 'state_check_delay' seconds.
+            state_check_delay = 0.5
+            if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                 # check if execution errors occurred
                 # check if execution errors occurred
-                msg = " Failed to STOP command '%s' " % self.get("command")
-                self.error(msg, out, err)
-                self._state = ResourceState.FAILED
-                stopped = False
-            else:
-                super(LinuxApplication, self).stop()
+                (out, err), proc = self.node.check_errors(self.app_home)
+
+                if out or err:
+                    if err.find("No such file or directory") >= 0 :
+                        # The resource is marked as started, but the
+                        # command was not yet executed
+                        return ResourceState.READY
+
+                    msg = " Failed to execute command '%s'" % self.get("command")
+                    self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
+
+                elif self.pid and self.ppid:
+                    status = self.node.status(self.pid, self.ppid)
+
+                    if status == ProcStatus.FINISHED:
+                        self._state = ResourceState.FINISHED
+
+
+                self._last_state_check = strfnow()
 
 
+        return self._state
 
     @property
     def _default_command(self):
 
     @property
     def _default_command(self):
index 1abe208..02d1657 100644 (file)
@@ -23,23 +23,21 @@ def install_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = ""
-    for p in packages:
-        cmd += " ( dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ) ; " % {
-                'package': p}
-   
-    #cmd = (dpkg -s vim || sudo dpkg -s install vim) ; (...)
+    cmd = " && ".join(map(lambda p: 
+            " { dpkg -s %(package)s || sudo -S apt-get -y install %(package)s ; } " % {
+                    'package': p}, packages))
+        
+    #cmd = { dpkg -s vim || sudo -S apt-get -y install vim ; } && ..
     return cmd 
 
 def remove_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
     return cmd 
 
 def remove_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = ""
-    for p in packages:
-        cmd += " ( dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ) ; " % {
-                'package': p}
-    
-    #cmd = (dpkg -s vim || sudo apt-get -y purge vim) ; (...)
+    cmd = " && ".join(map(lambda p: 
+            " { dpkg -s %(package)s && sudo -S apt-get -y purge %(package)s ; } " % {
+                    'package': p}, packages))
+        
+    #cmd = { dpkg -s vim && sudo -S apt-get -y purge vim ; } && ..
     return cmd 
 
     return cmd 
 
index de194e5..b7e8668 100644 (file)
@@ -339,6 +339,7 @@ class LinuxNode(ResourceManager):
         
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
         
     def run_and_wait(self, command, home, 
             shfile = "cmd.sh",
+            env = None,
             pidfile = "pidfile", 
             ecodefile = "exitcode", 
             stdin = None, 
             pidfile = "pidfile", 
             ecodefile = "exitcode", 
             stdin = None, 
@@ -354,7 +355,10 @@ class LinuxNode(ResourceManager):
         since in the remote host the command can continue to run detached
         even if network disconnections occur
         """
         since in the remote host the command can continue to run detached
         even if network disconnections occur
         """
-        self.upload_command(command, home, shfile, ecodefile)
+        self.upload_command(command, home, 
+            shfile = shfile, 
+            ecodefile = ecodefile, 
+            env = env)
 
         command = "bash ./%s" % shfile
         # run command in background in remote host
 
         command = "bash ./%s" % shfile
         # run command in background in remote host
@@ -420,18 +424,26 @@ class LinuxNode(ResourceManager):
             shfile = "cmd.sh",
             ecodefile = "exitcode",
             env = None):
             shfile = "cmd.sh",
             ecodefile = "exitcode",
             env = None):
+        """ Saves the command as a bash script file in the remote host, and
+        forces to save the exit code of the command execution to the ecodefile
+        """
+      
+        # Prepare command to be executed as a bash script file
+        # Make sure command ends in ';' so the curly brackets syntax is correct
+        if not command.strip()[-1] == ';':
+            command += " ; "
 
 
-        command = " ( %(command)s ) ; echo $? > %(ecodefile)s " % {
+        # The exit code of the command will be stored in ecodefile
+        command = " { { %(command)s } ; echo $? > %(ecodefile)s ; } " % {
                 'command': command,
                 'ecodefile': ecodefile,
                 } 
 
         # Export environment
                 'command': command,
                 'ecodefile': ecodefile,
                 } 
 
         # Export environment
-        environ = ""
-        if env:
-            for var in env.split(" "):
-                environ += 'export %s\n' % var
+        environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) \
+            if env else ""
 
 
+        # Add environ to command
         command = environ + command
 
         dst = os.path.join(home, shfile)
         command = environ + command
 
         dst = os.path.join(home, shfile)
@@ -569,6 +581,7 @@ class LinuxNode(ResourceManager):
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
             connect_timeout = 30,
             strict_host_checking = False,
             persistent = True,
+            blocking = True,
             with_lock = False
             ):
         """ Notice that this invocation will block until the
             with_lock = False
             ):
         """ Notice that this invocation will block until the
@@ -602,6 +615,7 @@ class LinuxNode(ResourceManager):
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
                         err_on_timeout = err_on_timeout,
                         connect_timeout = connect_timeout,
                         persistent = persistent,
+                        blocking = blocking, 
                         strict_host_checking = strict_host_checking
                         )
             else:
                         strict_host_checking = strict_host_checking
                         )
             else:
@@ -622,7 +636,9 @@ class LinuxNode(ResourceManager):
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
                     retry = retry,
                     err_on_timeout = err_on_timeout,
                     connect_timeout = connect_timeout,
-                    persistent = persistent
+                    persistent = persistent,
+                    blocking = blocking, 
+                    strict_host_checking = strict_host_checking
                     )
 
         return (out, err), proc
                     )
 
         return (out, err), proc
index e42e7b8..5fa0592 100644 (file)
@@ -26,30 +26,29 @@ def install_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = "( %s )" % install_rpmfusion_command(os)
-    for p in packages:
-        cmd += " ; ( rpm -q %(package)s || sudo -S yum -y install %(package)s ) " % {
-                    'package': p}
+    cmd = install_rpmfusion_command(os) + " && "
+    cmd += " && ".join(map(lambda p: 
+            " { rpm -q %(package)s || sudo -S yum -y install %(package)s ; } " % {
+                    'package': p}, packages))
     
     
-    #cmd = ((rpm -q rpmfusion-free-release || sudo -s rpm -i ...) ; (rpm -q vim || sudo yum -y install vim))
-    return " ( %s )" % cmd 
+    #cmd = { rpm -q rpmfusion-free-release || sudo -s rpm -i ... ; } && { rpm -q vim || sudo yum -y install vim ; } && ..
+    return cmd 
 
 def remove_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
 
 def remove_packages_command(os, packages):
     if not isinstance(packages, list):
         packages = [packages]
 
-    cmd = ""
-    for p in packages:
-        cmd += " ( rpm -q %(package)s && sudo -S yum -y remove %(package)s ) ; " % {
-                    'package': p}
-
-    #cmd = (rpm -q vim || sudo yum -y remove vim) ; (...)
+    cmd = " && ".join(map(lambda p: 
+            " { rpm -q %(package)s && sudo -S yum -y remove %(package)s ; } " % {
+                    'package': p}, packages))
+        
+    #cmd = { rpm -q vim && sudo yum -y remove vim ; } && ..
     return cmd 
 
 def install_rpmfusion_command(os):
     from nepi.resources.linux.node import OSType
 
     return cmd 
 
 def install_rpmfusion_command(os):
     from nepi.resources.linux.node import OSType
 
-    cmd = "rpm -q rpmfusion-free-release || sudo -S rpm -i %(package)s"
+    cmd = " { rpm -q rpmfusion-free-release ||  sudo -S rpm -i %(package)s ; } "
 
     if os in [OSType.FEDORA, OSType.FEDORA_12]:
         cmd =  cmd %  {'package': RPM_FUSION_URL_F12}
 
     if os in [OSType.FEDORA, OSType.FEDORA_12]:
         cmd =  cmd %  {'package': RPM_FUSION_URL_F12}
index 50f5d7c..7fe579b 100644 (file)
@@ -16,6 +16,7 @@
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Claudio Freire <claudio-daniel.freire@inria.fr>
 
 import base64
 import errno
 
 import base64
 import errno
@@ -218,6 +219,7 @@ def rexec(command, host, user,
         connect_timeout = 30,
         persistent = True,
         forward_x11 = False,
         connect_timeout = 30,
         persistent = True,
         forward_x11 = False,
+        blocking = True,
         strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
         strict_host_checking = True):
     """
     Executes a remote command, returns ((stdout,stderr),process)
@@ -280,6 +282,12 @@ def rexec(command, host, user,
         # alive until the process is finished with it
         proc._known_hosts = tmp_known_hosts
     
         # alive until the process is finished with it
         proc._known_hosts = tmp_known_hosts
     
+        # by default, rexec calls _communicate which will block 
+        # until the process has exit. The argument block == False 
+        # forces to rexec to return immediately, without blocking 
+        if not blocking:
+           return (("", ""), proc)
+
         try:
             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
             msg = " rexec - host %s - command %s " % (host, " ".join(args))
         try:
             out, err = _communicate(proc, stdin, timeout, err_on_timeout)
             msg = " rexec - host %s - command %s " % (host, " ".join(args))
@@ -648,7 +656,7 @@ def rspawn(command, pidfile,
     else:
         stderr = ' ' + stderr
     
     else:
         stderr = ' ' + stderr
     
-    daemon_command = '{ { %(command)s  > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
+    daemon_command = '{ { %(command)s > %(stdout)s 2>%(stderr)s < %(stdin)s & } ; echo $! 1 > %(pidfile)s ; }' % {
         'command' : command,
         'pidfile' : shell_escape(pidfile),
         'stdout' : stdout,
         'command' : command,
         'pidfile' : shell_escape(pidfile),
         'stdout' : stdout,
index bb1de3e..7122b92 100755 (executable)
@@ -219,7 +219,7 @@ class ResourceManagerTestCase(unittest.TestCase):
         node = ec.register_resource("Node")
 
         apps = list()
         node = ec.register_resource("Node")
 
         apps = list()
-        for i in xrange(5000):
+        for i in xrange(1000):
             app = ec.register_resource("Application")
             ec.register_connection(app, node)
             apps.append(app)
             app = ec.register_resource("Application")
             ec.register_connection(app, node)
             apps.append(app)
index 0c69f71..b1e489c 100755 (executable)
@@ -25,7 +25,7 @@ from nepi.execution.trace import TraceAttr
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.linux.application import LinuxApplication
 
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.linux.application import LinuxApplication
 
-from test_utils import skipIfNotAlive
+from test_utils import skipIfNotAlive, skipInteractive
 
 import os
 import time
 
 import os
 import time
@@ -34,13 +34,13 @@ import unittest
 
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
 
 class LinuxApplicationTestCase(unittest.TestCase):
     def setUp(self):
-        self.fedora_host = "nepi5.pl.sophia.inria.fr"
+        self.fedora_host = "nepi2.pl.sophia.inria.fr"
         self.fedora_user = "inria_nepi"
 
         self.ubuntu_host = "roseval.pl.sophia.inria.fr"
         self.ubuntu_user = "alina"
         
         self.fedora_user = "inria_nepi"
 
         self.ubuntu_host = "roseval.pl.sophia.inria.fr"
         self.ubuntu_user = "alina"
         
-        self.target = "nepi3.pl.sophia.inria.fr"
+        self.target = "nepi5.pl.sophia.inria.fr"
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
 
     @skipIfNotAlive
     def t_stdout(self, host, user):
@@ -246,6 +246,36 @@ class LinuxApplicationTestCase(unittest.TestCase):
 
         ec.shutdown()
 
 
         ec.shutdown()
 
+    @skipIfNotAlive
+    def t_xterm(self, host, user):
+        from nepi.execution.resource import ResourceFactory
+        
+        ResourceFactory.register_type(LinuxNode)
+        ResourceFactory.register_type(LinuxApplication)
+
+        ec = ExperimentController()
+        
+        node = ec.register_resource("LinuxNode")
+        ec.set(node, "hostname", host)
+        ec.set(node, "username", user)
+        ec.set(node, "cleanHome", True)
+        ec.set(node, "cleanProcesses", True)
+
+        app = ec.register_resource("LinuxApplication")
+        ec.set(app, "command", "xterm")
+        ec.set(app, "depends", "xterm")
+        ec.set(app, "forwardX11", True)
+
+        ec.register_connection(app, node)
+
+        ec.deploy()
+
+        ec.wait_finished([app])
+
+        self.assertTrue(ec.state(app) == ResourceState.FINISHED)
+
+        ec.shutdown()
+
     def test_stdout_fedora(self):
         self.t_stdout(self.fedora_host, self.fedora_user)
 
     def test_stdout_fedora(self):
         self.t_stdout(self.fedora_host, self.fedora_user)
 
@@ -258,10 +288,10 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_ping_ubuntu(self):
         self.t_ping(self.ubuntu_host, self.ubuntu_user)
 
     def test_ping_ubuntu(self):
         self.t_ping(self.ubuntu_host, self.ubuntu_user)
 
-    def ztest_concurrency_fedora(self):
+    def test_concurrency_fedora(self):
         self.t_concurrency(self.fedora_host, self.fedora_user)
 
         self.t_concurrency(self.fedora_host, self.fedora_user)
 
-    def ztest_concurrency_ubuntu(self):
+    def test_concurrency_ubuntu(self):
         self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
 
     def test_condition_fedora(self):
         self.t_concurrency(self.ubuntu_host, self.ubuntu_user)
 
     def test_condition_fedora(self):
@@ -276,6 +306,12 @@ class LinuxApplicationTestCase(unittest.TestCase):
     def test_http_sources_ubuntu(self):
         self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
 
     def test_http_sources_ubuntu(self):
         self.t_http_sources(self.ubuntu_host, self.ubuntu_user)
 
+    @skipInteractive
+    def test_xterm_ubuntu(self):
+        """ Interactive test. Should not run automatically """
+        self.t_xterm(self.ubuntu_host, self.ubuntu_user)
+
+
 
 if __name__ == '__main__':
     unittest.main()
 
 if __name__ == '__main__':
     unittest.main()
index 907cb0a..414b3f3 100644 (file)
@@ -55,7 +55,7 @@ def skipIfNotAlive(func):
 def skipInteractive(func):
     name = func.__name__
     def wrapped(*args, **kwargs):
 def skipInteractive(func):
     name = func.__name__
     def wrapped(*args, **kwargs):
-        mode = os.environ.get("NEPI_INTERACTIVE", False)
+        mode = os.environ.get("NEPI_INTERACTIVE_TEST", False)
         mode = mode and  mode.lower() in ['true', 'yes']
         if not mode:
             print "*** WARNING: Skipping test %s: Interactive mode off \n" % name
         mode = mode and  mode.lower() in ['true', 'yes']
         if not mode:
             print "*** WARNING: Skipping test %s: Interactive mode off \n" % name