Flushing scheduler before shutdown
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 22 Oct 2013 13:50:36 +0000 (15:50 +0200)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Tue, 22 Oct 2013 13:50:36 +0000 (15:50 +0200)
src/nepi/execution/ec.py
src/nepi/execution/scheduler.py
src/nepi/util/parallel.py

index 30dd7a4..790a338 100644 (file)
@@ -321,9 +321,10 @@ class ExperimentController(object):
                 guids.remove(guid)
             else:
                 # Debug...
-                hrstate = ResourceState2str.get(rstate)
+                hrrstate = ResourceState2str.get(rstate)
+                hrstate = ResourceState2str.get(state)
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
-                    guid, rstate, state))
+                    guid, hrrstate, hrstate))
 
             time.sleep(0.5)
   
@@ -598,26 +599,6 @@ class ExperimentController(object):
             rm = self.get_resource(guid)
             rm.set_with_conditions(name, value, guids2, state, time)
 
-    def stop_with_conditions(self, guid):
-        """ Stop a specific RM defined by its 'guid' only if all the conditions are true
-
-            :param guid: Guid of the RM
-            :type guid: int
-
-        """
-        rm = self.get_resource(guid)
-        return rm.stop_with_conditions()
-
-    def start_with_conditions(self, guid):
-        """ Start a specific RM defined by its 'guid' only if all the conditions are true
-
-            :param guid: Guid of the RM
-            :type guid: int
-
-        """
-        rm = self.get_resource(guid)
-        return rm.start_with_conditions()
-
     def deploy(self, guids = None, wait_all_ready = True, group = None):
         """ Deploy all resource manager in guids list
 
@@ -673,7 +654,7 @@ class ExperimentController(object):
                 callback = functools.partial(wait_all_and_start, group)
                 self.schedule("1s", callback)
             else:
-                # If all resources are read, we schedule the start
+                # If all resources are ready, we schedule the start
                 for guid in guids:
                     rm = self.get_resource(guid)
                     self.schedule("0s", rm.start_with_conditions)
@@ -686,7 +667,7 @@ class ExperimentController(object):
             # Instead of having many start tasks, we will have only one for 
             # the whole group.
             callback = functools.partial(wait_all_and_start, group)
-            self.schedule("1s", callback)
+            self.schedule("0s", callback)
 
         for guid in guids:
             rm = self.get_resource(guid)
@@ -694,12 +675,12 @@ class ExperimentController(object):
             self.schedule("0s", rm.deploy_with_conditions)
 
             if not wait_all_ready:
-                self.schedule("1s", rm.start_with_conditions)
+                self.schedule("0s", rm.start_with_conditions)
 
             if rm.conditions.get(ResourceAction.STOP):
                 # Only if the RM has STOP conditions we
                 # schedule a stop. Otherwise the RM will stop immediately
-                self.schedule("2s", rm.stop_with_conditions)
+                self.schedule("0s", rm.stop_with_conditions)
 
     def release(self, guids = None):
         """ Release al RMs on the guids list or 
@@ -712,6 +693,10 @@ class ExperimentController(object):
         if not guids:
             guids = self.resources
 
+        # Remove all pending tasks from the scheduler queue
+        for tis in self._scheduler.pending:
+            self._scheduler.remove(tid)
+
         for guid in guids:
             rm = self.get_resource(guid)
             self.schedule("0s", rm.release)
@@ -723,9 +708,6 @@ class ExperimentController(object):
         Releases all the resources and stops task processing thread
 
         """
-        # TODO: Clean the parallel runner!! STOP all ongoing tasks
-        ####
-
         self.release()
 
         # Mark the EC state as TERMINATED
index 53a7530..cbd9874 100644 (file)
@@ -25,7 +25,6 @@ class TaskStatus:
     DONE = 1
     ERROR = 2
 
-
 class Task(object):
     """ This class is to define a task, that is represented by an id,
     an execution time 'timestamp' and an action 'callback """
@@ -54,6 +53,11 @@ class HeapScheduler(object):
         self._valid = set()
         self._idgen = itertools.count(1)
 
+    @property
+    def pending(self):
+        """ Returns the list of pending task ids """
+        return self._valid
+
     def schedule(self, task):
         """ Add the task 'task' in the heap of the scheduler
 
index 6868c4a..f5d39d7 100644 (file)
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 #
 # Author: Claudio Freire <claudio-daniel.freire@inria.fr>
+#         Alina Quereilhac <alina.quereilhac@inria.fr>
 #
 
-# A.Q. TODO: BUG FIX THREADCACHE. Not needed!! remove it completely!
-
 import threading
 import Queue
 import traceback