Fixed nasty concurrency bug in EC
[nepi.git] / src / nepi / execution / ec.py
index 262aecf..b5663c0 100644 (file)
@@ -370,10 +370,12 @@ class ExperimentController(object):
 
         """
         rm = self.get_resource(guid)
+        state = rm.state
+
         if hr:
-            return ResourceState2str.get(rm.state)
+            return ResourceState2str.get(state)
 
-        return rm.state
+        return state
 
     def stop(self, guid):
         """ Stop a specific RM defined by its 'guid'
@@ -487,7 +489,7 @@ class ExperimentController(object):
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
-        #random.shuffle(group)
+        random.shuffle(group)
 
         def wait_all_and_start(group):
             reschedule = False
@@ -525,7 +527,6 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-
     def release(self, group = None):
         """ Release the elements of the list 'group' or 
         all the resources if any group is specified
@@ -592,7 +593,7 @@ class ExperimentController(object):
 
         if track:
             self._tasks[task.id] = task
-  
+
         # Notify condition to wake up the processing thread
         self._notify()
 
@@ -601,6 +602,8 @@ class ExperimentController(object):
     def _process(self):
         """ Process scheduled tasks.
 
+        .. note::
+
         The _process method is executed in an independent thread held by the 
         ExperimentController for as long as the experiment is running.
         
@@ -641,51 +644,55 @@ class ExperimentController(object):
         try:
             while not self.finished:
                 self._cond.acquire()
+
                 task = self._scheduler.next()
-                self._cond.release()
                 
                 if not task:
-                    # It there are not tasks in the tasks queue we need to 
-                    # wait until a call to schedule wakes us up
-                    self._cond.acquire()
+                    # No task to execute. Wait for a new task to be scheduled.
                     self._cond.wait()
-                    self._cond.release()
-                else: 
-                    # If the task timestamp is in the future the thread needs to wait
-                    # until time elapse or until another task is scheduled
+                else:
+                    # The task timestamp is in the future. Wait for timeout 
+                    # or until another task is scheduled.
                     now = strfnow()
                     if now < task.timestamp:
-                        # Calculate time difference in seconds
+                        # Calculate timeout in seconds
                         timeout = strfdiff(task.timestamp, now)
+
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
-                        # Sleep until timeout or until a new task awakes the condition
-                        self._cond.acquire()
+                        
+                        task = None
+
+                        # Wait timeout or until a new task awakes the condition
                         self._cond.wait(timeout)
-                        self._cond.release()
-                    else:
-                        # Process tasks in parallel
-                        runner.put(self._execute, task)
+               
+                self._cond.release()
+
+                if task:
+                    # Process tasks in parallel
+                    runner.put(self._execute, task)
         except: 
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while processing tasks in the EC: %s" % err)
+            self.logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
         finally:   
-            self._logger.info("Exiting the task processing loop ... ")
+            self.logger.debug("Exiting the task processing loop ... ")
             runner.sync()
 
     def _execute(self, task):
         """ Executes a single task. 
 
-            If the invokation of the task callback raises an
-            exception, the processing thread of the ExperimentController
-            will be stopped and the experiment will be aborted.
-
             :param task: Object containing the callback to execute
             :type task: Task
 
+        .. note::
+
+        If the invokation of the task callback raises an
+        exception, the processing thread of the ExperimentController
+        will be stopped and the experiment will be aborted.
+
         """
         # Invoke callback
         task.status = TaskStatus.DONE
@@ -698,7 +705,7 @@ class ExperimentController(object):
             task.result = err
             task.status = TaskStatus.ERROR
             
-            self._logger.error("Error occurred while executing task: %s" % err)
+            self.logger.error("Error occurred while executing task: %s" % err)
 
             # Set the EC to FAILED state (this will force to exit the task
             # processing thread)