Fixed nasty concurrency bug in EC
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 18 Jun 2013 21:10:06 +0000 (14:10 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 18 Jun 2013 21:10:06 +0000 (14:10 -0700)
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/scheduler.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccnd.py
src/nepi/resources/linux/node.py
src/nepi/util/environ.py
src/nepi/util/parallel.py
test/resources/linux/node.py

index c6b973b..e9f4c54 100644 (file)
@@ -44,23 +44,42 @@ class Flags:
 class Attribute(object):
     """
     .. class:: Class Args :
 class Attribute(object):
     """
     .. class:: Class Args :
+
+        An Attribute reflects a configuration parameter for
+        a particular resource. Attributes might be read only or
+        not.
       
         :param name: Name of the attribute
         :type name: str
       
         :param name: Name of the attribute
         :type name: str
-        :param help: Help about the attribute
+
+        :param help: Attribute description
         :type help: str
         :type help: str
-        :param type: type of the attribute
+        
+        :param type: The type expected for the attribute value.
+                     Should be one of Attribute.Types .
         :type type: str
         :type type: str
-        :param flags: Help about the attribute
-        :type flags: str
+
+        :param flags: Defines attribute behavior (i.e. whether it is read-only,
+                read and write, etc). This parameter should take its values from
+                Attribute.Flags. Flags values can be bitwised.
+        :type flags: hex
+
         :param default: Default value of the attribute
         :param default: Default value of the attribute
-        :type default: str
-        :param allowed: Allowed value for this attribute
-        :type allowed: str
-        :param range: Range of the attribute
-        :type range: str
-        :param set_hook: hook that is related with this attribute
-        :type set_hook: str
+        :type default: depends on the type of attribute
+        
+        :param allowed: List of values that the attribute can take. 
+                This parameter is only meaningful for Enumerate type attributes.
+        :type allowed: list
+        
+        :param range: (max, min) tuple with range of possible values for
+                attributes.
+                This parameter is only meaningful for Integer or Double type
+                attributes.
+        :type range: (int, int) or (float, float)
+        
+        :param set_hook: Function that will be executed when ever a new 
+                value is set for the attribute.
+        :type set_hook: function
 
     """
     def __init__(self, name, help, type = Types.String,
 
     """
     def __init__(self, name, help, type = Types.String,
index 262aecf..b5663c0 100644 (file)
@@ -370,10 +370,12 @@ class ExperimentController(object):
 
         """
         rm = self.get_resource(guid)
 
         """
         rm = self.get_resource(guid)
+        state = rm.state
+
         if hr:
         if hr:
-            return ResourceState2str.get(rm.state)
+            return ResourceState2str.get(state)
 
 
-        return rm.state
+        return state
 
     def stop(self, guid):
         """ Stop a specific RM defined by its 'guid'
 
     def stop(self, guid):
         """ Stop a specific RM defined by its 'guid'
@@ -487,7 +489,7 @@ class ExperimentController(object):
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
-        #random.shuffle(group)
+        random.shuffle(group)
 
         def wait_all_and_start(group):
             reschedule = False
 
         def wait_all_and_start(group):
             reschedule = False
@@ -525,7 +527,6 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-
     def release(self, group = None):
         """ Release the elements of the list 'group' or 
         all the resources if any group is specified
     def release(self, group = None):
         """ Release the elements of the list 'group' or 
         all the resources if any group is specified
@@ -592,7 +593,7 @@ class ExperimentController(object):
 
         if track:
             self._tasks[task.id] = task
 
         if track:
             self._tasks[task.id] = task
-  
+
         # Notify condition to wake up the processing thread
         self._notify()
 
         # Notify condition to wake up the processing thread
         self._notify()
 
@@ -601,6 +602,8 @@ class ExperimentController(object):
     def _process(self):
         """ Process scheduled tasks.
 
     def _process(self):
         """ Process scheduled tasks.
 
+        .. note::
+
         The _process method is executed in an independent thread held by the 
         ExperimentController for as long as the experiment is running.
         
         The _process method is executed in an independent thread held by the 
         ExperimentController for as long as the experiment is running.
         
@@ -641,51 +644,55 @@ class ExperimentController(object):
         try:
             while not self.finished:
                 self._cond.acquire()
         try:
             while not self.finished:
                 self._cond.acquire()
+
                 task = self._scheduler.next()
                 task = self._scheduler.next()
-                self._cond.release()
                 
                 if not task:
                 
                 if not task:
-                    # It there are not tasks in the tasks queue we need to 
-                    # wait until a call to schedule wakes us up
-                    self._cond.acquire()
+                    # No task to execute. Wait for a new task to be scheduled.
                     self._cond.wait()
                     self._cond.wait()
-                    self._cond.release()
-                else: 
-                    # If the task timestamp is in the future the thread needs to wait
-                    # until time elapse or until another task is scheduled
+                else:
+                    # The task timestamp is in the future. Wait for timeout 
+                    # or until another task is scheduled.
                     now = strfnow()
                     if now < task.timestamp:
                     now = strfnow()
                     if now < task.timestamp:
-                        # Calculate time difference in seconds
+                        # Calculate timeout in seconds
                         timeout = strfdiff(task.timestamp, now)
                         timeout = strfdiff(task.timestamp, now)
+
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
-                        # Sleep until timeout or until a new task awakes the condition
-                        self._cond.acquire()
+                        
+                        task = None
+
+                        # Wait timeout or until a new task awakes the condition
                         self._cond.wait(timeout)
                         self._cond.wait(timeout)
-                        self._cond.release()
-                    else:
-                        # Process tasks in parallel
-                        runner.put(self._execute, task)
+               
+                self._cond.release()
+
+                if task:
+                    # Process tasks in parallel
+                    runner.put(self._execute, task)
         except: 
             import traceback
             err = traceback.format_exc()
         except: 
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while processing tasks in the EC: %s" % err)
+            self.logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
         finally:   
 
             self._state = ECState.FAILED
         finally:   
-            self._logger.info("Exiting the task processing loop ... ")
+            self.logger.debug("Exiting the task processing loop ... ")
             runner.sync()
 
     def _execute(self, task):
         """ Executes a single task. 
 
             runner.sync()
 
     def _execute(self, task):
         """ Executes a single task. 
 
-            If the invokation of the task callback raises an
-            exception, the processing thread of the ExperimentController
-            will be stopped and the experiment will be aborted.
-
             :param task: Object containing the callback to execute
             :type task: Task
 
             :param task: Object containing the callback to execute
             :type task: Task
 
+        .. note::
+
+        If the invokation of the task callback raises an
+        exception, the processing thread of the ExperimentController
+        will be stopped and the experiment will be aborted.
+
         """
         # Invoke callback
         task.status = TaskStatus.DONE
         """
         # Invoke callback
         task.status = TaskStatus.DONE
@@ -698,7 +705,7 @@ class ExperimentController(object):
             task.result = err
             task.status = TaskStatus.ERROR
             
             task.result = err
             task.status = TaskStatus.ERROR
             
-            self._logger.error("Error occurred while executing task: %s" % err)
+            self.logger.error("Error occurred while executing task: %s" % err)
 
             # Set the EC to FAILED state (this will force to exit the task
             # processing thread)
 
             # Set the EC to FAILED state (this will force to exit the task
             # processing thread)
index 8d70fbb..53a7530 100644 (file)
@@ -62,6 +62,7 @@ class HeapScheduler(object):
         """
         if task.id == None:
             task.id = self._idgen.next()
         """
         if task.id == None:
             task.id = self._idgen.next()
+
         entry = (task.timestamp, task.id, task)
         self._valid.add(task.id)
         heapq.heappush(self._queue, entry)
         entry = (task.timestamp, task.id, task)
         self._valid.add(task.id)
         heapq.heappush(self._queue, entry)
index 57c5304..13a8a9b 100644 (file)
@@ -151,13 +151,14 @@ class LinuxApplication(ResourceManager):
 
     @property
     def in_foreground(self):
 
     @property
     def in_foreground(self):
-        """ Returns True is the command needs to be executed in foreground.
+        """ Returns True if the command needs to be executed in foreground.
         This means that command will be executed using 'execute' instead of
         This means that command will be executed using 'execute' instead of
-        'run'.
+        'run' ('run' executes a command in background and detached from the 
+        terminal)
 
         When using X11 forwarding option, the command can not run in background
 
         When using X11 forwarding option, the command can not run in background
-        and detached from a terminal in the remote host, since we need to keep 
-        the SSH connection to receive graphical data
+        and detached from a terminal, since we need to keep the terminal attached 
+        to interact with it.
         """
         return self.get("forwardX11") or False
 
         """
         return self.get("forwardX11") or False
 
@@ -392,8 +393,8 @@ class LinuxApplication(ResourceManager):
         self.info("Starting command '%s'" % command)
 
         if self.in_foreground:
         self.info("Starting command '%s'" % command)
 
         if self.in_foreground:
-            # If command should be ran in foreground, we invoke
-            # the node 'execute' method
+            # If command should run in foreground, we invoke 'execute' method
+            # of the node
             if not command:
                 msg = "No command is defined but X11 forwarding has been set"
                 self.error(msg)
             if not command:
                 msg = "No command is defined but X11 forwarding has been set"
                 self.error(msg)
@@ -401,8 +402,7 @@ class LinuxApplication(ResourceManager):
                 raise RuntimeError, msg
 
             # Export environment
                 raise RuntimeError, msg
 
             # Export environment
-            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
-                if env else ""
+            environ = self.node.format_environment(env, inline = True)
 
             command = environ + command
             command = self.replace_paths(command)
 
             command = environ + command
             command = self.replace_paths(command)
@@ -410,7 +410,9 @@ class LinuxApplication(ResourceManager):
             x11 = self.get("forwardX11")
 
             # We save the reference to the process in self._proc 
             x11 = self.get("forwardX11")
 
             # We save the reference to the process in self._proc 
-            # to be able to kill the process from the stop method
+            # to be able to kill the process from the stop method.
+            # We also set blocking = False, since we don't want the
+            # thread to block until the execution finishes.
             (out, err), self._proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
             (out, err), self._proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
@@ -427,13 +429,14 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).start()
 
         elif command:
             super(LinuxApplication, self).start()
 
         elif command:
-            # If command is set (i.e. application not used only for dependency
-            # installation), and it does not need to run in foreground, we use 
-            # the 'run' method of the node to launch the application as a daemon 
+            # If command is set (i.e. application is not used only for dependency
+            # installation), and it does not need to run in foreground, then we  
+            # invoke the 'run' method of the node to launch the application as a 
+            # daemon in background
 
             # The real command to execute was previously uploaded to a remote bash
 
             # The real command to execute was previously uploaded to a remote bash
-            # script during deployment, now run the remote script using 'run' method 
-            # from the node
+            # script during deployment, now launch the remote script using 'run'
+            # method from the node
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
@@ -516,8 +519,12 @@ class LinuxApplication(ResourceManager):
     
     @property
     def state(self):
     
     @property
     def state(self):
+        """ Returns the state of the application
+        """
         if self._state == ResourceState.STARTED:
             if self.in_foreground:
         if self._state == ResourceState.STARTED:
             if self.in_foreground:
+                # Check if the process we used to execute the command
+                # is still running ...
                 retcode = self._proc.poll()
                 
                 # retcode == None -> running
                 retcode = self._proc.poll()
                 
                 # retcode == None -> running
@@ -525,32 +532,31 @@ class LinuxApplication(ResourceManager):
                 # retcode == 0 -> finished
                 if retcode:
                     out = ""
                 # retcode == 0 -> finished
                 if retcode:
                     out = ""
+                    msg = " Failed to execute command '%s'" % self.get("command")
                     err = self._proc.stderr.read()
                     err = self._proc.stderr.read()
-                    self._state = ResourceState.FAILED
                     self.error(msg, out, err)
                     self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
                 elif retcode == 0:
                     self._state = ResourceState.FINISHED
 
             else:
                 elif retcode == 0:
                     self._state = ResourceState.FINISHED
 
             else:
-                # To avoid overwhelming the remote hosts and the local processor
-                # with too many ssh queries, the state is only requested
-                # every 'state_check_delay' seconds.
+                # We need to query the status of the command we launched in 
+                # background. In oredr to avoid overwhelming the remote host and
+                # the local processor with too many ssh queries, the state is only
+                # requested every 'state_check_delay' seconds.
                 state_check_delay = 0.5
                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                     # check if execution errors occurred
                     (out, err), proc = self.node.check_errors(self.app_home)
 
                 state_check_delay = 0.5
                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                     # check if execution errors occurred
                     (out, err), proc = self.node.check_errors(self.app_home)
 
-                    if out or err:
-                        if err.find("No such file or directory") >= 0 :
-                            # The resource is marked as started, but the
-                            # command was not yet executed
-                            return ResourceState.READY
-
+                    if err:
                         msg = " Failed to execute command '%s'" % self.get("command")
                         self.error(msg, out, err)
                         self._state = ResourceState.FAILED
 
                     elif self.pid and self.ppid:
                         msg = " Failed to execute command '%s'" % self.get("command")
                         self.error(msg, out, err)
                         self._state = ResourceState.FAILED
 
                     elif self.pid and self.ppid:
+                        # No execution errors occurred. Make sure the background
+                        # process with the recorded pid is still running.
                         status = self.node.status(self.pid, self.ppid)
 
                         if status == ProcStatus.FINISHED:
                         status = self.node.status(self.pid, self.ppid)
 
                         if status == ProcStatus.FINISHED:
index 655bc62..44b5f16 100644 (file)
@@ -23,6 +23,8 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 
+from nepi.util.sshfuncs import ProcStatus
+from nepi.util.timefuncs import strfnow, strfdiff
 import os
 
 @clsinit_copy
 import os
 
 @clsinit_copy
@@ -185,90 +187,7 @@ class LinuxCCND(LinuxApplication):
         super(LinuxCCND, self).deploy()
 
     def start(self):
         super(LinuxCCND, self).deploy()
 
     def start(self):
-        command = self.get("command")
-        env = self.get("env")
-        stdin = "stdin" if self.get("stdin") else None
-        stdout = "stdout" if self.get("stdout") else "stdout"
-        stderr = "stderr" if self.get("stderr") else "stderr"
-        sudo = self.get("sudo") or False
-        x11 = self.get("forwardX11") or False
-        failed = False
-
-        if not command:
-            # If no command was given, then the application 
-            # is directly marked as FINISHED
-            self._state = ResourceState.FINISHED
-    
-        self.info("Starting command '%s'" % command)
-
-        if x11:
-            # If X11 forwarding was specified, then the application
-            # can not run detached, so instead of invoking asynchronous
-            # 'run' we invoke synchronous 'execute'.
-            if not command:
-                msg = "No command is defined but X11 forwarding has been set"
-                self.error(msg)
-                self._state = ResourceState.FAILED
-                raise RuntimeError, msg
-
-            # Export environment
-            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
-                if env else ""
-
-            command = environ + command
-            command = self.replace_paths(command)
-
-            # Mark application as started before executing the command
-            # since after the thread will be blocked by the execution
-            # until it finished
-            super(LinuxApplication, self).start()
-            
-            # If the command requires X11 forwarding, we
-            # can't run it asynchronously
-            (out, err), proc = self.node.execute(command,
-                    sudo = sudo,
-                    stdin = stdin,
-                    forward_x11 = x11)
-
-            self._state = ResourceState.FINISHED
-
-            if proc.poll() and err:
-                failed = True
-        else:
-            # Command was  previously uploaded, now run the remote
-            # bash file asynchronously
-            cmd = "bash ./app.sh"
-            (out, err), proc = self.node.run(cmd, self.app_home, 
-                stdin = stdin, 
-                stdout = stdout,
-                stderr = stderr,
-                sudo = sudo)
-
-            # check if execution errors occurred
-            msg = " Failed to start command '%s' " % command
-            
-            if proc.poll() and err:
-                self.error(msg, out, err)
-                raise RuntimeError, msg
-        
-            # Check status of process running in background
-            pid, ppid = self.node.wait_pid(self.app_home)
-            if pid: self._pid = int(pid)
-            if ppid: self._ppid = int(ppid)
-
-            # If the process is not running, check for error information
-            # on the remote machine
-            if not self.pid or not self.ppid:
-                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
-                self.error(msg, out, err)
-
-                msg2 = " Setting state to Failed"
-                self.debug(msg2)
-                self._state = ResourceState.FAILED
-
-                raise RuntimeError, msg
-            
-            super(LinuxApplication, self).start()
+        super(LinuxCCND, self).start()
 
     def stop(self):
         command = self.get('command') or ''
 
     def stop(self):
         command = self.get('command') or ''
@@ -299,31 +218,29 @@ class LinuxCCND(LinuxApplication):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            # To avoid overwhelming the remote hosts and the local processor
-            # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' seconds.
+            # we executed the ccndstart command. This should have started
+            # a remote ccnd daemon. The way we can query wheather ccnd is
+            # still running is by executing the ccndstatus command.
             state_check_delay = 0.5
             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
             state_check_delay = 0.5
             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
-                # check if execution errors occurred
-                (out, err), proc = self.node.check_errors(self.app_home)
+                env = self.get('env') or ""
+                environ = self.node.format_environment(env, inline = True)
+                command = environ + "; ccndstatus"
+                command = self.replace_paths(command)
+            
+                (out, err), proc = self.node.execute(command)
 
 
-                if out or err:
-                    if err.find("No such file or directory") >= 0 :
-                        # The resource is marked as started, but the
-                        # command was not yet executed
-                        return ResourceState.READY
+                retcode = proc.poll()
 
 
-                    msg = " Failed to execute command '%s'" % self.get("command")
+                if retcode == 1 and err.find("No such file or directory") > -1:
+                    # ccnd is not running (socket not found)
+                    self._state = ResourceState.FINISHED
+                elif retcode:
+                    # other error
+                    msg = " Failed to execute command '%s'" % command
                     self.error(msg, out, err)
                     self._state = ResourceState.FAILED
 
                     self.error(msg, out, err)
                     self._state = ResourceState.FAILED
 
-                elif self.pid and self.ppid:
-                    status = self.node.status(self.pid, self.ppid)
-
-                    if status == ProcStatus.FINISHED:
-                        self._state = ResourceState.FINISHED
-
-
                 self._last_state_check = strfnow()
 
         return self._state
                 self._last_state_check = strfnow()
 
         return self._state
@@ -356,7 +273,8 @@ class LinuxCCND(LinuxApplication):
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
-                "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+                " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+                " echo 'sources found, nothing to do' "
             " ) || ( "
             # If not, untar and build
                 " ( "
             " ) || ( "
             # If not, untar and build
                 " ( "
@@ -373,8 +291,10 @@ class LinuxCCND(LinuxApplication):
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
-                "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+                " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+                " echo 'sources found, nothing to do' "
             " ) || ( "
             " ) || ( "
+            # If not, install
                 "  mkdir -p ${EXP_HOME}/ccnx/bin && "
                 "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
             " )"
                 "  mkdir -p ${EXP_HOME}/ccnx/bin && "
                 "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
             " )"
index 7ac2d12..c2c825e 100644 (file)
@@ -388,7 +388,7 @@ class LinuxNode(ResourceManager):
         (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
         # Out is what was written in the stderr file
         (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
         # Out is what was written in the stderr file
-        if out or err:
+        if err:
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
@@ -434,8 +434,7 @@ class LinuxNode(ResourceManager):
                 } 
 
         # Export environment
                 } 
 
         # Export environment
-        environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) + "\n" \
-            if env else ""
+        environ = self.format_environment(env)
 
         # Add environ to command
         command = environ + command
 
         # Add environ to command
         command = environ + command
@@ -443,6 +442,16 @@ class LinuxNode(ResourceManager):
         dst = os.path.join(home, shfile)
         return self.upload(command, dst, text = True)
 
         dst = os.path.join(home, shfile)
         return self.upload(command, dst, text = True)
 
+    def format_environment(self, env, inline = False):
+        """Format environmental variables for command to be executed either
+        as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
+        as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
+        """
+        sep = " " if inline else "\n"
+        export = " " if inline else "export"
+        return sep.join(map(lambda e: "%s %s" % (export, e), env.split(" "))) \
+                + sep if env else ""
+
     def check_errors(self, home, 
             ecodefile = "exitcode", 
             stderr = "stderr"):
     def check_errors(self, home, 
             ecodefile = "exitcode", 
             stderr = "stderr"):
@@ -450,11 +459,12 @@ class LinuxNode(ResourceManager):
         Checks whether errors occurred while running a command.
         It first checks the exit code for the command, and only if the
         exit code is an error one it returns the error output.
         Checks whether errors occurred while running a command.
         It first checks the exit code for the command, and only if the
         exit code is an error one it returns the error output.
+
         """
         out = err = ""
         proc = None
 
         """
         out = err = ""
         proc = None
 
-        # get Exit code
+        # get exit code saved in the 'exitcode' file
         ecode = self.exitcode(home, ecodefile)
 
         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
         ecode = self.exitcode(home, ecodefile)
 
         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
@@ -463,12 +473,18 @@ class LinuxNode(ResourceManager):
             # The process returned an error code or didn't exist. 
             # Check standard error.
             (out, err), proc = self.check_output(home, stderr)
             # The process returned an error code or didn't exist. 
             # Check standard error.
             (out, err), proc = self.check_output(home, stderr)
-            
-            # If the stderr file was not found, assume nothing happened.
-            # We just ignore the error.
+
+            # If the stderr file was not found, assume nothing bad happened,
+            # and just ignore the error.
             # (cat returns 1 for error "No such file or directory")
             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
                 out = err = ""
             # (cat returns 1 for error "No such file or directory")
             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
                 out = err = ""
+            else:
+                # The actual error (read from the stderr file) is in 'out'.
+                # We swap the variables to avoid confusion. It is more
+                # intuitive to find the 'error' in err variable.
+                err = out
+                out = ""
        
         return (out, err), proc
  
        
         return (out, err), proc
  
index 218370c..b403546 100644 (file)
 #    You should have received a copy of the GNU General Public License
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 #    You should have received a copy of the GNU General Public License
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Martin Ferrari <martin.ferrari@inria.fr>
+
+
 
 import ctypes
 import imp
 
 import ctypes
 import imp
index e354b42..fffdea5 100644 (file)
@@ -15,6 +15,8 @@
 #    You should have received a copy of the GNU General Public License
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 #    You should have received a copy of the GNU General Public License
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
+# Author: Claudio Freire <claudio-daniel.freire@inria.fr>
+#
 
 import threading
 import Queue
 
 import threading
 import Queue
index 259f578..0c3f86c 100755 (executable)
@@ -153,11 +153,13 @@ class LinuxNodeTestCase(unittest.TestCase):
  
         # get the pid of the process
         ecode = node.exitcode(app_home)
  
         # get the pid of the process
         ecode = node.exitcode(app_home)
+
         # bash erro 127 - command not found
         self.assertEquals(ecode, 127)
  
         (out, err), proc = node.check_errors(app_home)
         # bash erro 127 - command not found
         self.assertEquals(ecode, 127)
  
         (out, err), proc = node.check_errors(app_home)
-        self.assertNotEquals(out, "")
+
+        self.assertEquals(err.strip(), "./cmd.sh: line 1: unexistent-command: command not found")
 
     @skipIfNotAlive
     def t_install(self, host, user):
 
     @skipIfNotAlive
     def t_install(self, host, user):