ec_shutdown
[nepi.git] / src / nepi / execution / ec.py
index e59e626..30dd7a4 100644 (file)
@@ -36,6 +36,51 @@ import sys
 import time
 import threading
 
+class FailurePolicy(object):
+    """ Defines how to respond to experiment failures  
+    """
+    IGNORE_RM_FAILURE = 1
+    ABORT_ON_RM_FAILURE = 2
+
+class FailureLevel(object):
+    """ Describe the system failure state
+    """
+    OK = 1
+    RM_FAILURE = 2
+    TASK_FAILURE = 3
+    EC_FAILURE = 4
+
+class FailureManager(object):
+    """ The FailureManager is responsible for handling errors,
+    and deciding whether an experiment should be aborted
+    """
+
+    def __init__(self, failure_policy = None):
+        self._failure_level = FailureLevel.OK
+        self._failure_policy = failure_policy or \
+                FailurePolicy.ABORT_ON_RM_FAILURE
+
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.EC_FAILURE:
+            return True
+
+        if self._failure_level in [FailureLevel.TASK_FAILURE, 
+                FailureLevel.RM_FAILURE] and \
+                        self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
+            return True
+
+        return False
+
+    def set_rm_failure(self):
+        self._failure_level = FailureLevel.RM_FAILURE
+
+    def set_task_failure(self):
+        self._failure_level = FailureLevel.TASK_FAILURE
+
+    def set_ec_failure(self):
+        self._failure_level = FailureLevel.EC_FAILURE
+
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -49,19 +94,23 @@ class ExperimentController(object):
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
-                       It will be used in the name of the directory 
-                        where experiment related information is stored
         :type exp_id: str
 
     .. note::
 
-        An experiment, or scenario, is defined by a concrete use, behavior,
-        configuration and interconnection of resources that describe a single
-        experiment case (We call this the experiment description). 
-        A same experiment (scenario) can be run many times.
+        An experiment, or scenario, is defined by a concrete set of resources,
+        behavior, configuration and interconnection of those resources. 
+        The Experiment Description (ED) is a detailed representation of a
+        single experiment. It contains all the necessary information to 
+        allow repeating the experiment. NEPI allows to describe
+        experiments by registering components (resources), configuring them
+        and interconnecting them.
+        
+        A same experiment (scenario) can be executed many times, generating 
+        different results. We call an experiment execution (instance) a 'run'.
 
-        The ExperimentController (EC), is the entity responsible for 
-        managing an experiment instance (run). The same scenario can be 
+        The ExperimentController (EC), is the entity responsible of
+        managing an experiment run. The same scenario can be 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
@@ -75,15 +124,15 @@ class ExperimentController(object):
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
-        In order for a new type of resource to be supported in NEPI
-        a new RM must be implemented. NEPI already provides different
+        To support a new type of resource in NEPI, a new RM must be 
+        implemented. NEPI already provides a variety of
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
-        configure them and interconnect them, in order to describe an experiment.
+        configure them and interconnect them, to describe an experiment.
         Describing an experiment through the EC does not run the experiment.
-        Only when the 'deploy()' method is invoked on the EC, will the EC take 
+        Only when the 'deploy()' method is invoked on the EC, the EC will take 
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
@@ -97,8 +146,8 @@ class ExperimentController(object):
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
-        exp_id, which can be re-used by different ExperimentController instances,
-        and the run_id, which unique to a ExperimentController instance, and
+        exp_id, which can be re-used in different ExperimentController,
+        and the run_id, which is unique to one ExperimentController instance, and
         is automatically generated by NEPI.
         
     """
@@ -108,12 +157,16 @@ class ExperimentController(object):
         # Logging
         self._logger = logging.getLogger("ExperimentController")
 
-        # Run identifier. It identifies a concrete instance (run) of an experiment.
-        # Since a same experiment (same configuration) can be run many times,
-        # this id permits to identify concrete exoeriment run
+        # Run identifier. It identifies a concrete execution instance (run) 
+        # of an experiment.
+        # Since a same experiment (same configuration) can be executed many 
+        # times, this run_id permits to separate result files generated on 
+        # different experiment executions
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
+        # Identifies the experiment scenario (i.e. configuration, 
+        # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
@@ -128,7 +181,7 @@ class ExperimentController(object):
         # Tasks
         self._tasks = dict()
 
-        # RM groups 
+        # RM groups (for deployment) 
         self._groups = dict()
 
         # generator of globally unique id for groups
@@ -140,6 +193,12 @@ class ExperimentController(object):
         self._thread.setDaemon(True)
         self._thread.start()
 
+        # Flag to stop processing thread
+        self._stop = False
+    
+        # Entity in charge of managing system failures
+        self._fm = FailureManager()
+
         # EC state
         self._state = ECState.RUNNING
 
@@ -172,69 +231,86 @@ class ExperimentController(object):
         return self._run_id
 
     @property
-    def finished(self):
-        """ Put the state of the Experiment Controller into a final state :
-            Either TERMINATED or FAILED
+    def abort(self):
+        return self._fm.abort
 
-        """
-        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+    def set_rm_failure(self):
+        self._fm.set_rm_failure()
 
     def wait_finished(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or 
+            RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
+
         """
-        return self.wait(guids)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STOPPED, 
+                quit = quit)
 
     def wait_started(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STARTED or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.STARTED)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STARTED, 
+                quit = quit)
 
     def wait_released(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state RELEASED (or FAILED)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state = RELEASED or until the EC fails
 
         :param guids: List of guids
         :type guids: list
         """
-        # TODO: solve state concurrency BUG and !!!!
-        # correct waited release state to state = ResourceState.FAILED)
-        return self.wait(guids, state = ResourceState.FINISHED)
+
+        def quit():
+            return self._state == ECState.FAILED
+
+        return self.wait(guids, state = ResourceState.RELEASED, 
+                quit = quit)
 
     def wait_deployed(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state READY (or any higher state)
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= READY or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, state = ResourceState.READY)
 
-    def wait(self, guids, state = ResourceState.STOPPED):
-        """ Blocking method that waits until all the RM from the 'guid' list 
-            reached state 'state' or until a failure occurs
-            
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.READY, 
+                quit = quit)
+
+    def wait(self, guids, state, quit):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= 'state' or until quit yileds True
+           
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        # we randomly alter the order of the guids to avoid ordering
-        # dependencies (e.g. LinuxApplication RMs runing on the same
-        # linux host will be synchronized by the LinuxNode SSH lock)
-        random.shuffle(guids)
-
         while True:
-            # If no more guids to wait for or an error occured, then exit
-            if len(guids) == 0 or self.finished:
+            # If there are no more guids to wait for
+            # or the quit function returns True, exit the loop
+            if len(guids) == 0 or quit():
                 break
 
             # If a guid reached one of the target states, remove it from list
@@ -245,26 +321,11 @@ class ExperimentController(object):
                 guids.remove(guid)
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
-                    self.state(guid, hr = True), state))
-
-                # Take the opportunity to 'refresh' the states of the RMs.
-                # Query only the first up to N guids (not to overwhelm 
-                # the local machine)
-                n = 100
-                lim = n if len(guids) > n else ( len(guids) -1 )
-                nguids = guids[0: lim]
-
-                # schedule state request for all guids (take advantage of
-                # scheduler multi threading).
-                for guid in nguids:
-                    callback = functools.partial(self.state, guid)
-                    self.schedule("0s", callback)
-
-                # If the guid is not in one of the target states, wait and
-                # continue quering. We keep the sleep big to decrease the
-                # number of RM state queries
-                time.sleep(4)
+                hrstate = ResourceState2str.get(rstate)
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+                    guid, rstate, state))
+
+            time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
@@ -574,7 +635,7 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not guids:
-            # If no guids list was indicated, all 'NEW' RMs will be deployed
+            # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
@@ -584,6 +645,7 @@ class ExperimentController(object):
             guids = [guids]
 
         # Create deployment group
+        # New guids can be added to a same deployment group later on
         new_group = False
         if not group:
             new_group = True
@@ -594,20 +656,9 @@ class ExperimentController(object):
 
         self._groups[group].extend(guids)
 
-        # Before starting deployment we disorder the guids list with the
-        # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'guids' list closely
-        # resources one after another (e.g. all applications
-        # connected to the same node can likely appear one after another).
-        # This can originate a slow down in the deployment since the N 
-        # threads the parallel runner uses to processes tasks may all
-        # be taken up by the same family of resources waiting for the 
-        # same conditions (e.g. LinuxApplications running on a same 
-        # node share a single lock, so they will tend to be serialized).
-        # If we disorder the guids list, this problem can be mitigated.
-        random.shuffle(guids)
-
         def wait_all_and_start(group):
+            # Function that checks if all resources are READY
+            # before scheduling a start_with_conditions for each RM
             reschedule = False
             
             # Get all guids in group
@@ -630,9 +681,9 @@ class ExperimentController(object):
         if wait_all_ready and new_group:
             # Schedule a function to check that all resources are
             # READY, and only then schedule the start.
-            # This aimes at reducing the number of tasks looping in the 
+            # This aims at reducing the number of tasks looping in the 
             # scheduler. 
-            # Intead of having N start tasks, we will have only one for 
+            # Instead of having many start tasks, we will have only one for 
             # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             self.schedule("1s", callback)
@@ -672,11 +723,17 @@ class ExperimentController(object):
         Releases all the resources and stops task processing thread
 
         """
+        # TODO: Clean the parallel runner!! STOP all ongoing tasks
+        ####
+
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
+        # Stop processing thread
+        self._stop = True
+
         # Notify condition to wake up the processing thread
         self._notify()
         
@@ -754,8 +811,8 @@ class ExperimentController(object):
         runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
-        try:
-            while not self.finished:
+        while not self._stop:
+            try:
                 self._cond.acquire()
 
                 task = self._scheduler.next()
@@ -784,16 +841,20 @@ class ExperimentController(object):
                 if task:
                     # Process tasks in parallel
                     runner.put(self._execute, task)
-        except: 
-            import traceback
-            err = traceback.format_exc()
-            self.logger.error("Error while processing tasks in the EC: %s" % err)
+            except: 
+                import traceback
+                err = traceback.format_exc()
+                self.logger.error("Error while processing tasks in the EC: %s" % err)
 
-            self._state = ECState.FAILED
-        finally:   
-            self.logger.debug("Exiting the task processing loop ... ")
-            runner.sync()
-            runner.destroy()
+                # Set the EC to FAILED state 
+                self._state = ECState.FAILED
+
+                # Set the FailureManager failure level
+                self._fm.set_ec_failure()
+
+        self.logger.debug("Exiting the task processing loop ... ")
+        runner.sync()
+        runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -821,15 +882,8 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the EC to FAILED state (this will force to exit the task
-            # processing thread)
-            self._state = ECState.FAILED
-
-            # Notify condition to wake up the processing thread
-            self._notify()
-
-            # Propage error to the ParallelRunner
-            raise
+            # Set the FailureManager failure level
+            self._fm.set_task_failure()
 
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting