Fixed nasty concurrency bug in EC
[nepi.git] / src / nepi / execution / ec.py
index 34c5c50..b5663c0 100644 (file)
@@ -48,16 +48,27 @@ class ExperimentController(object):
     """
     .. class:: Class Args :
       
-        :param exp_id: Id of the experiment
+        :param exp_id: Human readable identifier for the experiment. 
+                        It will be used in the name of the directory 
+                        where experiment related information is stored
         :type exp_id: int
-        :param root_dir: Root directory of the experiment
+
+        :param root_dir: Root directory where experiment specific folder
+                         will be created to store experiment information
         :type root_dir: str
 
     .. note::
+        The ExperimentController (EC), is the entity responsible for 
+        managing a single experiment. 
+        Through the EC interface the user can create ResourceManagers (RMs),
+        configure them and interconnect them, in order to describe the experiment.
+        
+        Only when the 'deploy()' method is invoked, the EC will take actions
+        to transform the 'described' experiment into a 'running' experiment.
 
-       This class is the only one used by the User. Indeed, the user "talks"
-       only with the Experiment Controller and this latter forward to 
-       the different Resources Manager the order provided by the user.
+        While the experiment is running, it is possible to continue to
+        create/configure/connect RMs, and to deploy them to involve new
+        resources in the experiment.
 
     """
 
@@ -127,22 +138,40 @@ class ExperimentController(object):
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reach the state FINISHED
+            reached the state FINISHED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids)
+
+    def wait_started(self, guids):
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reached the state STARTED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
 
+    def wait(self, guids, states = [ResourceState.FINISHED]):
+        """ Blocking method that waits until all the RM from the 'guid' list 
+            reached state 'state' or until a failure occurs
+            
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        while not all([self.state(guid) in [ResourceState.FINISHED, 
-            ResourceState.STOPPED, 
-            ResourceState.FAILED] \
-                for guid in guids]) and not self.finished:
-            # We keep the sleep as large as possible to 
-            # decrese the number of RM state requests
+        while not all([self.state(guid) in states for guid in guids]) and \
+                not any([self.state(guid) in [
+                        ResourceState.STOPPED, 
+                        ResourceState.FAILED] for guid in guids]) and \
+                not self.finished:
+            # We keep the sleep big to decrease the number of RM state queries
             time.sleep(2)
-    
+   
     def get_task(self, tid):
         """ Get a specific task
 
@@ -165,7 +194,8 @@ class ExperimentController(object):
     def resources(self):
         """ Returns the list of all the Resource Manager Id
 
-        :rtype:  set
+        :rtype: set
+
         """
         return self._resources.keys()
 
@@ -175,8 +205,8 @@ class ExperimentController(object):
 
         :param rtype: Type of the RM
         :type rtype: str
-        :return : Id of the RM
-        :rtype:  int
+        :return: Id of the RM
+        :rtype: int
         """
         # Get next available guid
         guid = self._guid_generator.next(guid)
@@ -194,7 +224,7 @@ class ExperimentController(object):
 
         :param guid: Guid of the RM
         :type guid: int
-        :return : List of attributes
+        :return: List of attributes
         :rtype: list
         """
         rm = self.get_resource(guid)
@@ -209,7 +239,6 @@ class ExperimentController(object):
 
             :param guid2: Second guid to connect
             :type guid: ResourceManager
-
         """
         rm1 = self.get_resource(guid1)
         rm2 = self.get_resource(guid2)
@@ -341,10 +370,12 @@ class ExperimentController(object):
 
         """
         rm = self.get_resource(guid)
+        state = rm.state
+
         if hr:
-            return ResourceState2str.get(rm.state)
+            return ResourceState2str.get(state)
 
-        return rm.state
+        return state
 
     def stop(self, guid):
         """ Stop a specific RM defined by its 'guid'
@@ -437,7 +468,15 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not group:
-            group = self.resources
+            # By default, if not deployment group is indicated, 
+            # all RMs that are undeployed will be deployed
+            group = []
+            for guid in self.resources:
+                if self.state(guid) == ResourceState.NEW:
+                    group.append(guid)
+                
+        if isinstance(group, int):
+            group = [group]
 
         # Before starting deployment we disorder the group list with the
         # purpose of speeding up the whole deployment process.
@@ -455,8 +494,7 @@ class ExperimentController(object):
         def wait_all_and_start(group):
             reschedule = False
             for guid in group:
-                rm = self.get_resource(guid)
-                if rm.state < ResourceState.READY:
+                if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
@@ -467,7 +505,7 @@ class ExperimentController(object):
                 # If all resources are read, we schedule the start
                 for guid in group:
                     rm = self.get_resource(guid)
-                    self.schedule("0.01s", rm.start_with_conditions)
+                    self.schedule("0s", rm.start_with_conditions)
 
         if wait_all_ready:
             # Schedule the function that will check all resources are
@@ -479,7 +517,7 @@ class ExperimentController(object):
 
         for guid in group:
             rm = self.get_resource(guid)
-            self.schedule("0.001s", rm.deploy)
+            self.schedule("0s", rm.deploy)
 
             if not wait_all_ready:
                 self.schedule("1s", rm.start_with_conditions)
@@ -489,7 +527,6 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-
     def release(self, group = None):
         """ Release the elements of the list 'group' or 
         all the resources if any group is specified
@@ -518,12 +555,16 @@ class ExperimentController(object):
         
     def shutdown(self):
         """ Shutdown the Experiment Controller. 
-        It means : Release all the resources and stop the scheduler
+        Releases all the resources and stops task processing thread
 
         """
         self.release()
 
-        self._stop_scheduler()
+        # Mark the EC state as TERMINATED
+        self._state = ECState.TERMINATED
+
+        # Notify condition to wake up the processing thread
+        self._notify()
         
         if self._thread.is_alive():
            self._thread.join()
@@ -552,68 +593,105 @@ class ExperimentController(object):
 
         if track:
             self._tasks[task.id] = task
-  
+
         # Notify condition to wake up the processing thread
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
+        self._notify()
 
         return task.id
      
     def _process(self):
-        """ Process at executing the task that are in the scheduler.
+        """ Process scheduled tasks.
+
+        .. note::
+
+        The _process method is executed in an independent thread held by the 
+        ExperimentController for as long as the experiment is running.
+        
+        Tasks are scheduled by invoking the schedule method with a target callback. 
+        The schedule method is given a execution time which controls the
+        order in which tasks are processed. 
+
+        Tasks are processed in parallel using multithreading. 
+        The environmental variable NEPI_NTHREADS can be used to control
+        the number of threads used to process tasks. The default value is 50.
+
+        Exception handling:
+
+        To execute tasks in parallel, an ParallelRunner (PR) object, holding
+        a pool of threads (workers), is used.
+        For each available thread in the PR, the next task popped from 
+        the scheduler queue is 'put' in the PR.
+        Upon receiving a task to execute, each PR worker (thread) invokes the 
+        _execute method of the EC, passing the task as argument. 
+        This method, calls task.callback inside a try/except block. If an 
+        exception is raised by the tasks.callback, it will be trapped by the 
+        try block, logged to standard error (usually the console), and the EC 
+        state will be set to ECState.FAILED.
+        The invocation of _notify immediately after, forces the processing
+        loop in the _process method, to wake up if it was blocked waiting for new 
+        tasks to arrived, and to check the EC state.
+        As the EC is in FAILED state, the processing loop exits and the 
+        'finally' block is invoked. In the 'finally' block, the 'sync' method
+        of the PR is invoked, which forces the PR to raise any unchecked errors
+        that might have been raised by the workers.
 
         """
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
-        runner = ParallelRun(maxthreads = 50)
+        runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
         try:
             while not self.finished:
                 self._cond.acquire()
+
                 task = self._scheduler.next()
-                self._cond.release()
                 
                 if not task:
-                    # It there are not tasks in the tasks queue we need to 
-                    # wait until a call to schedule wakes us up
-                    self._cond.acquire()
+                    # No task to execute. Wait for a new task to be scheduled.
                     self._cond.wait()
-                    self._cond.release()
-                else: 
-                    # If the task timestamp is in the future the thread needs to wait
-                    # until time elapse or until another task is scheduled
+                else:
+                    # The task timestamp is in the future. Wait for timeout 
+                    # or until another task is scheduled.
                     now = strfnow()
                     if now < task.timestamp:
-                        # Calculate time difference in seconds
+                        # Calculate timeout in seconds
                         timeout = strfdiff(task.timestamp, now)
+
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
-                        # Sleep until timeout or until a new task awakes the condition
-                        self._cond.acquire()
+                        
+                        task = None
+
+                        # Wait timeout or until a new task awakes the condition
                         self._cond.wait(timeout)
-                        self._cond.release()
-                    else:
-                        # Process tasks in parallel
-                        runner.put(self._execute, task)
+               
+                self._cond.release()
+
+                if task:
+                    # Process tasks in parallel
+                    runner.put(self._execute, task)
         except: 
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while processing tasks in the EC: %s" % err)
+            self.logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-   
-        # Mark EC state as terminated
-        if self.ecstate == ECState.RUNNING:
-            # Synchronize to get errors if occurred
+        finally:   
+            self.logger.debug("Exiting the task processing loop ... ")
             runner.sync()
-            self._state = ECState.TERMINATED
 
     def _execute(self, task):
-        """ Invoke the callback of the task 'task'
+        """ Executes a single task. 
 
-            :param task: Id of the task
-            :type task: int
+            :param task: Object containing the callback to execute
+            :type task: Task
+
+        .. note::
+
+        If the invokation of the task callback raises an
+        exception, the processing thread of the ExperimentController
+        will be stopped and the experiment will be aborted.
 
         """
         # Invoke callback
@@ -627,24 +705,23 @@ class ExperimentController(object):
             task.result = err
             task.status = TaskStatus.ERROR
             
-            self._logger.error("Error occurred while executing task: %s" % err)
+            self.logger.error("Error occurred while executing task: %s" % err)
 
-            self._stop_scheduler()
+            # Set the EC to FAILED state (this will force to exit the task
+            # processing thread)
+            self._state = ECState.FAILED
+
+            # Notify condition to wake up the processing thread
+            self._notify()
 
             # Propage error to the ParallelRunner
             raise
 
-    def _stop_scheduler(self):
-        """ Stop the scheduler and put the EC into a FAILED State.
-
+    def _notify(self):
+        """ Awakes the processing thread in case it is blocked waiting
+        for a new task to be scheduled.
         """
-
-        # Mark the EC as failed
-        self._state = ECState.FAILED
-
-        # Wake up the EC in case it was sleeping
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
 
-