Adding Linux Application scalability tests
[nepi.git] / src / neco / execution / ec.py
index b793d0e..65ef175 100644 (file)
@@ -1,5 +1,6 @@
 import logging
 import os
+import random
 import sys
 import time
 import threading
@@ -14,6 +15,7 @@ from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 # TODO: Improve speed. Too slow... !!
+# TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!!
 
 class ECState(object):
     RUNNING = 1
@@ -265,18 +267,41 @@ class ExperimentController(object):
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
+        stop = []
+
         def steps(rm):
-            rm.deploy()
-            rm.start_with_conditions()
+            try:
+                rm.deploy()
+                rm.start_with_conditions()
+
+                # Only if the RM has STOP consitions we
+                # schedule a stop. Otherwise the RM will stop immediately
+                if rm.conditions.get(ResourceAction.STOP):
+                    rm.stop_with_conditions()
+            except:
+                import traceback
+                err = traceback.format_exc()
+                
+                self._logger.error("Error occurred while deploying resources: %s" % err)
 
-            # Only if the RM has STOP consitions we
-            # schedule a stop. Otherwise the RM will stop immediately
-            if rm.conditions.get(ResourceAction.STOP):
-                rm.stop_with_conditions()
+                # stop deployment
+                stop.append(None)
 
         if not group:
             group = self.resources
 
+        # Before starting deployment we disorder the group list with the
+        # purpose of speeding up the whole deployment process.
+        # It is likely that the user inserted in the 'group' list closely
+        # resources resources one after another (e.g. all applications
+        # connected to the same node can likely appear one after another).
+        # This can originate a slow down in the deployment since the N 
+        # threads the parallel runner uses to processes tasks may all
+        # be taken up by the same family of resources waiting for the 
+        # same conditions. 
+        # If we disorder the group list, this problem can be mitigated
+        random.shuffle(group)
+
         threads = []
         for guid in group:
             rm = self.get_resource(guid)
@@ -292,13 +317,22 @@ class ExperimentController(object):
             thread.setDaemon(True)
             thread.start()
 
-        while list(threads) and not self.finished:
+        while list(threads) and not self.finished and not stop:
             thread = threads[0]
             # Time out after 5 seconds to check EC not terminated
-            thread.join(5)
+            thread.join(1)
             if not thread.is_alive():
                 threads.remove(thread)
 
+        if stop:
+            # stop the scheduler
+            self._stop_scheduler()
+
+            if self._thread.is_alive():
+               self._thread.join()
+
+            raise RuntimeError, "Error occurred, interrupting deployment " 
+
     def release(self, group = None):
         if not group:
             group = self.resources
@@ -317,16 +351,12 @@ class ExperimentController(object):
             thread.join(5)
             if not thread.is_alive():
                 threads.remove(thread)
-        
-        self._state = ECState.TERMINATED
 
     def shutdown(self):
         self.release()
-        
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
 
+        self._stop_scheduler()
+        
         if self._thread.is_alive():
            self._thread.join()
 
@@ -399,7 +429,8 @@ class ExperimentController(object):
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-            return
+        finally:
+            runner.sync()
    
         # Mark EC state as terminated
         if self.ecstate == ECState.RUNNING:
@@ -419,14 +450,18 @@ class ExperimentController(object):
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
-            # Mark the EC as failed
-            self._state = ECState.FAILED
-
-            # Wake up the EC in case it was sleeping
-            self._cond.acquire()
-            self._cond.notify()
-            self._cond.release()
+            self._stop_scheduler()
 
             # Propage error to the ParallelRunner
             raise
 
+    def _stop_scheduler(self):
+        # Mark the EC as failed
+        self._state = ECState.FAILED
+
+        # Wake up the EC in case it was sleeping
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+
+