Adding Linux Application scalability tests
[nepi.git] / src / neco / execution / ec.py
index c9e4e06..65ef175 100644
@@ -1,5 +1,6 @@
 import logging
 import os
+import random
 import sys
 import time
 import threading
@@ -13,6 +14,13 @@ from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
 from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
+# TODO: Improve speed. Too slow!
+# TODO: When something fails during deployment, NECO leaves scp and ssh processes running behind!
+
+class ECState(object):
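+    """ Possible lifecycle states of the ExperimentController """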
+    RUNNING = 1
+    FAILED = 2
+    TERMINATED = 3
 
 class ExperimentController(object):
     def __init__(self, exp_id = None, root_dir = "/tmp"): 
@@ -29,9 +37,6 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
-        # Resource managers
-        self._group = dict()
-
         # Scheduler
         self._scheduler = HeapScheduler()
 
@@ -39,11 +44,14 @@ class ExperimentController(object):
         self._tasks = dict()
 
         # Event processing thread
-        self._stop = False
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
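+        # Run the event-processing thread as a daemon so it cannot
+        # prevent the interpreter from exiting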
+        self._thread.setDaemon(True)
         self._thread.start()
 
+        # EC state
+        self._state = ECState.RUNNING
+
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
@@ -51,6 +59,10 @@ class ExperimentController(object):
     def logger(self):
         return self._logger
 
+    @property
+    def ecstate(self):
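+        """ Current ECState of the controller """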
+        return self._state
+
     @property
     def exp_id(self):
         exp_id = self._exp_id
@@ -58,6 +70,17 @@ class ExperimentController(object):
             exp_id = "nepi-" + exp_id
         return exp_id
 
+    @property
+    def finished(self):
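+        """ True once the EC has stopped, either due to failure or normal termination """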
+        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+
+    def wait_finished(self, guids):
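+        """ Blocks until every RM in guids reaches ResourceState.FINISHED,
+        or until the EC itself fails or terminates """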
+        while not all([self.state(guid) == ResourceState.FINISHED \
+                for guid in guids]) and not self.finished:
+            # We keep the sleep as large as possible to
+            # decrease the number of RM state requests
+            time.sleep(2)
+    
     def get_task(self, tid):
         return self._tasks.get(tid)
 
@@ -80,16 +103,6 @@ class ExperimentController(object):
 
         return guid
 
-    def register_group(self, group):
-        guid = self._guid_generator.next()
-
-        if not isinstance(group, list):
-            group = [group] 
-
-        self._groups[guid] = group
-
-        return guid
-
     def get_attributes(self, guid):
         rm = self.get_resource(guid)
         return rm.get_attributes()
@@ -254,18 +267,41 @@ class ExperimentController(object):
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
-        def steps(rm):
-            rm.deploy()
-            rm.start_with_conditions()
+        stop = []
 
-            # Only if the RM has STOP consitions we
-            # schedule a stop. Otherwise the RM will stop immediately
-            if rm.conditions.get(ResourceAction.STOP):
-                rm.stop_with_conditions()
+        def steps(rm):
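+            """ Deploys and starts a single RM, scheduling a stop only
+            when the RM defines explicit STOP conditions """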
+            try:
+                rm.deploy()
+                rm.start_with_conditions()
+
+                # Only if the RM has STOP conditions do we schedule a
+                # stop. Otherwise the RM would stop immediately.
+                if rm.conditions.get(ResourceAction.STOP):
+                    rm.stop_with_conditions()
+            except:
+                import traceback
+                err = traceback.format_exc()
+                
+                self._logger.error("Error occurred while deploying resources: %s" % err)
+
+                # Flag the shared 'stop' list so the main deployment loop aborts
+                stop.append(None)
 
         if not group:
             group = self.resources
 
+        # Before starting the deployment we shuffle the group list in
+        # order to speed up the whole deployment process.
+        # The user will likely have inserted closely related resources
+        # in the 'group' list one after another (e.g. all applications
+        # connected to the same node).
+        # This can slow down the deployment, since the N threads the
+        # parallel runner uses to process tasks may all be taken up by
+        # the same family of resources, waiting for the same conditions.
+        # Shuffling the group list mitigates this problem.
+        # Work on a copy so the caller's list is not reordered in place
+        group = list(group)
+        random.shuffle(group)
+
         threads = []
         for guid in group:
             rm = self.get_resource(guid)
@@ -278,10 +314,24 @@ class ExperimentController(object):
 
             thread = threading.Thread(target = steps, args = (rm,))
             threads.append(thread)
+            thread.setDaemon(True)
             thread.start()
 
-        for thread in threads:
-            thread.join()
+        while list(threads) and not self.finished and not stop:
+            thread = threads[0]
+            # Time out after 1 second so we can check whether the EC
+            # has finished or the deployment was flagged to stop
+            thread.join(1)
+            if not thread.is_alive():
+                threads.remove(thread)
+
+        if stop:
+            # stop the scheduler
+            self._stop_scheduler()
+
+            if self._thread.is_alive():
+                self._thread.join()
+
+            raise RuntimeError("Error occurred, interrupting deployment")
 
     def release(self, group = None):
         if not group:
@@ -292,18 +342,21 @@ class ExperimentController(object):
             rm = self.get_resource(guid)
             thread = threading.Thread(target=rm.release)
             threads.append(thread)
+            thread.setDaemon(True)
             thread.start()
 
-        for thread in threads:
-            thread.join()
+        while list(threads) and not self.finished:
+            thread = threads[0]
+            # Time out after 5 seconds so we can check whether the EC has finished
+            thread.join(5)
+            if not thread.is_alive():
+                threads.remove(thread)
 
     def shutdown(self):
         self.release()
+
+        self._stop_scheduler()
         
-        self._stop = True
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
         if self._thread.is_alive():
            self._thread.join()
 
@@ -342,7 +395,7 @@ class ExperimentController(object):
         runner.start()
 
         try:
-            while not self._stop:
+            while not self.finished:
                 self._cond.acquire()
                 task = self._scheduler.next()
                 self._cond.release()
@@ -369,11 +422,20 @@ class ExperimentController(object):
                     else:
                         # Process tasks in parallel
                         runner.put(self._execute, task)
-        except:  
+                
+        except: 
             import traceback
             err = traceback.format_exc()
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
+            self._state = ECState.FAILED
+        finally:
+            runner.sync()
+   
+        # Mark EC state as terminated
+        if self.ecstate == ECState.RUNNING:
+            self._state = ECState.TERMINATED
+
     def _execute(self, task):
         # Invoke callback
         task.status = TaskStatus.DONE
@@ -383,8 +445,23 @@ class ExperimentController(object):
         except:
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while executing event: %s" % err)
-
             task.result = err
             task.status = TaskStatus.ERROR
+            
+            self._logger.error("Error occurred while executing task: %s" % err)
+
+            self._stop_scheduler()
+
+            # Propagate the error to the ParallelRunner
+            raise
+
+    def _stop_scheduler(self):
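+        """ Marks the EC as FAILED and wakes up the event-processing
+        loop so it can exit """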
+        # Mark the EC as failed
+        self._state = ECState.FAILED
+
+        # Wake up the EC in case it was sleeping
+        self._cond.acquire()
+        self._cond.notify()
+        self._cond.release()
+
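
For reference, a minimal driver sketch for the EC lifecycle introduced by this commit. Only deploy(), wait_finished(), ecstate and shutdown() are shown in this diff; the registration calls (register_resource(), set(), register_connection()) and the "LinuxNode"/"LinuxApplication" factory names are assumptions based on the Linux RM modules this changeset targets, not definitions from ec.py.

    from neco.execution.ec import ExperimentController, ECState

    ec = ExperimentController(exp_id = "scalability-demo")

    # Assumed registration helpers (not defined in ec.py)
    node = ec.register_resource("LinuxNode")
    ec.set(node, "hostname", "example.host")

    apps = []
    for i in xrange(10):
        app = ec.register_resource("LinuxApplication")
        ec.set(app, "command", "ping -c 1 localhost")
        ec.register_connection(app, node)
        apps.append(app)

    # Deploy all registered RMs; deploy() shuffles the group internally
    ec.deploy()

    # Poll the application RMs until they finish or the EC stops
    ec.wait_finished(apps)

    if ec.ecstate == ECState.FAILED:
        print "Experiment failed"

    # shutdown() releases all RMs and stops the event-processing thread
    ec.shutdown()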