Removing ResourceState.FINISHED beacuse it is redundant with STOPPED
[nepi.git] / src / nepi / execution / ec.py
index 0a90dec..0d0e477 100644 (file)
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 #
 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
-import functools
-import logging
-import os
-import random
-import sys
-import time
-import threading
-
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
 from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
@@ -34,9 +26,56 @@ from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 from nepi.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
-# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
 
+import functools
+import logging
+import os
+import random
+import sys
+import time
+import threading
+import weakref
+
+class FailureLevel(object):
+    """ Describes the system failure state
+    """
+    OK = 1
+    RM_FAILURE = 2
+    EC_FAILURE = 3
+
+class FailureManager(object):
+    """ The FailureManager is responsible for handling errors,
+    and deciding whether an experiment should be aborted
+    """
+
+    def __init__(self, ec):
+        self._ec = weakref.ref(ec)
+        self._failure_level = FailureLevel.OK
+
+    @property
+    def ec(self):
+        """ Returns the Experiment Controller """
+        return self._ec()
+
+    @property
+    def abort(self):
+        if self._failure_level == FailureLevel.OK:
+            for guid in self.ec.resources:
+                state = self.ec.state(guid)
+                critical = self.ec.get(guid, "critical")
+                if state == ResourceState.FAILED and critical:
+                    self._failure_level = FailureLevel.RM_FAILURE
+                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
+                    break
+
+        return self._failure_level != FailureLevel.OK
+
+    def set_ec_failure(self):
+        self._failure_level = FailureLevel.EC_FAILURE
+
+
 class ECState(object):
     """ State of the Experiment Controller
    
 class ECState(object):
     """ State of the Experiment Controller
    
@@ -50,19 +89,23 @@ class ExperimentController(object):
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
     .. class:: Class Args :
       
         :param exp_id: Human readable identifier for the experiment scenario. 
-                       It will be used in the name of the directory 
-                        where experiment related information is stored
         :type exp_id: str
 
     .. note::
 
         :type exp_id: str
 
     .. note::
 
-        An experiment, or scenario, is defined by a concrete use, behavior,
-        configuration and interconnection of resources that describe a single
-        experiment case (We call this the experiment description). 
-        A same experiment (scenario) can be run many times.
+        An experiment, or scenario, is defined by a concrete set of resources,
+        behavior, configuration and interconnection of those resources. 
+        The Experiment Description (ED) is a detailed representation of a
+        single experiment. It contains all the necessary information to 
+        allow repeating the experiment. NEPI allows to describe
+        experiments by registering components (resources), configuring them
+        and interconnecting them.
+        
+        A same experiment (scenario) can be executed many times, generating 
+        different results. We call an experiment execution (instance) a 'run'.
 
 
-        The ExperimentController (EC), is the entity responsible for 
-        managing an experiment instance (run). The same scenario can be 
+        The ExperimentController (EC), is the entity responsible of
+        managing an experiment run. The same scenario can be 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
         recreated (and re-run) by instantiating an EC and recreating 
         the same experiment description. 
 
@@ -76,15 +119,15 @@ class ExperimentController(object):
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
         single resource. ResourceManagers are specific to a resource
         type (i.e. An RM to control a Linux application will not be
         the same as the RM used to control a ns-3 simulation).
-        In order for a new type of resource to be supported in NEPI
-        a new RM must be implemented. NEPI already provides different
+        To support a new type of resource in NEPI, a new RM must be 
+        implemented. NEPI already provides a variety of
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
         RMs to control basic resources, and new can be extended from
         the existing ones.
 
         Through the EC interface the user can create ResourceManagers (RMs),
-        configure them and interconnect them, in order to describe an experiment.
+        configure them and interconnect them, to describe an experiment.
         Describing an experiment through the EC does not run the experiment.
         Describing an experiment through the EC does not run the experiment.
-        Only when the 'deploy()' method is invoked on the EC, will the EC take 
+        Only when the 'deploy()' method is invoked on the EC, the EC will take 
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
         actions to transform the 'described' experiment into a 'running' experiment.
 
         While the experiment is running, it is possible to continue to
@@ -98,22 +141,27 @@ class ExperimentController(object):
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
         However, since a same 'experiment' can be run many times, the experiment
         id is not enough to identify an experiment instance (run).
         For this reason, the ExperimentController has two identifier, the 
-        exp_id, which can be re-used by different ExperimentController instances,
-        and the run_id, which unique to a ExperimentController instance, and
+        exp_id, which can be re-used in different ExperimentController,
+        and the run_id, which is unique to one ExperimentController instance, and
         is automatically generated by NEPI.
         
     """
 
     def __init__(self, exp_id = None): 
         super(ExperimentController, self).__init__()
         is automatically generated by NEPI.
         
     """
 
     def __init__(self, exp_id = None): 
         super(ExperimentController, self).__init__()
-        # root directory to store files
+        # Logging
+        self._logger = logging.getLogger("ExperimentController")
 
 
-        # Run identifier. It identifies a concrete instance (run) of an experiment.
-        # Since a same experiment (same configuration) can be run many times,
-        # this id permits to identify concrete exoeriment run
+        # Run identifier. It identifies a concrete execution instance (run) 
+        # of an experiment.
+        # Since a same experiment (same configuration) can be executed many 
+        # times, this run_id permits to separate result files generated on 
+        # different experiment executions
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
         self._run_id = tsformat()
 
         # Experiment identifier. Usually assigned by the user
+        # Identifies the experiment scenario (i.e. configuration, 
+        # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
         # generator of globally unique ids
@@ -122,24 +170,40 @@ class ExperimentController(object):
         # Resource managers
         self._resources = dict()
 
         # Resource managers
         self._resources = dict()
 
-        # Scheduler
+        # Scheduler. It a queue that holds tasks scheduled for
+        # execution, and yields the next task to be executed 
+        # ordered by execution and arrival time
         self._scheduler = HeapScheduler()
 
         # Tasks
         self._tasks = dict()
 
         self._scheduler = HeapScheduler()
 
         # Tasks
         self._tasks = dict()
 
+        # RM groups (for deployment) 
+        self._groups = dict()
+
+        # generator of globally unique id for groups
+        self._group_id_generator = guid.GuidGenerator()
+
+        # Flag to stop processing thread
+        self._stop = False
+    
+        # Entity in charge of managing system failures
+        self._fm = FailureManager(self)
+
+        # EC state
+        self._state = ECState.RUNNING
+
+        # The runner is a pool of threads used to parallelize 
+        # execution of tasks
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+        self._runner = ParallelRun(maxthreads = nthreads)
+
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
 
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
 
-        # EC state
-        self._state = ECState.RUNNING
-
-        # Logging
-        self._logger = logging.getLogger("ExperimentController")
-
     @property
     def logger(self):
         """ Return the logger of the Experiment Controller
     @property
     def logger(self):
         """ Return the logger of the Experiment Controller
@@ -169,84 +233,105 @@ class ExperimentController(object):
         return self._run_id
 
     @property
         return self._run_id
 
     @property
-    def finished(self):
-        """ Put the state of the Experiment Controller into a final state :
-            Either TERMINATED or FAILED
-
-        """
-        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
+    def abort(self):
+        return self._fm.abort
 
     def wait_finished(self, guids):
 
     def wait_finished(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state FINISHED
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STOPPED (i.e. STOPPED, FAILED or 
+            RELEASED ) or until a System Failure occurs (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
 
         :param guids: List of guids
         :type guids: list
+
         """
         """
-        return self.wait(guids)
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STOPPED, 
+                quit = quit)
 
     def wait_started(self, guids):
 
     def wait_started(self, guids):
-        """ Blocking method that wait until all the RM from the 'guid' list 
-            reached the state STARTED
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= STARTED or until a System Failure occurs 
+            (e.g. Task Failure) 
 
         :param guids: List of guids
         :type guids: list
         """
 
         :param guids: List of guids
         :type guids: list
         """
-        return self.wait(guids, states = [ResourceState.STARTED,
-            ResourceState.STOPPED,
-            ResourceState.FAILED,
-            ResourceState.FINISHED])
 
 
-    def wait(self, guids, states = [ResourceState.FINISHED, 
-            ResourceState.FAILED,
-            ResourceState.STOPPED]):
-        """ Blocking method that waits until all the RM from the 'guid' list 
-            reached state 'state' or until a failure occurs
-            
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.STARTED, 
+                quit = quit)
+
+    def wait_released(self, guids):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state = RELEASED or until the EC fails
+
+        :param guids: List of guids
+        :type guids: list
+        """
+
+        def quit():
+            return self._state == ECState.FAILED
+
+        return self.wait(guids, state = ResourceState.RELEASED, 
+                quit = quit)
+
+    def wait_deployed(self, guids):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= READY or until a System Failure occurs 
+            (e.g. Task Failure) 
+
+        :param guids: List of guids
+        :type guids: list
+        """
+
+        def quit():
+            return self.abort
+
+        return self.wait(guids, state = ResourceState.READY, 
+                quit = quit)
+
+    def wait(self, guids, state, quit):
+        """ Blocking method that wait until all RMs in the 'guid' list 
+            reach a state >= 'state' or until quit yileds True
+           
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        # we randomly alter the order of the guids to avoid ordering
-        # dependencies (e.g. LinuxApplication RMs runing on the same
-        # linux host will be synchronized by the LinuxNode SSH lock)
-        random.shuffle(guids)
+        # Make a copy to avoid modifying the original guids list
+        guids = list(guids)
 
         while True:
 
         while True:
-            # If no more guids to wait for or an error occured, then exit
-            if len(guids) == 0 or self.finished:
+            # If there are no more guids to wait for
+            # or the quit function returns True, exit the loop
+            if len(guids) == 0 or quit():
                 break
 
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
                 break
 
             # If a guid reached one of the target states, remove it from list
             guid = guids[0]
-            state = self.state(guid)
+            rstate = self.state(guid)
+            
+            hrrstate = ResourceState2str.get(rstate)
+            hrstate = ResourceState2str.get(state)
 
 
-            if state in states:
+            if rstate >= state:
                 guids.remove(guid)
                 guids.remove(guid)
+                rm = self.get_resource(guid)
+                self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
+                    rm.get_rtype(), guid, hrrstate, hrstate))
             else:
                 # Debug...
             else:
                 # Debug...
-                self.logger.debug(" WAITING FOR %g - state %s " % (guid,
-                    self.state(guid, hr = True)))
-
-                # Take the opportunity to 'refresh' the states of the RMs.
-                # Query only the first up to N guids (not to overwhelm 
-                # the local machine)
-                n = 100
-                lim = n if len(guids) > n else ( len(guids) -1 )
-                nguids = guids[0: lim]
-
-                # schedule state request for all guids (take advantage of
-                # scheduler multi threading).
-                for guid in nguids:
-                    callback = functools.partial(self.state, guid)
-                    self.schedule("0s", callback)
-
-                # If the guid is not in one of the target states, wait and
-                # continue quering. We keep the sleep big to decrease the
-                # number of RM state queries
-                time.sleep(2)
+                self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
+                    guid, hrrstate, hrstate))
+                time.sleep(0.5)
   
     def get_task(self, tid):
         """ Get a specific task
   
     def get_task(self, tid):
         """ Get a specific task
@@ -322,44 +407,53 @@ class ExperimentController(object):
         rm1.register_connection(guid2)
         rm2.register_connection(guid1)
 
         rm1.register_connection(guid2)
         rm2.register_connection(guid1)
 
-    def register_condition(self, group1, action, group2, state,
+    def register_condition(self, guids1, action, guids2, state,
             time = None):
             time = None):
-        """ Registers an action START or STOP for all RM on group1 to occur 
-            time 'time' after all elements in group2 reached state 'state'.
+        """ Registers an action START or STOP for all RM on guids1 to occur 
+            time 'time' after all elements in guids2 reached state 'state'.
 
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid1)
             rm = self.get_resource(guid1)
-            rm.register_condition(action, group2, state, time)
+            rm.register_condition(action, guids2, state, time)
 
 
-    def register_trace(self, guid, name):
+    def enable_trace(self, guid, name):
         """ Enable trace
 
         :param name: Name of the trace
         :type name: str
         """
         rm = self.get_resource(guid)
         """ Enable trace
 
         :param name: Name of the trace
         :type name: str
         """
         rm = self.get_resource(guid)
-        rm.register_trace(name)
+        rm.enable_trace(name)
+
+    def trace_enabled(self, guid, name):
+        """ Returns True if trace is enabled
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        rm = self.get_resource(guid)
+        return rm.trace_enabled(name)
 
     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         """ Get information on collected trace
 
     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
         """ Get information on collected trace
@@ -473,10 +567,10 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start()
 
         rm = self.get_resource(guid)
         return rm.start()
 
-    def set_with_conditions(self, name, value, group1, group2, state,
+    def set_with_conditions(self, name, value, guids1, guids2, state,
             time = None):
         """ Set value 'value' on attribute with name 'name' on all RMs of
             time = None):
         """ Set value 'value' on attribute with name 'name' on all RMs of
-            group1 when 'time' has elapsed since all elements in group
+            guids1 when 'time' has elapsed since all elements in guids
             have reached state 'state'.
 
             :param name: Name of attribute to set in RM
             have reached state 'state'.
 
             :param name: Name of attribute to set in RM
@@ -485,160 +579,155 @@ class ExperimentController(object):
             :param value: Value of attribute to set in RM
             :type name: string
 
             :param value: Value of attribute to set in RM
             :type name: string
 
-            :param group1: List of guids of RMs subjected to action
-            :type group1: list
+            :param guids1: List of guids of RMs subjected to action
+            :type guids1: list
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
 
             :param action: Action to register (either START or STOP)
             :type action: ResourceAction
 
-            :param group2: List of guids of RMs to we waited for
-            :type group2: list
+            :param guids2: List of guids of RMs to we waited for
+            :type guids2: list
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
 
             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
             :type state: ResourceState
 
-            :param time: Time to wait after group2 has reached status 
+            :param time: Time to wait after guids2 has reached status 
             :type time: string
 
         """
             :type time: string
 
         """
-        if isinstance(group1, int):
-            group1 = [group1]
-        if isinstance(group2, int):
-            group2 = [group2]
+        if isinstance(guids1, int):
+            guids1 = [guids1]
+        if isinstance(guids2, int):
+            guids2 = [guids2]
 
 
-        for guid1 in group1:
+        for guid1 in guids1:
             rm = self.get_resource(guid)
             rm = self.get_resource(guid)
-            rm.set_with_conditions(name, value, group2, state, time)
-
-    def stop_with_conditions(self, guid):
-        """ Stop a specific RM defined by its 'guid' only if all the conditions are true
-
-            :param guid: Guid of the RM
-            :type guid: int
-
-        """
-        rm = self.get_resource(guid)
-        return rm.stop_with_conditions()
-
-    def start_with_conditions(self, guid):
-        """ Start a specific RM defined by its 'guid' only if all the conditions are true
-
-            :param guid: Guid of the RM
-            :type guid: int
-
-        """
-        rm = self.get_resource(guid)
-        return rm.start_with_condition()
+            rm.set_with_conditions(name, value, guids2, state, time)
 
 
-    def deploy(self, group = None, wait_all_ready = True):
-        """ Deploy all resource manager in group
+    def deploy(self, guids = None, wait_all_ready = True, group = None):
+        """ Deploy all resource manager in guids list
 
 
-        :param group: List of guids of RMs to deploy
-        :type group: list
+        :param guids: List of guids of RMs to deploy
+        :type guids: list
 
         :param wait_all_ready: Wait until all RMs are ready in
             order to start the RMs
         :type guid: int
 
 
         :param wait_all_ready: Wait until all RMs are ready in
             order to start the RMs
         :type guid: int
 
+        :param group: Id of deployment group in which to deploy RMs
+        :type group: int
+
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         """
         self.logger.debug(" ------- DEPLOY START ------ ")
 
-        if not group:
-            # By default, if not deployment group is indicated, 
-            # all RMs that are undeployed will be deployed
-            group = []
+        if not guids:
+            # If no guids list was passed, all 'NEW' RMs will be deployed
+            guids = []
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
             for guid in self.resources:
                 if self.state(guid) == ResourceState.NEW:
-                    group.append(guid)
+                    guids.append(guid)
                 
                 
-        if isinstance(group, int):
-            group = [group]
-
-        # Before starting deployment we disorder the group list with the
-        # purpose of speeding up the whole deployment process.
-        # It is likely that the user inserted in the 'group' list closely
-        # resources one after another (e.g. all applications
-        # connected to the same node can likely appear one after another).
-        # This can originate a slow down in the deployment since the N 
-        # threads the parallel runner uses to processes tasks may all
-        # be taken up by the same family of resources waiting for the 
-        # same conditions (e.g. LinuxApplications running on a same 
-        # node share a single lock, so they will tend to be serialized).
-        # If we disorder the group list, this problem can be mitigated.
-        random.shuffle(group)
+        if isinstance(guids, int):
+            guids = [guids]
+
+        # Create deployment group
+        # New guids can be added to a same deployment group later on
+        new_group = False
+        if not group:
+            new_group = True
+            group = self._group_id_generator.next()
+
+        if group not in self._groups:
+            self._groups[group] = []
+
+        self._groups[group].extend(guids)
 
         def wait_all_and_start(group):
 
         def wait_all_and_start(group):
+            # Function that checks if all resources are READY
+            # before scheduling a start_with_conditions for each RM
             reschedule = False
             reschedule = False
-            for guid in group:
+            
+            # Get all guids in group
+            guids = self._groups[group]
+
+            for guid in guids:
                 if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
             if reschedule:
                 if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
             if reschedule:
+
                 callback = functools.partial(wait_all_and_start, group)
                 self.schedule("1s", callback)
             else:
                 callback = functools.partial(wait_all_and_start, group)
                 self.schedule("1s", callback)
             else:
-                # If all resources are read, we schedule the start
-                for guid in group:
+                # If all resources are ready, we schedule the start
+                for guid in guids:
                     rm = self.get_resource(guid)
                     self.schedule("0s", rm.start_with_conditions)
 
                     rm = self.get_resource(guid)
                     self.schedule("0s", rm.start_with_conditions)
 
-        if wait_all_ready:
-            # Schedule the function that will check all resources are
-            # READY, and only then it will schedule the start.
-            # This is aimed to reduce the number of tasks looping in the scheduler.
-            # Intead of having N start tasks, we will have only one
+        if wait_all_ready and new_group:
+            # Schedule a function to check that all resources are
+            # READY, and only then schedule the start.
+            # This aims at reducing the number of tasks looping in the 
+            # scheduler. 
+            # Instead of having many start tasks, we will have only one for 
+            # the whole group.
             callback = functools.partial(wait_all_and_start, group)
             callback = functools.partial(wait_all_and_start, group)
-            self.schedule("1s", callback)
+            self.schedule("0s", callback)
 
 
-        for guid in group:
+        for guid in guids:
             rm = self.get_resource(guid)
             rm = self.get_resource(guid)
-            self.schedule("0s", rm.deploy)
+            rm.deployment_group = group
+            self.schedule("0s", rm.deploy_with_conditions)
 
             if not wait_all_ready:
 
             if not wait_all_ready:
-                self.schedule("1s", rm.start_with_conditions)
+                self.schedule("0s", rm.start_with_conditions)
 
             if rm.conditions.get(ResourceAction.STOP):
                 # Only if the RM has STOP conditions we
                 # schedule a stop. Otherwise the RM will stop immediately
 
             if rm.conditions.get(ResourceAction.STOP):
                 # Only if the RM has STOP conditions we
                 # schedule a stop. Otherwise the RM will stop immediately
-                self.schedule("2s", rm.stop_with_conditions)
+                self.schedule("0s", rm.stop_with_conditions)
 
 
-    def release(self, group = None):
-        """ Release the elements of the list 'group' or 
-        all the resources if any group is specified
+    def release(self, guids = None):
+        """ Release al RMs on the guids list or 
+        all the resources if no list is specified
 
 
-            :param group: List of RM
-            :type group: list
+            :param guids: List of RM guids
+            :type guids: list
 
         """
 
         """
-        if not group:
-            group = self.resources
+        if not guids:
+            guids = self.resources
 
 
-        threads = []
-        for guid in group:
+        # Remove all pending tasks from the scheduler queue
+        for tid in list(self._scheduler.pending):
+            self._scheduler.remove(tid)
+
+        self._runner.empty()
+
+        for guid in guids:
             rm = self.get_resource(guid)
             rm = self.get_resource(guid)
-            thread = threading.Thread(target=rm.release)
-            threads.append(thread)
-            thread.setDaemon(True)
-            thread.start()
-
-        while list(threads) and not self.finished:
-            thread = threads[0]
-            # Time out after 5 seconds to check EC not terminated
-            thread.join(5)
-            if not thread.is_alive():
-                threads.remove(thread)
+            self.schedule("0s", rm.release)
+
+        self.wait_released(guids)
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
         Releases all the resources and stops task processing thread
 
         """
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
         Releases all the resources and stops task processing thread
 
         """
+        # If there was a major failure we can't exit gracefully
+        if self._state == ECState.FAILED:
+            raise RuntimeError("EC failure. Can not exit gracefully")
+
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
         self.release()
 
         # Mark the EC state as TERMINATED
         self._state = ECState.TERMINATED
 
+        # Stop processing thread
+        self._stop = True
+
         # Notify condition to wake up the processing thread
         self._notify()
         
         # Notify condition to wake up the processing thread
         self._notify()
         
@@ -711,13 +800,11 @@ class ExperimentController(object):
         that might have been raised by the workers.
 
         """
         that might have been raised by the workers.
 
         """
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
 
-        runner = ParallelRun(maxthreads = nthreads)
-        runner.start()
+        self._runner.start()
 
 
-        try:
-            while not self.finished:
+        while not self._stop:
+            try:
                 self._cond.acquire()
 
                 task = self._scheduler.next()
                 self._cond.acquire()
 
                 task = self._scheduler.next()
@@ -745,16 +832,22 @@ class ExperimentController(object):
 
                 if task:
                     # Process tasks in parallel
 
                 if task:
                     # Process tasks in parallel
-                    runner.put(self._execute, task)
-        except: 
-            import traceback
-            err = traceback.format_exc()
-            self.logger.error("Error while processing tasks in the EC: %s" % err)
+                    self._runner.put(self._execute, task)
+            except: 
+                import traceback
+                err = traceback.format_exc()
+                self.logger.error("Error while processing tasks in the EC: %s" % err)
+
+                # Set the EC to FAILED state 
+                self._state = ECState.FAILED
+            
+                # Set the FailureManager failure level to EC failure
+                self._fm.set_ec_failure()
 
 
-            self._state = ECState.FAILED
-        finally:   
-            self.logger.debug("Exiting the task processing loop ... ")
-            runner.sync()
+        self.logger.debug("Exiting the task processing loop ... ")
+        
+        self._runner.sync()
+        self._runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
 
     def _execute(self, task):
         """ Executes a single task. 
@@ -782,16 +875,6 @@ class ExperimentController(object):
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
             
             self.logger.error("Error occurred while executing task: %s" % err)
 
-            # Set the EC to FAILED state (this will force to exit the task
-            # processing thread)
-            self._state = ECState.FAILED
-
-            # Notify condition to wake up the processing thread
-            self._notify()
-
-            # Propage error to the ParallelRunner
-            raise
-
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
         for a new task to be scheduled.
     def _notify(self):
         """ Awakes the processing thread in case it is blocked waiting
         for a new task to be scheduled.