From: Alina Quereilhac Date: Tue, 22 Oct 2013 13:50:36 +0000 (+0200) Subject: Flushing scheduler before shutdown X-Git-Tag: nepi-3.0.0~26^2~5 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;ds=sidebyside;h=fba73e968df37bd1d1fde00f85c7a336b5d133b9;p=nepi.git Flushing scheduler before shutdown --- diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 30dd7a40..790a3381 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -321,9 +321,10 @@ class ExperimentController(object): guids.remove(guid) else: # Debug... - hrstate = ResourceState2str.get(rstate) + hrrstate = ResourceState2str.get(rstate) + hrstate = ResourceState2str.get(state) self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % ( - guid, rstate, state)) + guid, hrrstate, hrstate)) time.sleep(0.5) @@ -598,26 +599,6 @@ class ExperimentController(object): rm = self.get_resource(guid) rm.set_with_conditions(name, value, guids2, state, time) - def stop_with_conditions(self, guid): - """ Stop a specific RM defined by its 'guid' only if all the conditions are true - - :param guid: Guid of the RM - :type guid: int - - """ - rm = self.get_resource(guid) - return rm.stop_with_conditions() - - def start_with_conditions(self, guid): - """ Start a specific RM defined by its 'guid' only if all the conditions are true - - :param guid: Guid of the RM - :type guid: int - - """ - rm = self.get_resource(guid) - return rm.start_with_conditions() - def deploy(self, guids = None, wait_all_ready = True, group = None): """ Deploy all resource manager in guids list @@ -673,7 +654,7 @@ class ExperimentController(object): callback = functools.partial(wait_all_and_start, group) self.schedule("1s", callback) else: - # If all resources are read, we schedule the start + # If all resources are ready, we schedule the start for guid in guids: rm = self.get_resource(guid) self.schedule("0s", rm.start_with_conditions) @@ -686,7 +667,7 @@ class ExperimentController(object): # Instead of having many start tasks, we will have only one for # the whole group. callback = functools.partial(wait_all_and_start, group) - self.schedule("1s", callback) + self.schedule("0s", callback) for guid in guids: rm = self.get_resource(guid) @@ -694,12 +675,12 @@ class ExperimentController(object): self.schedule("0s", rm.deploy_with_conditions) if not wait_all_ready: - self.schedule("1s", rm.start_with_conditions) + self.schedule("0s", rm.start_with_conditions) if rm.conditions.get(ResourceAction.STOP): # Only if the RM has STOP conditions we # schedule a stop. Otherwise the RM will stop immediately - self.schedule("2s", rm.stop_with_conditions) + self.schedule("0s", rm.stop_with_conditions) def release(self, guids = None): """ Release al RMs on the guids list or @@ -712,6 +693,10 @@ class ExperimentController(object): if not guids: guids = self.resources + # Remove all pending tasks from the scheduler queue + for tis in self._scheduler.pending: + self._scheduler.remove(tid) + for guid in guids: rm = self.get_resource(guid) self.schedule("0s", rm.release) @@ -723,9 +708,6 @@ class ExperimentController(object): Releases all the resources and stops task processing thread """ - # TODO: Clean the parallel runner!! STOP all ongoing tasks - #### - self.release() # Mark the EC state as TERMINATED diff --git a/src/nepi/execution/scheduler.py b/src/nepi/execution/scheduler.py index 53a7530c..cbd9874f 100644 --- a/src/nepi/execution/scheduler.py +++ b/src/nepi/execution/scheduler.py @@ -25,7 +25,6 @@ class TaskStatus: DONE = 1 ERROR = 2 - class Task(object): """ This class is to define a task, that is represented by an id, an execution time 'timestamp' and an action 'callback """ @@ -54,6 +53,11 @@ class HeapScheduler(object): self._valid = set() self._idgen = itertools.count(1) + @property + def pending(self): + """ Returns the list of pending task ids """ + return self._valid + def schedule(self, task): """ Add the task 'task' in the heap of the scheduler diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index 6868c4aa..f5d39d78 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -16,10 +16,9 @@ # along with this program. If not, see . # # Author: Claudio Freire +# Alina Quereilhac # -# A.Q. TODO: BUG FIX THREADCACHE. Not needed!! remove it completely! - import threading import Queue import traceback