LinuxApplication: stdin made symlink to file in shared directory
[nepi.git] / src / nepi / execution / ec.py
index 5ec3411..0a90dec 100644 (file)
@@ -194,10 +194,12 @@ class ExperimentController(object):
         """
         return self.wait(guids, states = [ResourceState.STARTED,
             ResourceState.STOPPED,
+            ResourceState.FAILED,
             ResourceState.FINISHED])
 
     def wait(self, guids, states = [ResourceState.FINISHED, 
-        ResourceState.STOPPED]):
+            ResourceState.FAILED,
+            ResourceState.STOPPED]):
         """ Blocking method that waits until all the RM from the 'guid' list 
             reached state 'state' or until a failure occurs
             
@@ -207,19 +209,45 @@ class ExperimentController(object):
         if isinstance(guids, int):
             guids = [guids]
 
-        while not all([self.state(guid) in states for guid in guids]) and \
-                not any([self.state(guid) in [
-                        ResourceState.FAILED] for guid in guids]) and \
-                not self.finished:
-            # debug logging
-            waited = ""
-            for guid in guids:
-                waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
-            self.logger.debug(" WAITING FOR %s " % waited )
-            
-            # We keep the sleep big to decrease the number of RM state queries
-            time.sleep(2)
-   
+        # we randomly alter the order of the guids to avoid ordering
+        # dependencies (e.g. LinuxApplication RMs running on the same
+        # linux host will be synchronized by the LinuxNode SSH lock)
+        random.shuffle(guids)
+
+        while True:
+            # If no more guids to wait for or an error occurred, then exit
+            if len(guids) == 0 or self.finished:
+                break
+
+            # If a guid reached one of the target states, remove it from list
+            guid = guids[0]
+            state = self.state(guid)
+
+            if state in states:
+                guids.remove(guid)
+            else:
+                # Debug...
+                self.logger.debug(" WAITING FOR %g - state %s " % (guid,
+                    self.state(guid, hr = True)))
+
+                # Take the opportunity to 'refresh' the states of the RMs.
+                # Query only the first up to N guids (not to overwhelm 
+                # the local machine)
+                n = 100
+                lim = n if len(guids) > n else ( len(guids) -1 )
+                nguids = guids[0: lim]
+
+                # schedule state request for all guids (take advantage of
+                # scheduler multi threading).
+                for guid in nguids:
+                    callback = functools.partial(self.state, guid)
+                    self.schedule("0s", callback)
+
+                # If the guid is not in one of the target states, wait and
+                # continue querying. We keep the sleep big to decrease the
+                # number of RM state queries
+                time.sleep(2)
+  
     def get_task(self, tid):
         """ Get a specific task