"""
rm = self.get_resource(guid)
+ state = rm.state
+
if hr:
- return ResourceState2str.get(rm.state)
+ return ResourceState2str.get(state)
- return rm.state
+ return state
def stop(self, guid):
""" Stop a specific RM defined by its 'guid'
# same conditions (e.g. LinuxApplications running on a same
# node share a single lock, so they will tend to be serialized).
# If we disorder the group list, this problem can be mitigated.
- #random.shuffle(group)
+ random.shuffle(group)
def wait_all_and_start(group):
reschedule = False
# schedule a stop. Otherwise the RM will stop immediately
self.schedule("2s", rm.stop_with_conditions)
-
def release(self, group = None):
""" Release the elements of the list 'group' or
all the resources if any group is specified
if track:
self._tasks[task.id] = task
-
+
# Notify condition to wake up the processing thread
self._notify()
def _process(self):
""" Process scheduled tasks.
+ .. note::
+
The _process method is executed in an independent thread held by the
ExperimentController for as long as the experiment is running.
try:
while not self.finished:
self._cond.acquire()
+
task = self._scheduler.next()
- self._cond.release()
if not task:
- # It there are not tasks in the tasks queue we need to
- # wait until a call to schedule wakes us up
- self._cond.acquire()
+ # No task to execute. Wait for a new task to be scheduled.
self._cond.wait()
- self._cond.release()
- else:
- # If the task timestamp is in the future the thread needs to wait
- # until time elapse or until another task is scheduled
+ else:
+ # The task timestamp is in the future. Wait for timeout
+ # or until another task is scheduled.
now = strfnow()
if now < task.timestamp:
- # Calculate time difference in seconds
+ # Calculate timeout in seconds
timeout = strfdiff(task.timestamp, now)
+
# Re-schedule task with the same timestamp
self._scheduler.schedule(task)
- # Sleep until timeout or until a new task awakes the condition
- self._cond.acquire()
+
+ task = None
+
+ # Wait timeout or until a new task awakes the condition
self._cond.wait(timeout)
- self._cond.release()
- else:
- # Process tasks in parallel
- runner.put(self._execute, task)
+
+ self._cond.release()
+
+ if task:
+ # Process tasks in parallel
+ runner.put(self._execute, task)
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
finally:
- self._logger.info("Exiting the task processing loop ... ")
+ self.logger.debug("Exiting the task processing loop ... ")
runner.sync()
def _execute(self, task):
""" Executes a single task.
- If the invokation of the task callback raises an
- exception, the processing thread of the ExperimentController
- will be stopped and the experiment will be aborted.
-
:param task: Object containing the callback to execute
:type task: Task
+ .. note::
+
+     If the invocation of the task callback raises an
+ exception, the processing thread of the ExperimentController
+ will be stopped and the experiment will be aborted.
+
"""
# Invoke callback
task.status = TaskStatus.DONE
task.result = err
task.status = TaskStatus.ERROR
- self._logger.error("Error occurred while executing task: %s" % err)
+ self.logger.error("Error occurred while executing task: %s" % err)
# Set the EC to FAILED state (this will force to exit the task
# processing thread)