ec_shutdown
[nepi.git] / src / nepi / execution / ec.py
index e59e626..30dd7a4 100644 (file)
@@ -36,6 +36,51 @@ import sys
 import time
 import threading
 
 import time
 import threading
 
+class FailurePolicy(object):
+    """ Defines how to respond to experiment failures  
+    """
+    IGNORE_RM_FAILURE = 1
+    ABORT_ON_RM_FAILURE = 2
+
+class FailureLevel(object):
+    """ Describe the system failure state
+    """
+    OK = 1
+    RM_FAILURE = 2
+    TASK_FAILURE = 3
+    EC_FAILURE = 4
+
+class FailureManager(object):
+    """ The FailureManager is responsible for handling errors,
+    and deciding whether an experiment should be aborted
+    """
+
+    def __init__(self, failure_policy = None):
+        self._failure_level = FailureLevel.OK
+        self._failure_policy = failure_policy or \
+                FailurePolicy.ABORT_ON_RM_FAILURE
+
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.EC_FAILURE:
+            return True
+
+        if self._failure_level in [FailureLevel.TASK_FAILURE, 
+                FailureLevel.RM_FAILURE] and \
+                        self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
+            return True
+
+        return False
+
+    def set_rm_failure(self):
+        self._failure_level = FailureLevel.RM_FAILURE
+
+    def set_task_failure(self):
+        self._failure_level = FailureLevel.TASK_FAILURE
+
+    def set_ec_failure(self):
+        self._failure_level = FailureLevel.EC_FAILURE
+
 class ECState(object):
     """ State of the Experiment Controller
    
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -49,19 +94,23 @@ class ExperimentController(object):
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
-                       It will be used in the name of the directory 
-                        where experiment related information is stored
         :type exp_id: str
 
     .. note::
 
         :type exp_id: str
 
     .. note::
 
-        An experiment, or scenario, is defined by a concrete use, behavior,
-        configuration and interconnection of resources that describe a single
-        experiment case (We call this the experiment description). 
-        A same experiment (scenario) can be run many times.
+        An experiment, or scenario, is defined by a concrete set of resources,
+        behavior, configuration and interconnection of those resources. 
+        The Experiment Description (ED) is a detailed representation of a
+        single experiment. It contains all the necessary information to 
+        allow repeating the experiment. NEPI allows to describe
+        experiments by registering components (resources), configuring them
+        and interconnecting them.
+        
+        A same experiment (scenario) can be executed many times, generating 
+        different results. We call an experiment execution (instance) a 'run'.
 
 
-        The ExperimentController (EC), is the entity responsible for 
-        managing an experiment instance (run). The same scenario can be 
+        The ExperimentController (EC), is the entity responsible of
+        managing an experiment run. The same scenario can be 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
@@ -75,15 +124,15 @@ class ExperimentController(object):
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
-        In order for a new type of resource to be supported in NEPI
-        a new RM must be implemented. NEPI already provides different
+        To support a new type of resource in NEPI, a new RM must be 
+        implemented. NEPI already provides a variety of
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
-        configure them and interconnect them, in order to describe an experiment.
+        configure them and interconnect them, to describe an experiment.
         Describing an experiment through the EC does not run the experiment.
         Describing an experiment through the EC does not run the experiment.
-        Only when the 'deploy()' method is invoked on the EC, will the EC take 
+        Only when the 'deploy()' method is invoked on the EC, the EC will take 
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
@@ -97,8 +146,8 @@ class ExperimentController(object):
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
-        exp_id, which can be re-used by different ExperimentController instances,
-        and the run_id, which unique to a ExperimentController instance, and
+        exp_id, which can be re-used in different ExperimentController,
+        and the run_id, which is unique to one ExperimentController instance, and
         is automatically generated by NEPI.
         
     """
         is automatically generated by NEPI.
         
     """
@@ -108,12 +157,16 @@ class ExperimentController(object):
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
-        # Run identifier. It identifies a concrete instance (run) of an experiment.
-        # Since a same experiment (same configuration) can be run many times,
-        # this id permits to identify concrete exoeriment run
+        # Run identifier. It identifies a concrete execution instance (run) 
+        # of an experiment.
+        # Since a same experiment (same configuration) can be executed many 
+        # times, this run_id permits to separate result files generated on 
+        # different experiment executions
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
+        # Identifies the experiment scenario (i.e. configuration, 
+        # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
@@ -128,7 +181,7 @@ class ExperimentController(object):
         # Tasks
         self._tasks = dict()
 
         # Tasks
         self._tasks = dict()
 
-        # RM groups 
+        # RM groups (for deployment) 
         self._groups = dict()
 
         # generator of globally unique id for groups
         self._groups = dict()
 
         # generator of globally unique id for groups
@@ -140,6 +193,12 @@ class ExperimentController(object):
         self._thread.setDaemon(True)
         self._thread.start()
 
         self._thread.setDaemon(True)
         self._thread.start()
 
+        # Flag to stop processing thread
+        self._stop = False
+    
+        # Entity in charge of managing system failures
+        self._fm = FailureManager()
+
         # EC state
         self._state = ECState.RUNNING
 
         # EC state
         self._state = ECState.RUNNING
 
@@ -172,69 +231,86 @@ class ExperimentController(object):
         return self._run_id
 
     @property
         return self._run_id
 
     @property
-    def finished(self):
-        """ Put the state of the Experiment Controller into a final state :
-            Either TERMINATED or FAILED
+    def abort(self):
+        return self._fm.abort
 
 
-        """
-        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+    def set_rm_failure(self):
+        self._fm.set_rm_failure()
 
     def wait_finished(self, guids):
 
     def wait_finished(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
+            RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
 
         :param guids: List of guids
         :type guids: list
+
         """
         """
-        return self.wait(guids)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STOPPED, 
+                quit = quit)
 
     def wait_started(self, guids):
 
     def wait_started(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STARTED or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.STARTED)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STARTED, 
+                quit = quit)
 
     def wait_released(self, guids):
 
     def wait_released(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state RELEASED (or FAILED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state = RELEASED or until the EC fails
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        # TODO: solve state concurrency BUG and !!!!
-        # correct waited release state to state = ResourceState.FAILED)
-        return self.wait(guids, state = ResourceState.FINISHED)
+
+        def quit():
+            return self._state == ECState.FAILED
+
+        return self.wait(guids, state = ResourceState.RELEASED, 
+                quit = quit)
 
     def wait_deployed(self, guids):
 
     def wait_deployed(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state READY (or any higher state)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= READY or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.READY)
 
 
-    def wait(self, guids, state = ResourceState.STOPPED):
-        """ Blocking method that waits until all the RM from the 'guid' list 
-            reached state 'state' or until a failure occurs
-            
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.READY, 
+                quit = quit)
+
+    def wait(self, guids, state, quit):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= 'state' or until quit yileds True
+           
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        # we randomly alter the order of the guids to avoid ordering
-        # dependencies (e.g. LinuxApplication RMs runing on the same
-        # linux host will be synchronized by the LinuxNode SSH lock)
-        random.shuffle(guids)
-
         while True:
         while True:
-            # If no more guids to wait for or an error occured, then exit
-            if len(guids) == 0 or self.finished:
+            # If there are no more guids to wait for
+            # or the quit function returns True, exit the loop
+            if len(guids) == 0 or quit():
                 break
 
             # If a guid reached one of the target states, remove it from list
                 break
 
             # If a guid reached one of the target states, remove it from list
@@ -245,26 +321,11 @@ class ExperimentController(object):
                 guids.remove(guid)
             else:
                 # Debug...
                 guids.remove(guid)
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
-                    self.state(guid, hr = True), state))
-
-                # Take the opportunity to 'refresh' the states of the RMs.
-                # Query only the first up to N guids (not to overwhelm 
-                # the local machine)
-                n = 100
-                lim = n if len(guids) > n else ( len(guids) -1 )
-                nguids = guids[0: lim]
-
-                # schedule state request for all guids (take advantage of
-                # scheduler multi threading).
-                for guid in nguids:
-                    callback = functools.partial(self.state, guid)
-                    self.schedule("0s", callback)
-
-                # If the guid is not in one of the target states, wait and
-                # continue quering. We keep the sleep big to decrease the
-                # number of RM state queries
-                time.sleep(4)
+                hrstate = ResourceState2str.get(rstate)
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+                    guid, rstate, state))
+
+            time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
   
     def get_task(self, tid):
         """ Get a specific task
@@ -574,7 +635,7 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not guids:
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not guids:
-            # If no guids list was indicated, all 'NEW' RMs will be deployed
+            # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
             guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
@@ -584,6 +645,7 @@ class ExperimentController(object):
             guids = [guids]
 
         # Create deployment group
             guids = [guids]
 
         # Create deployment group
+        # New guids can be added to a same deployment group later on
         new_group = False
         if not group:
             new_group = True
         new_group = False
         if not group:
             new_group = True
@@ -594,20 +656,9 @@ class ExperimentController(object):
 
         self._groups[group].extend(guids)
 
 
         self._groups[group].extend(guids)
 
-        # Before starting deployment we disorder the guids list with the
-        # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'guids' list closely
-        # resources one after another (e.g. all applications
-        # connected to the same node can likely appear one after another).
-        # This can originate a slow down in the deployment since the N 
-        # threads the parallel runner uses to processes tasks may all
-        # be taken up by the same family of resources waiting for the 
-        # same conditions (e.g. LinuxApplications running on a same 
-        # node share a single lock, so they will tend to be serialized).
-        # If we disorder the guids list, this problem can be mitigated.
-        random.shuffle(guids)
-
         def wait_all_and_start(group):
         def wait_all_and_start(group):
+            # Function that checks if all resources are READY
+            # before scheduling a start_with_conditions for each RM
             reschedule = False
             
             # Get all guids in group
             reschedule = False
             
             # Get all guids in group
@@ -630,9 +681,9 @@ class ExperimentController(object):
         if wait_all_ready and new_group:
             # Schedule a function to check that all resources are
             # READY, and only then schedule the start.
         if wait_all_ready and new_group:
             # Schedule a function to check that all resources are
             # READY, and only then schedule the start.
-            # This aimes at reducing the number of tasks looping in the 
+            # This aims at reducing the number of tasks looping in the 
             # scheduler. 
             # scheduler. 
-            # Intead of having N start tasks, we will have only one for 
+            # Instead of having many start tasks, we will have only one for 
             # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
             # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
@@ -672,11 +723,17 @@ class ExperimentController(object):
         Releases all the resources and stops task processing thread
 
         """
         Releases all the resources and stops task processing thread
 
         """
+        # TODO: Clean the parallel runner!! STOP all ongoing tasks
+        ####
+
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
+        # Stop processing thread
+        self._stop = True
+
         # Notify condition to wake up the processing thread
         self._notify()
         
         # Notify condition to wake up the processing thread
         self._notify()
         
@@ -754,8 +811,8 @@ class ExperimentController(object):
         runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
         runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
-        try:
-            while not self.finished:
+        while not self._stop:
+            try:
                 self._cond.acquire()
 
                 task = self._scheduler.next()
                 self._cond.acquire()
 
                 task = self._scheduler.next()
@@ -784,16 +841,20 @@ class ExperimentController(object):
                 if task:
                     # Process tasks in parallel
                     runner.put(self._execute, task)
                 if task:
                     # Process tasks in parallel
                     runner.put(self._execute, task)
-        except: 
-            import traceback
-            err = traceback.format_exc()
-            self.logger.error("Error while processing tasks in the EC: %s" % err)
+            except: 
+                import traceback
+                err = traceback.format_exc()
+                self.logger.error("Error while processing tasks in the EC: %s" % err)
 
 
-            self._state = ECState.FAILED
-        finally:   
-            self.logger.debug("Exiting the task processing loop ... ")
-            runner.sync()
-            runner.destroy()
+                # Set the EC to FAILED state 
+                self._state = ECState.FAILED
+
+                # Set the FailureManager failure level
+                self._fm.set_ec_failure()
+
+        self.logger.debug("Exiting the task processing loop ... ")
+        runner.sync()
+        runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -821,15 +882,8 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the EC to FAILED state (this will force to exit the task
-            # processing thread)
-            self._state = ECState.FAILED
-
-            # Notify condition to wake up the processing thread
-            self._notify()
-
-            # Propage error to the ParallelRunner
-            raise
+            # Set the FailureManager failure level
+            self._fm.set_task_failure()
 
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
 
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting