X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=b5663c0faedc6496e2097e7ceaa1ea08b8244774;hb=4c5d308e0d13c0dc4b54556f149bc2a9cd585592;hp=262aecf656217203c6efe94f58487eea332f347e;hpb=78a5f5ae901920e205fb257e889a3b9b3659b44e;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 262aecf6..b5663c0f 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -370,10 +370,12 @@ class ExperimentController(object): """ rm = self.get_resource(guid) + state = rm.state + if hr: - return ResourceState2str.get(rm.state) + return ResourceState2str.get(state) - return rm.state + return state def stop(self, guid): """ Stop a specific RM defined by its 'guid' @@ -487,7 +489,7 @@ class ExperimentController(object): # same conditions (e.g. LinuxApplications running on a same # node share a single lock, so they will tend to be serialized). # If we disorder the group list, this problem can be mitigated. - #random.shuffle(group) + random.shuffle(group) def wait_all_and_start(group): reschedule = False @@ -525,7 +527,6 @@ class ExperimentController(object): # schedule a stop. Otherwise the RM will stop immediately self.schedule("2s", rm.stop_with_conditions) - def release(self, group = None): """ Release the elements of the list 'group' or all the resources if any group is specified @@ -592,7 +593,7 @@ class ExperimentController(object): if track: self._tasks[task.id] = task - + # Notify condition to wake up the processing thread self._notify() @@ -601,6 +602,8 @@ class ExperimentController(object): def _process(self): """ Process scheduled tasks. + .. note:: + The _process method is executed in an independent thread held by the ExperimentController for as long as the experiment is running. @@ -641,51 +644,55 @@ class ExperimentController(object): try: while not self.finished: self._cond.acquire() + task = self._scheduler.next() - self._cond.release() if not task: - # It there are not tasks in the tasks queue we need to - # wait until a call to schedule wakes us up - self._cond.acquire() + # No task to execute. Wait for a new task to be scheduled. self._cond.wait() - self._cond.release() - else: - # If the task timestamp is in the future the thread needs to wait - # until time elapse or until another task is scheduled + else: + # The task timestamp is in the future. Wait for timeout + # or until another task is scheduled. now = strfnow() if now < task.timestamp: - # Calculate time difference in seconds + # Calculate timeout in seconds timeout = strfdiff(task.timestamp, now) + # Re-schedule task with the same timestamp self._scheduler.schedule(task) - # Sleep until timeout or until a new task awakes the condition - self._cond.acquire() + + task = None + + # Wait timeout or until a new task awakes the condition self._cond.wait(timeout) - self._cond.release() - else: - # Process tasks in parallel - runner.put(self._execute, task) + + self._cond.release() + + if task: + # Process tasks in parallel + runner.put(self._execute, task) except: import traceback err = traceback.format_exc() - self._logger.error("Error while processing tasks in the EC: %s" % err) + self.logger.error("Error while processing tasks in the EC: %s" % err) self._state = ECState.FAILED finally: - self._logger.info("Exiting the task processing loop ... ") + self.logger.debug("Exiting the task processing loop ... ") runner.sync() def _execute(self, task): """ Executes a single task. - If the invokation of the task callback raises an - exception, the processing thread of the ExperimentController - will be stopped and the experiment will be aborted. - :param task: Object containing the callback to execute :type task: Task + .. note:: + + If the invokation of the task callback raises an + exception, the processing thread of the ExperimentController + will be stopped and the experiment will be aborted. + """ # Invoke callback task.status = TaskStatus.DONE @@ -698,7 +705,7 @@ class ExperimentController(object): task.result = err task.status = TaskStatus.ERROR - self._logger.error("Error occurred while executing task: %s" % err) + self.logger.error("Error occurred while executing task: %s" % err) # Set the EC to FAILED state (this will force to exit the task # processing thread)