Fixed nasty concurrency bug in EC
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 18 Jun 2013 21:10:06 +0000 (14:10 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 18 Jun 2013 21:10:06 +0000 (14:10 -0700)
src/nepi/execution/attribute.py
src/nepi/execution/ec.py
src/nepi/execution/scheduler.py
src/nepi/resources/linux/application.py
src/nepi/resources/linux/ccnd.py
src/nepi/resources/linux/node.py
src/nepi/util/environ.py
src/nepi/util/parallel.py
test/resources/linux/node.py

index c6b973b..e9f4c54 100644 (file)
@@ -44,23 +44,42 @@ class Flags:
 class Attribute(object):
     """
     .. class:: Class Args :
+
+        An Attribute reflects a configuration parameter for
+        a particular resource. Attributes might be read only or
+        not.
       
         :param name: Name of the attribute
         :type name: str
-        :param help: Help about the attribute
+
+        :param help: Attribute description
         :type help: str
-        :param type: type of the attribute
+        
+        :param type: The type expected for the attribute value.
+                     Should be one of Attribute.Types .
         :type type: str
-        :param flags: Help about the attribute
-        :type flags: str
+
+        :param flags: Defines attribute behavior (i.e. whether it is read-only,
+                read and write, etc). This parameter should take its values from
+                Attribute.Flags. Flags values can be bitwised.
+        :type flags: hex
+
         :param default: Default value of the attribute
-        :type default: str
-        :param allowed: Allowed value for this attribute
-        :type allowed: str
-        :param range: Range of the attribute
-        :type range: str
-        :param set_hook: hook that is related with this attribute
-        :type set_hook: str
+        :type default: depends on the type of attribute
+        
+        :param allowed: List of values that the attribute can take. 
+                This parameter is only meaningful for Enumerate type attributes.
+        :type allowed: list
+        
+        :param range: (max, min) tuple with range of possible values for
+                attributes.
+                This parameter is only meaningful for Integer or Double type
+                attributes.
+        :type range: (int, int) or (float, float)
+        
+        :param set_hook: Function that will be executed when ever a new 
+                value is set for the attribute.
+        :type set_hook: function
 
     """
     def __init__(self, name, help, type = Types.String,
index 262aecf..b5663c0 100644 (file)
@@ -370,10 +370,12 @@ class ExperimentController(object):
 
         """
         rm = self.get_resource(guid)
+        state = rm.state
+
         if hr:
-            return ResourceState2str.get(rm.state)
+            return ResourceState2str.get(state)
 
-        return rm.state
+        return state
 
     def stop(self, guid):
         """ Stop a specific RM defined by its 'guid'
@@ -487,7 +489,7 @@ class ExperimentController(object):
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
-        #random.shuffle(group)
+        random.shuffle(group)
 
         def wait_all_and_start(group):
             reschedule = False
@@ -525,7 +527,6 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-
     def release(self, group = None):
         """ Release the elements of the list 'group' or 
         all the resources if any group is specified
@@ -592,7 +593,7 @@ class ExperimentController(object):
 
         if track:
             self._tasks[task.id] = task
-  
+
         # Notify condition to wake up the processing thread
         self._notify()
 
@@ -601,6 +602,8 @@ class ExperimentController(object):
     def _process(self):
         """ Process scheduled tasks.
 
+        .. note::
+
         The _process method is executed in an independent thread held by the 
         ExperimentController for as long as the experiment is running.
         
@@ -641,51 +644,55 @@ class ExperimentController(object):
         try:
             while not self.finished:
                 self._cond.acquire()
+
                 task = self._scheduler.next()
-                self._cond.release()
                 
                 if not task:
-                    # It there are not tasks in the tasks queue we need to 
-                    # wait until a call to schedule wakes us up
-                    self._cond.acquire()
+                    # No task to execute. Wait for a new task to be scheduled.
                     self._cond.wait()
-                    self._cond.release()
-                else: 
-                    # If the task timestamp is in the future the thread needs to wait
-                    # until time elapse or until another task is scheduled
+                else:
+                    # The task timestamp is in the future. Wait for timeout 
+                    # or until another task is scheduled.
                     now = strfnow()
                     if now < task.timestamp:
-                        # Calculate time difference in seconds
+                        # Calculate timeout in seconds
                         timeout = strfdiff(task.timestamp, now)
+
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
-                        # Sleep until timeout or until a new task awakes the condition
-                        self._cond.acquire()
+                        
+                        task = None
+
+                        # Wait timeout or until a new task awakes the condition
                         self._cond.wait(timeout)
-                        self._cond.release()
-                    else:
-                        # Process tasks in parallel
-                        runner.put(self._execute, task)
+               
+                self._cond.release()
+
+                if task:
+                    # Process tasks in parallel
+                    runner.put(self._execute, task)
         except: 
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while processing tasks in the EC: %s" % err)
+            self.logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
         finally:   
-            self._logger.info("Exiting the task processing loop ... ")
+            self.logger.debug("Exiting the task processing loop ... ")
             runner.sync()
 
     def _execute(self, task):
         """ Executes a single task. 
 
-            If the invokation of the task callback raises an
-            exception, the processing thread of the ExperimentController
-            will be stopped and the experiment will be aborted.
-
             :param task: Object containing the callback to execute
             :type task: Task
 
+        .. note::
+
+        If the invokation of the task callback raises an
+        exception, the processing thread of the ExperimentController
+        will be stopped and the experiment will be aborted.
+
         """
         # Invoke callback
         task.status = TaskStatus.DONE
@@ -698,7 +705,7 @@ class ExperimentController(object):
             task.result = err
             task.status = TaskStatus.ERROR
             
-            self._logger.error("Error occurred while executing task: %s" % err)
+            self.logger.error("Error occurred while executing task: %s" % err)
 
             # Set the EC to FAILED state (this will force to exit the task
             # processing thread)
index 8d70fbb..53a7530 100644 (file)
@@ -62,6 +62,7 @@ class HeapScheduler(object):
         """
         if task.id == None:
             task.id = self._idgen.next()
+
         entry = (task.timestamp, task.id, task)
         self._valid.add(task.id)
         heapq.heappush(self._queue, entry)
index 57c5304..13a8a9b 100644 (file)
@@ -151,13 +151,14 @@ class LinuxApplication(ResourceManager):
 
     @property
     def in_foreground(self):
-        """ Returns True is the command needs to be executed in foreground.
+        """ Returns True if the command needs to be executed in foreground.
         This means that command will be executed using 'execute' instead of
-        'run'.
+        'run' ('run' executes a command in background and detached from the 
+        terminal)
 
         When using X11 forwarding option, the command can not run in background
-        and detached from a terminal in the remote host, since we need to keep 
-        the SSH connection to receive graphical data
+        and detached from a terminal, since we need to keep the terminal attached 
+        to interact with it.
         """
         return self.get("forwardX11") or False
 
@@ -392,8 +393,8 @@ class LinuxApplication(ResourceManager):
         self.info("Starting command '%s'" % command)
 
         if self.in_foreground:
-            # If command should be ran in foreground, we invoke
-            # the node 'execute' method
+            # If command should run in foreground, we invoke 'execute' method
+            # of the node
             if not command:
                 msg = "No command is defined but X11 forwarding has been set"
                 self.error(msg)
@@ -401,8 +402,7 @@ class LinuxApplication(ResourceManager):
                 raise RuntimeError, msg
 
             # Export environment
-            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
-                if env else ""
+            environ = self.node.format_environment(env, inline = True)
 
             command = environ + command
             command = self.replace_paths(command)
@@ -410,7 +410,9 @@ class LinuxApplication(ResourceManager):
             x11 = self.get("forwardX11")
 
             # We save the reference to the process in self._proc 
-            # to be able to kill the process from the stop method
+            # to be able to kill the process from the stop method.
+            # We also set blocking = False, since we don't want the
+            # thread to block until the execution finishes.
             (out, err), self._proc = self.node.execute(command,
                     sudo = sudo,
                     stdin = stdin,
@@ -427,13 +429,14 @@ class LinuxApplication(ResourceManager):
             super(LinuxApplication, self).start()
 
         elif command:
-            # If command is set (i.e. application not used only for dependency
-            # installation), and it does not need to run in foreground, we use 
-            # the 'run' method of the node to launch the application as a daemon 
+            # If command is set (i.e. application is not used only for dependency
+            # installation), and it does not need to run in foreground, then we  
+            # invoke the 'run' method of the node to launch the application as a 
+            # daemon in background
 
             # The real command to execute was previously uploaded to a remote bash
-            # script during deployment, now run the remote script using 'run' method 
-            # from the node
+            # script during deployment, now launch the remote script using 'run'
+            # method from the node
             cmd = "bash ./app.sh"
             (out, err), proc = self.node.run(cmd, self.app_home, 
                 stdin = stdin, 
@@ -516,8 +519,12 @@ class LinuxApplication(ResourceManager):
     
     @property
     def state(self):
+        """ Returns the state of the application
+        """
         if self._state == ResourceState.STARTED:
             if self.in_foreground:
+                # Check if the process we used to execute the command
+                # is still running ...
                 retcode = self._proc.poll()
                 
                 # retcode == None -> running
@@ -525,32 +532,31 @@ class LinuxApplication(ResourceManager):
                 # retcode == 0 -> finished
                 if retcode:
                     out = ""
+                    msg = " Failed to execute command '%s'" % self.get("command")
                     err = self._proc.stderr.read()
-                    self._state = ResourceState.FAILED
                     self.error(msg, out, err)
+                    self._state = ResourceState.FAILED
                 elif retcode == 0:
                     self._state = ResourceState.FINISHED
 
             else:
-                # To avoid overwhelming the remote hosts and the local processor
-                # with too many ssh queries, the state is only requested
-                # every 'state_check_delay' seconds.
+                # We need to query the status of the command we launched in 
+                # background. In oredr to avoid overwhelming the remote host and
+                # the local processor with too many ssh queries, the state is only
+                # requested every 'state_check_delay' seconds.
                 state_check_delay = 0.5
                 if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
                     # check if execution errors occurred
                     (out, err), proc = self.node.check_errors(self.app_home)
 
-                    if out or err:
-                        if err.find("No such file or directory") >= 0 :
-                            # The resource is marked as started, but the
-                            # command was not yet executed
-                            return ResourceState.READY
-
+                    if err:
                         msg = " Failed to execute command '%s'" % self.get("command")
                         self.error(msg, out, err)
                         self._state = ResourceState.FAILED
 
                     elif self.pid and self.ppid:
+                        # No execution errors occurred. Make sure the background
+                        # process with the recorded pid is still running.
                         status = self.node.status(self.pid, self.ppid)
 
                         if status == ProcStatus.FINISHED:
index 655bc62..44b5f16 100644 (file)
@@ -23,6 +23,8 @@ from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState
 from nepi.resources.linux.application import LinuxApplication
 from nepi.resources.linux.node import OSType
 
+from nepi.util.sshfuncs import ProcStatus
+from nepi.util.timefuncs import strfnow, strfdiff
 import os
 
 @clsinit_copy
@@ -185,90 +187,7 @@ class LinuxCCND(LinuxApplication):
         super(LinuxCCND, self).deploy()
 
     def start(self):
-        command = self.get("command")
-        env = self.get("env")
-        stdin = "stdin" if self.get("stdin") else None
-        stdout = "stdout" if self.get("stdout") else "stdout"
-        stderr = "stderr" if self.get("stderr") else "stderr"
-        sudo = self.get("sudo") or False
-        x11 = self.get("forwardX11") or False
-        failed = False
-
-        if not command:
-            # If no command was given, then the application 
-            # is directly marked as FINISHED
-            self._state = ResourceState.FINISHED
-    
-        self.info("Starting command '%s'" % command)
-
-        if x11:
-            # If X11 forwarding was specified, then the application
-            # can not run detached, so instead of invoking asynchronous
-            # 'run' we invoke synchronous 'execute'.
-            if not command:
-                msg = "No command is defined but X11 forwarding has been set"
-                self.error(msg)
-                self._state = ResourceState.FAILED
-                raise RuntimeError, msg
-
-            # Export environment
-            environ = "\n".join(map(lambda e: "export %s" % e, env.split(" ")))\
-                if env else ""
-
-            command = environ + command
-            command = self.replace_paths(command)
-
-            # Mark application as started before executing the command
-            # since after the thread will be blocked by the execution
-            # until it finished
-            super(LinuxApplication, self).start()
-            
-            # If the command requires X11 forwarding, we
-            # can't run it asynchronously
-            (out, err), proc = self.node.execute(command,
-                    sudo = sudo,
-                    stdin = stdin,
-                    forward_x11 = x11)
-
-            self._state = ResourceState.FINISHED
-
-            if proc.poll() and err:
-                failed = True
-        else:
-            # Command was  previously uploaded, now run the remote
-            # bash file asynchronously
-            cmd = "bash ./app.sh"
-            (out, err), proc = self.node.run(cmd, self.app_home, 
-                stdin = stdin, 
-                stdout = stdout,
-                stderr = stderr,
-                sudo = sudo)
-
-            # check if execution errors occurred
-            msg = " Failed to start command '%s' " % command
-            
-            if proc.poll() and err:
-                self.error(msg, out, err)
-                raise RuntimeError, msg
-        
-            # Check status of process running in background
-            pid, ppid = self.node.wait_pid(self.app_home)
-            if pid: self._pid = int(pid)
-            if ppid: self._ppid = int(ppid)
-
-            # If the process is not running, check for error information
-            # on the remote machine
-            if not self.pid or not self.ppid:
-                (out, err), proc = self.node.check_output(self.app_home, 'stderr')
-                self.error(msg, out, err)
-
-                msg2 = " Setting state to Failed"
-                self.debug(msg2)
-                self._state = ResourceState.FAILED
-
-                raise RuntimeError, msg
-            
-            super(LinuxApplication, self).start()
+        super(LinuxCCND, self).start()
 
     def stop(self):
         command = self.get('command') or ''
@@ -299,31 +218,29 @@ class LinuxCCND(LinuxApplication):
     @property
     def state(self):
         if self._state == ResourceState.STARTED:
-            # To avoid overwhelming the remote hosts and the local processor
-            # with too many ssh queries, the state is only requested
-            # every 'state_check_delay' seconds.
+            # we executed the ccndstart command. This should have started
+            # a remote ccnd daemon. The way we can query wheather ccnd is
+            # still running is by executing the ccndstatus command.
             state_check_delay = 0.5
             if strfdiff(strfnow(), self._last_state_check) > state_check_delay:
-                # check if execution errors occurred
-                (out, err), proc = self.node.check_errors(self.app_home)
+                env = self.get('env') or ""
+                environ = self.node.format_environment(env, inline = True)
+                command = environ + "; ccndstatus"
+                command = self.replace_paths(command)
+            
+                (out, err), proc = self.node.execute(command)
 
-                if out or err:
-                    if err.find("No such file or directory") >= 0 :
-                        # The resource is marked as started, but the
-                        # command was not yet executed
-                        return ResourceState.READY
+                retcode = proc.poll()
 
-                    msg = " Failed to execute command '%s'" % self.get("command")
+                if retcode == 1 and err.find("No such file or directory") > -1:
+                    # ccnd is not running (socket not found)
+                    self._state = ResourceState.FINISHED
+                elif retcode:
+                    # other error
+                    msg = " Failed to execute command '%s'" % command
                     self.error(msg, out, err)
                     self._state = ResourceState.FAILED
 
-                elif self.pid and self.ppid:
-                    status = self.node.status(self.pid, self.ppid)
-
-                    if status == ProcStatus.FINISHED:
-                        self._state = ResourceState.FINISHED
-
-
                 self._last_state_check = strfnow()
 
         return self._state
@@ -356,7 +273,8 @@ class LinuxCCND(LinuxApplication):
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
-                "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+                " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+                " echo 'sources found, nothing to do' "
             " ) || ( "
             # If not, untar and build
                 " ( "
@@ -373,8 +291,10 @@ class LinuxCCND(LinuxApplication):
         return (
             # Evaluate if ccnx binaries are already installed
             " ( "
-                "  test -f ${EXP_HOME}/ccnx/bin/ccnd"
+                " test -f ${EXP_HOME}/ccnx/bin/ccnd && "
+                " echo 'sources found, nothing to do' "
             " ) || ( "
+            # If not, install
                 "  mkdir -p ${EXP_HOME}/ccnx/bin && "
                 "  cp -r ${SOURCES}/ccnx ${EXP_HOME}"
             " )"
index 7ac2d12..c2c825e 100644 (file)
@@ -388,7 +388,7 @@ class LinuxNode(ResourceManager):
         (out, err), proc = self.check_errors(home, ecodefile, stderr)
 
         # Out is what was written in the stderr file
-        if out or err:
+        if err:
             msg = " Failed to run command '%s' " % command
             self.error(msg, out, err)
 
@@ -434,8 +434,7 @@ class LinuxNode(ResourceManager):
                 } 
 
         # Export environment
-        environ = "\n".join(map(lambda e: "export %s" % e, env.split(" "))) + "\n" \
-            if env else ""
+        environ = self.format_environment(env)
 
         # Add environ to command
         command = environ + command
@@ -443,6 +442,16 @@ class LinuxNode(ResourceManager):
         dst = os.path.join(home, shfile)
         return self.upload(command, dst, text = True)
 
+    def format_environment(self, env, inline = False):
+        """Format environmental variables for command to be executed either
+        as an inline command (i.e. PYTHONPATH=src/.. python script.py) or
+        as a bash script (i.e. export PYTHONPATH=src/.. \n export LALA=.. \n)
+        """
+        sep = " " if inline else "\n"
+        export = " " if inline else "export"
+        return sep.join(map(lambda e: "%s %s" % (export, e), env.split(" "))) \
+                + sep if env else ""
+
     def check_errors(self, home, 
             ecodefile = "exitcode", 
             stderr = "stderr"):
@@ -450,11 +459,12 @@ class LinuxNode(ResourceManager):
         Checks whether errors occurred while running a command.
         It first checks the exit code for the command, and only if the
         exit code is an error one it returns the error output.
+
         """
         out = err = ""
         proc = None
 
-        # get Exit code
+        # get exit code saved in the 'exitcode' file
         ecode = self.exitcode(home, ecodefile)
 
         if ecode in [ ExitCode.CORRUPTFILE, ExitCode.ERROR ]:
@@ -463,12 +473,18 @@ class LinuxNode(ResourceManager):
             # The process returned an error code or didn't exist. 
             # Check standard error.
             (out, err), proc = self.check_output(home, stderr)
-            
-            # If the stderr file was not found, assume nothing happened.
-            # We just ignore the error.
+
+            # If the stderr file was not found, assume nothing bad happened,
+            # and just ignore the error.
             # (cat returns 1 for error "No such file or directory")
             if ecode == ExitCode.FILENOTFOUND and proc.poll() == 1: 
                 out = err = ""
+            else:
+                # The actual error (read from the stderr file) is in 'out'.
+                # We swap the variables to avoid confusion. It is more
+                # intuitive to find the 'error' in err variable.
+                err = out
+                out = ""
        
         return (out, err), proc
  
index 218370c..b403546 100644 (file)
 #    You should have received a copy of the GNU General Public License
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
+#         Martin Ferrari <martin.ferrari@inria.fr>
+
+
 
 import ctypes
 import imp
index e354b42..fffdea5 100644 (file)
@@ -15,6 +15,8 @@
 #    You should have received a copy of the GNU General Public License
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
+# Author: Claudio Freire <claudio-daniel.freire@inria.fr>
+#
 
 import threading
 import Queue
index 259f578..0c3f86c 100755 (executable)
@@ -153,11 +153,13 @@ class LinuxNodeTestCase(unittest.TestCase):
  
         # get the pid of the process
         ecode = node.exitcode(app_home)
+
         # bash erro 127 - command not found
         self.assertEquals(ecode, 127)
  
         (out, err), proc = node.check_errors(app_home)
-        self.assertNotEquals(out, "")
+
+        self.assertEquals(err.strip(), "./cmd.sh: line 1: unexistent-command: command not found")
 
     @skipIfNotAlive
     def t_install(self, host, user):