def wait_finished(self, guids):
""" Blocking method that wait until all the RM from the 'guid' list
- reach the state FINISHED
+ reached the state FINISHED
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids)
+
+ def wait_started(self, guids):
+ """ Blocking method that wait until all the RM from the 'guid' list
+ reached the state STARTED
+
+ :param guids: List of guids
+ :type guids: list
+ """
+ return self.wait(guids, states = [ResourceState.STARTED,
+ ResourceState.STOPPED,
+ ResourceState.FINISHED])
+
+ def wait(self, guids, states = [ResourceState.FINISHED,
+ ResourceState.STOPPED]):
+ """ Blocking method that waits until all the RM from the 'guid' list
+ reached state 'state' or until a failure occurs
+
:param guids: List of guids
:type guids: list
"""
if isinstance(guids, int):
guids = [guids]
- while not all([self.state(guid) in [ResourceState.FINISHED,
- ResourceState.STOPPED,
- ResourceState.FAILED] \
- for guid in guids]) and not self.finished:
- # We keep the sleep as large as possible to
- # decrese the number of RM state requests
+ while not all([self.state(guid) in states for guid in guids]) and \
+ not any([self.state(guid) in [
+ ResourceState.FAILED] for guid in guids]) and \
+ not self.finished:
+ # debug logging
+ waited = ""
+ for guid in guids:
+ waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
+ self.logger.debug(" WAITING FOR %s " % waited )
+
+ # We keep the sleep big to decrease the number of RM state queries
time.sleep(2)
-
+
def get_task(self, tid):
""" Get a specific task
"""
rm = self.get_resource(guid)
+ state = rm.state
+
if hr:
- return ResourceState2str.get(rm.state)
+ return ResourceState2str.get(state)
- return rm.state
+ return state
def stop(self, guid):
""" Stop a specific RM defined by its 'guid'
self.logger.debug(" ------- DEPLOY START ------ ")
if not group:
- group = self.resources
-
+ # By default, if no deployment group is indicated,
+ # all RMs that are undeployed will be deployed
+ group = []
+ for guid in self.resources:
+ if self.state(guid) == ResourceState.NEW:
+ group.append(guid)
+
if isinstance(group, int):
group = [group]
# same conditions (e.g. LinuxApplications running on a same
# node share a single lock, so they will tend to be serialized).
# If we disorder the group list, this problem can be mitigated.
- #random.shuffle(group)
+ random.shuffle(group)
def wait_all_and_start(group):
reschedule = False
for guid in group:
- rm = self.get_resource(guid)
- if rm.state < ResourceState.READY:
+ if self.state(guid) < ResourceState.READY:
reschedule = True
break
# schedule a stop. Otherwise the RM will stop immediately
self.schedule("2s", rm.stop_with_conditions)
-
def release(self, group = None):
""" Release the elements of the list 'group' or
all the resources if any group is specified
if track:
self._tasks[task.id] = task
-
+
# Notify condition to wake up the processing thread
self._notify()
def _process(self):
""" Process scheduled tasks.
+ .. note::
+
The _process method is executed in an independent thread held by the
ExperimentController for as long as the experiment is running.
try:
while not self.finished:
self._cond.acquire()
+
task = self._scheduler.next()
- self._cond.release()
if not task:
- # It there are not tasks in the tasks queue we need to
- # wait until a call to schedule wakes us up
- self._cond.acquire()
+ # No task to execute. Wait for a new task to be scheduled.
self._cond.wait()
- self._cond.release()
- else:
- # If the task timestamp is in the future the thread needs to wait
- # until time elapse or until another task is scheduled
+ else:
+ # A task was found. If its timestamp is in the future, wait
+ # until the timeout elapses or until another task is scheduled.
now = strfnow()
if now < task.timestamp:
- # Calculate time difference in seconds
+ # Calculate timeout in seconds
timeout = strfdiff(task.timestamp, now)
+
# Re-schedule task with the same timestamp
self._scheduler.schedule(task)
- # Sleep until timeout or until a new task awakes the condition
- self._cond.acquire()
+
+ task = None
+
+ # Wait timeout or until a new task awakes the condition
self._cond.wait(timeout)
- self._cond.release()
- else:
- # Process tasks in parallel
- runner.put(self._execute, task)
+
+ self._cond.release()
+
+ if task:
+ # Process tasks in parallel
+ runner.put(self._execute, task)
except:
import traceback
err = traceback.format_exc()
- self._logger.error("Error while processing tasks in the EC: %s" % err)
+ self.logger.error("Error while processing tasks in the EC: %s" % err)
self._state = ECState.FAILED
finally:
+ self.logger.debug("Exiting the task processing loop ... ")
runner.sync()
def _execute(self, task):
""" Executes a single task.
- If the invokation of the task callback raises an
- exception, the processing thread of the ExperimentController
- will be stopped and the experiment will be aborted.
-
:param task: Object containing the callback to execute
:type task: Task
+ .. note::
+
+ If the invocation of the task callback raises an
+ exception, the processing thread of the ExperimentController
+ will be stopped and the experiment will be aborted.
+
"""
# Invoke callback
task.status = TaskStatus.DONE
task.result = err
task.status = TaskStatus.ERROR
- self._logger.error("Error occurred while executing task: %s" % err)
+ self.logger.error("Error occurred while executing task: %s" % err)
# Set the EC to FAILED state (this will force to exit the task
# processing thread)