Removing ResourceState.FINISHED beacuse it is redundant with STOPPED
[nepi.git] / src / nepi / execution / ec.py
index 0a90dec..0d0e477 100644 (file)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
@@ -34,9 +26,56 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
+import functools
+import logging
+import os
+import random
+import sys
+import time
+import threading
+import weakref
+
+class FailureLevel(object):
+    """ Describes the system failure state
+    """
+    OK = 1
+    RM_FAILURE = 2
+    EC_FAILURE = 3
+
+class FailureManager(object):
+    """ The FailureManager is responsible for handling errors,
+    and deciding whether an experiment should be aborted
+    """
+
+    def __init__(self, ec):
+        self._ec = weakref.ref(ec)
+        self._failure_level = FailureLevel.OK
+
+    @property
+    def ec(self):
+        """ Returns the Experiment Controller """
+        return self._ec()
+
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.OK:
+            for guid in self.ec.resources:
+                state = self.ec.state(guid)
+                critical = self.ec.get(guid, "critical")
+                if state == ResourceState.FAILED and critical:
+                    self._failure_level = FailureLevel.RM_FAILURE
+                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+                    break
+
+        return self._failure_level != FailureLevel.OK
+
+    def set_ec_failure(self):
+        self._failure_level = FailureLevel.EC_FAILURE
+
+
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -50,19 +89,23 @@ class ExperimentController(object):
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
-                       It will be used in the name of the directory 
-                        where experiment related information is stored
         :type exp_id: str
 
     .. note::
 
-        An experiment, or scenario, is defined by a concrete use, behavior,
-        configuration and interconnection of resources that describe a single
-        experiment case (We call this the experiment description). 
-        A same experiment (scenario) can be run many times.
+        An experiment, or scenario, is defined by a concrete set of resources,
+        behavior, configuration and interconnection of those resources. 
+        The Experiment Description (ED) is a detailed representation of a
+        single experiment. It contains all the necessary information to 
+        allow repeating the experiment. NEPI allows to describe
+        experiments by registering components (resources), configuring them
+        and interconnecting them.
+        
+        A same experiment (scenario) can be executed many times, generating 
+        different results. We call an experiment execution (instance) a 'run'.
 
-        The ExperimentController (EC), is the entity responsible for 
-        managing an experiment instance (run). The same scenario can be 
+        The ExperimentController (EC), is the entity responsible of
+        managing an experiment run. The same scenario can be 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
@@ -76,15 +119,15 @@ class ExperimentController(object):
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
-        In order for a new type of resource to be supported in NEPI
-        a new RM must be implemented. NEPI already provides different
+        To support a new type of resource in NEPI, a new RM must be 
+        implemented. NEPI already provides a variety of
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
-        configure them and interconnect them, in order to describe an experiment.
+        configure them and interconnect them, to describe an experiment.
         Describing an experiment through the EC does not run the experiment.
-        Only when the 'deploy()' method is invoked on the EC, will the EC take 
+        Only when the 'deploy()' method is invoked on the EC, the EC will take 
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
@@ -98,22 +141,27 @@ class ExperimentController(object):
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
-        exp_id, which can be re-used by different ExperimentController instances,
-        and the run_id, which unique to a ExperimentController instance, and
+        exp_id, which can be re-used in different ExperimentController,
+        and the run_id, which is unique to one ExperimentController instance, and
         is automatically generated by NEPI.
         
     """
 
     def __init__(self, exp_id = None): 
         super(ExperimentController, self).__init__()
-        # root directory to store files
+        # Logging
+        self._logger = logging.getLogger("ExperimentController")
 
-        # Run identifier. It identifies a concrete instance (run) of an experiment.
-        # Since a same experiment (same configuration) can be run many times,
-        # this id permits to identify concrete exoeriment run
+        # Run identifier. It identifies a concrete execution instance (run) 
+        # of an experiment.
+        # Since a same experiment (same configuration) can be executed many 
+        # times, this run_id permits to separate result files generated on 
+        # different experiment executions
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
+        # Identifies the experiment scenario (i.e. configuration, 
+        # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
@@ -122,24 +170,40 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
-        # Scheduler
+        # Scheduler. It a queue that holds tasks scheduled for
+        # execution, and yields the next task to be executed 
+        # ordered by execution and arrival time
         self._scheduler = HeapScheduler()
 
         # Tasks
         self._tasks = dict()
 
+        # RM groups (for deployment) 
+        self._groups = dict()
+
+        # generator of globally unique id for groups
+        self._group_id_generator = guid.GuidGenerator()
+
+        # Flag to stop processing thread
+        self._stop = False
+    
+        # Entity in charge of managing system failures
+        self._fm = FailureManager(self)
+
+        # EC state
+        self._state = ECState.RUNNING
+
+        # The runner is a pool of threads used to parallelize 
+        # execution of tasks
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+        self._runner = ParallelRun(maxthreads = nthreads)
+
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
 
-        # EC state
-        self._state = ECState.RUNNING
-
-        # Logging
-        self._logger = logging.getLogger("ExperimentController")
-
     @property
     def logger(self):
         """ Return the logger of the Experiment Controller
@@ -169,84 +233,105 @@ class ExperimentController(object):
         return self._run_id
 
     @property
-    def finished(self):
-        """ Put the state of the Experiment Controller into a final state :
-            Either TERMINATED or FAILED
-
-        """
-        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+    def abort(self):
+        return self._fm.abort
 
     def wait_finished(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STOPPED (i.e. STOPPED, FAILED or 
+            RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
+
         """
-        return self.wait(guids)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STOPPED, 
+                quit = quit)
 
     def wait_started(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STARTED or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.STARTED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
 
-    def wait(self, guids, states = [ResourceState.FINISHED, 
-            ResourceState.FAILED,
-            ResourceState.STOPPED]):
-        """ Blocking method that waits until all the RM from the 'guid' list 
-            reached state 'state' or until a failure occurs
-            
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STARTED, 
+                quit = quit)
+
+    def wait_released(self, guids):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state = RELEASED or until the EC fails
+
+        :param guids: List of guids
+        :type guids: list
+        """
+
+        def quit():
+            return self._state == ECState.FAILED
+
+        return self.wait(guids, state = ResourceState.RELEASED, 
+                quit = quit)
+
+    def wait_deployed(self, guids):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= READY or until a System Failure occurs 
+            (e.g. Task Failure) 
+
+        :param guids: List of guids
+        :type guids: list
+        """
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.READY, 
+                quit = quit)
+
+    def wait(self, guids, state, quit):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= 'state' or until quit yileds True
+           
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        # we randomly alter the order of the guids to avoid ordering
-        # dependencies (e.g. LinuxApplication RMs runing on the same
-        # linux host will be synchronized by the LinuxNode SSH lock)
-        random.shuffle(guids)
+        # Make a copy to avoid modifying the original guids list
+        guids = list(guids)
 
         while True:
-            # If no more guids to wait for or an error occured, then exit
-            if len(guids) == 0 or self.finished:
+            # If there are no more guids to wait for
+            # or the quit function returns True, exit the loop
+            if len(guids) == 0 or quit():
                 break
 
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
-            state = self.state(guid)
+            rstate = self.state(guid)
+            
+            hrrstate = ResourceState2str.get(rstate)
+            hrstate = ResourceState2str.get(state)
 
-            if state in states:
+            if rstate >= state:
                 guids.remove(guid)
+                rm = self.get_resource(guid)
+                self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
+                    rm.get_rtype(), guid, hrrstate, hrstate))
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR %g - state %s " % (guid,
-                    self.state(guid, hr = True)))
-
-                # Take the opportunity to 'refresh' the states of the RMs.
-                # Query only the first up to N guids (not to overwhelm 
-                # the local machine)
-                n = 100
-                lim = n if len(guids) > n else ( len(guids) -1 )
-                nguids = guids[0: lim]
-
-                # schedule state request for all guids (take advantage of
-                # scheduler multi threading).
-                for guid in nguids:
-                    callback = functools.partial(self.state, guid)
-                    self.schedule("0s", callback)
-
-                # If the guid is not in one of the target states, wait and
-                # continue quering. We keep the sleep big to decrease the
-                # number of RM state queries
-                time.sleep(2)
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+                    guid, hrrstate, hrstate))
+                time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
@@ -322,44 +407,53 @@ class ExperimentController(object):
         rm1.register_connection(guid2)
         rm2.register_connection(guid1)
 
-    def register_condition(self, group1, action, group2, state,
+    def register_condition(self, guids1, action, guids2, state,
             time = None):
-        """ Registers an action START or STOP for all RM on group1 to occur 
-            time 'time' after all elements in group2 reached state 'state'.
+        """ Registers an action START or STOP for all RM on guids1 to occur 
+            time 'time' after all elements in guids2 reached state 'state'.
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid1)
-            rm.register_condition(action, group2, state, time)
+            rm.register_condition(action, guids2, state, time)
 
-    def register_trace(self, guid, name):
+    def enable_trace(self, guid, name):
         """ Enable trace
 
         :param name: Name of the trace
         :type name: str
         """
         rm = self.get_resource(guid)
-        rm.register_trace(name)
+        rm.enable_trace(name)
+
+    def trace_enabled(self, guid, name):
+        """ Returns True if trace is enabled
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        rm = self.get_resource(guid)
+        return rm.trace_enabled(name)
 
     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         """ Get information on collected trace
@@ -473,10 +567,10 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start()
 
-    def set_with_conditions(self, name, value, group1, group2, state,
+    def set_with_conditions(self, name, value, guids1, guids2, state,
             time = None):
         """ Set value 'value' on attribute with name 'name' on all RMs of
-            group1 when 'time' has elapsed since all elements in group
+            guids1 when 'time' has elapsed since all elements in guids
             have reached state 'state'.
 
             :param name: Name of attribute to set in RM
@@ -485,160 +579,155 @@ class ExperimentController(object):
             :param value: Value of attribute to set in RM
             :type name: string
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid)
-            rm.set_with_conditions(name, value, group2, state, time)
-
-    def stop_with_conditions(self, guid):
-        """ Stop a specific RM defined by its 'guid' only if all the conditions are true
-
-            :param guid: Guid of the RM
-            :type guid: int
-
-        """
-        rm = self.get_resource(guid)
-        return rm.stop_with_conditions()
-
-    def start_with_conditions(self, guid):
-        """ Start a specific RM defined by its 'guid' only if all the conditions are true
-
-            :param guid: Guid of the RM
-            :type guid: int
-
-        """
-        rm = self.get_resource(guid)
-        return rm.start_with_condition()
+            rm.set_with_conditions(name, value, guids2, state, time)
 
-    def deploy(self, group = None, wait_all_ready = True):
-        """ Deploy all resource manager in group
+    def deploy(self, guids = None, wait_all_ready = True, group = None):
+        """ Deploy all resource manager in guids list
 
-        :param group: List of guids of RMs to deploy
-        :type group: list
+        :param guids: List of guids of RMs to deploy
+        :type guids: list
 
         :param wait_all_ready: Wait until all RMs are ready in
             order to start the RMs
         :type guid: int
 
+        :param group: Id of deployment group in which to deploy RMs
+        :type group: int
+
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
-        if not group:
-            # By default, if not deployment group is indicated, 
-            # all RMs that are undeployed will be deployed
-            group = []
+        if not guids:
+            # If no guids list was passed, all 'NEW' RMs will be deployed
+            guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
-                    group.append(guid)
+                    guids.append(guid)
                 
-        if isinstance(group, int):
-            group = [group]
-
-        # Before starting deployment we disorder the group list with the
-        # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'group' list closely
-        # resources one after another (e.g. all applications
-        # connected to the same node can likely appear one after another).
-        # This can originate a slow down in the deployment since the N 
-        # threads the parallel runner uses to processes tasks may all
-        # be taken up by the same family of resources waiting for the 
-        # same conditions (e.g. LinuxApplications running on a same 
-        # node share a single lock, so they will tend to be serialized).
-        # If we disorder the group list, this problem can be mitigated.
-        random.shuffle(group)
+        if isinstance(guids, int):
+            guids = [guids]
+
+        # Create deployment group
+        # New guids can be added to a same deployment group later on
+        new_group = False
+        if not group:
+            new_group = True
+            group = self._group_id_generator.next()
+
+        if group not in self._groups:
+            self._groups[group] = []
+
+        self._groups[group].extend(guids)
 
         def wait_all_and_start(group):
+            # Function that checks if all resources are READY
+            # before scheduling a start_with_conditions for each RM
             reschedule = False
-            for guid in group:
+            
+            # Get all guids in group
+            guids = self._groups[group]
+
+            for guid in guids:
                 if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
             if reschedule:
+
                 callback = functools.partial(wait_all_and_start, group)
                 self.schedule("1s", callback)
             else:
-                # If all resources are read, we schedule the start
-                for guid in group:
+                # If all resources are ready, we schedule the start
+                for guid in guids:
                     rm = self.get_resource(guid)
                     self.schedule("0s", rm.start_with_conditions)
 
-        if wait_all_ready:
-            # Schedule the function that will check all resources are
-            # READY, and only then it will schedule the start.
-            # This is aimed to reduce the number of tasks looping in the scheduler.
-            # Intead of having N start tasks, we will have only one
+        if wait_all_ready and new_group:
+            # Schedule a function to check that all resources are
+            # READY, and only then schedule the start.
+            # This aims at reducing the number of tasks looping in the 
+            # scheduler. 
+            # Instead of having many start tasks, we will have only one for 
+            # the whole group.
             callback = functools.partial(wait_all_and_start, group)
-            self.schedule("1s", callback)
+            self.schedule("0s", callback)
 
-        for guid in group:
+        for guid in guids:
             rm = self.get_resource(guid)
-            self.schedule("0s", rm.deploy)
+            rm.deployment_group = group
+            self.schedule("0s", rm.deploy_with_conditions)
 
             if not wait_all_ready:
-                self.schedule("1s", rm.start_with_conditions)
+                self.schedule("0s", rm.start_with_conditions)
 
             if rm.conditions.get(ResourceAction.STOP):
                 # Only if the RM has STOP conditions we
                 # schedule a stop. Otherwise the RM will stop immediately
-                self.schedule("2s", rm.stop_with_conditions)
+                self.schedule("0s", rm.stop_with_conditions)
 
-    def release(self, group = None):
-        """ Release the elements of the list 'group' or 
-        all the resources if any group is specified
+    def release(self, guids = None):
+        """ Release al RMs on the guids list or 
+        all the resources if no list is specified
 
-            :param group: List of RM
-            :type group: list
+            :param guids: List of RM guids
+            :type guids: list
 
         """
-        if not group:
-            group = self.resources
+        if not guids:
+            guids = self.resources
 
-        threads = []
-        for guid in group:
+        # Remove all pending tasks from the scheduler queue
+        for tid in list(self._scheduler.pending):
+            self._scheduler.remove(tid)
+
+        self._runner.empty()
+
+        for guid in guids:
             rm = self.get_resource(guid)
-            thread = threading.Thread(target=rm.release)
-            threads.append(thread)
-            thread.setDaemon(True)
-            thread.start()
-
-        while list(threads) and not self.finished:
-            thread = threads[0]
-            # Time out after 5 seconds to check EC not terminated
-            thread.join(5)
-            if not thread.is_alive():
-                threads.remove(thread)
+            self.schedule("0s", rm.release)
+
+        self.wait_released(guids)
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
         Releases all the resources and stops task processing thread
 
         """
+        # If there was a major failure we can't exit gracefully
+        if self._state == ECState.FAILED:
+            raise RuntimeError("EC failure. Can not exit gracefully")
+
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
+        # Stop processing thread
+        self._stop = True
+
         # Notify condition to wake up the processing thread
         self._notify()
         
@@ -711,13 +800,11 @@ class ExperimentController(object):
         that might have been raised by the workers.
 
         """
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
-        runner = ParallelRun(maxthreads = nthreads)
-        runner.start()
+        self._runner.start()
 
-        try:
-            while not self.finished:
+        while not self._stop:
+            try:
                 self._cond.acquire()
 
                 task = self._scheduler.next()
@@ -745,16 +832,22 @@ class ExperimentController(object):
 
                 if task:
                     # Process tasks in parallel
-                    runner.put(self._execute, task)
-        except: 
-            import traceback
-            err = traceback.format_exc()
-            self.logger.error("Error while processing tasks in the EC: %s" % err)
+                    self._runner.put(self._execute, task)
+            except: 
+                import traceback
+                err = traceback.format_exc()
+                self.logger.error("Error while processing tasks in the EC: %s" % err)
+
+                # Set the EC to FAILED state 
+                self._state = ECState.FAILED
+            
+                # Set the FailureManager failure level to EC failure
+                self._fm.set_ec_failure()
 
-            self._state = ECState.FAILED
-        finally:   
-            self.logger.debug("Exiting the task processing loop ... ")
-            runner.sync()
+        self.logger.debug("Exiting the task processing loop ... ")
+        
+        self._runner.sync()
+        self._runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -782,16 +875,6 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the EC to FAILED state (this will force to exit the task
-            # processing thread)
-            self._state = ECState.FAILED
-
-            # Notify condition to wake up the processing thread
-            self._notify()
-
-            # Propage error to the ParallelRunner
-            raise
-
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
         for a new task to be scheduled.