Code cleanup. Setting resource state through specific functions
[nepi.git] / src / nepi / execution / ec.py
index de013d1..6beb3d7 100644 (file)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
@@ -34,9 +26,16 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
+import functools
+import logging
+import os
+import random
+import sys
+import time
+import threading
+
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -182,7 +181,7 @@ class ExperimentController(object):
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED
+            reached the state FINISHED (or STOPPED, FAILED or RELEASED)
 
         :param guids: List of guids
         :type guids: list
@@ -191,31 +190,34 @@ class ExperimentController(object):
 
     def wait_started(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED
+            reached the state STARTED (or STOPPED, FINISHED, FAILED, RELEASED)
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.STARTED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
+        return self.wait(guids, state = ResourceState.STARTED)
 
     def wait_released(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state RELEASED
+            reached the state RELEASED (or FAILED)
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        # TODO: fix the state concurrency bug, then wait for the release state
+        # named in the docstring (RELEASED or FAILED) instead of FINISHED.
+        return self.wait(guids, state = ResourceState.FINISHED)
+
+    def wait_deployed(self, guids):
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reached the state READY (or any higher state)
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.RELEASED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
+        return self.wait(guids, state = ResourceState.READY)
 
-    def wait(self, guids, states = [ResourceState.FINISHED, 
-            ResourceState.FAILED,
-            ResourceState.STOPPED]):
+    def wait(self, guids, state = ResourceState.STOPPED):
         """ Blocking method that waits until all the RM from the 'guid' list 
             reached state 'state' or until a failure occurs
             
@@ -237,14 +239,14 @@ class ExperimentController(object):
 
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
-            state = self.state(guid)
+            rstate = self.state(guid)
 
-            if state in states:
+            if rstate >= state:
                 guids.remove(guid)
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR %g - state %s " % (guid,
-                    self.state(guid, hr = True)))
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
+                    self.state(guid, hr = True), state))
 
                 # Take the opportunity to 'refresh' the states of the RMs.
                 # Query only the first up to N guids (not to overwhelm 
@@ -262,7 +264,7 @@ class ExperimentController(object):
                 # If the guid is not in one of the target states, wait and
                 # continue quering. We keep the sleep big to decrease the
                 # number of RM state queries
-                time.sleep(2)
+                time.sleep(4)
   
     def get_task(self, tid):
         """ Get a specific task