Bugfixes for LinuxApplication and LinuxNode
[nepi.git] / src / nepi / execution / ec.py
index c4917c0..c5dc10f 100644 (file)
@@ -48,15 +48,27 @@ class ExperimentController(object):
     """
     .. class:: Class Args :
       
-        :param exp_id: Id of the experiment
+        :param exp_id: Human readable identifier for the experiment. 
+                        It will be used in the name of the directory 
+                        where experiment related information is stored
         :type exp_id: int
-        :param root_dir: Root directory of the experiment
+
+        :param root_dir: Root directory where experiment specific folder
+                         will be created to store experiment information
         :type root_dir: str
 
     .. note::
-        This class is the only one used by the User. Indeed, the user "talks"
-        only with the Experiment Controller and this latter forward to 
-        the different Resources Manager the order provided by the user.
+        The ExperimentController (EC), is the entity responsible for 
+        managing a single experiment. 
+        Through the EC interface the user can create ResourceManagers (RMs),
+        configure them and interconnect them, in order to describe the experiment.
+        
+        Only when the 'deploy()' method is invoked, the EC will take actions
+        to transform the 'described' experiment into a 'running' experiment.
+
+        While the experiment is running, it is possible to continue to
+        create/configure/connect RMs, and to deploy them to involve new
+        resources in the experiment.
 
     """
 
@@ -126,22 +138,40 @@ class ExperimentController(object):
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reach the state FINISHED
+            reached the state FINISHED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids)
+
+    def wait_started(self, guids):
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reached the state STARTED
 
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+
+    def wait(self, guids, states = [ResourceState.FINISHED]):
+        """ Blocking method that waits until all the RM from the 'guid' list 
+            reached state 'state' or until a failure occurs
+            
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        while not all([self.state(guid) in [ResourceState.FINISHED, 
-            ResourceState.STOPPED, 
-            ResourceState.FAILED] \
-                for guid in guids]) and not self.finished:
-            # We keep the sleep as large as possible to 
-            # decrese the number of RM state requests
+        while not all([self.state(guid) in states for guid in guids]) and \
+                not any([self.state(guid) in [
+                        ResourceState.STOPPED, 
+                        ResourceState.FAILED] for guid in guids]) and \
+                not self.finished:
+            # We keep the sleep big to decrease the number of RM state queries
             time.sleep(2)
-    
+   
     def get_task(self, tid):
         """ Get a specific task
 
@@ -436,8 +466,13 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not group:
-            group = self.resources
-
+            # By default, if not deployment group is indicated, 
+            # all RMs that are undeployed will be deployed
+            group = []
+            for guid in self.resources:
+                if self.state(guid) == ResourceState.NEW:
+                    group.append(guid)
+                
         if isinstance(group, int):
             group = [group]
 
@@ -571,13 +606,33 @@ class ExperimentController(object):
         ExperimentController for as long as the experiment is running.
         
         Tasks are scheduled by invoking the schedule method with a target callback. 
-        The schedule method is givedn a execution time which controls the
+        The schedule method is given a execution time which controls the
         order in which tasks are processed. 
 
         Tasks are processed in parallel using multithreading. 
         The environmental variable NEPI_NTHREADS can be used to control
         the number of threads used to process tasks. The default value is 50.
 
+        Exception handling:
+
+        To execute tasks in parallel, an ParallelRunner (PR) object, holding
+        a pool of threads (workers), is used.
+        For each available thread in the PR, the next task popped from 
+        the scheduler queue is 'put' in the PR.
+        Upon receiving a task to execute, each PR worker (thread) invokes the 
+        _execute method of the EC, passing the task as argument. 
+        This method, calls task.callback inside a try/except block. If an 
+        exception is raised by the tasks.callback, it will be trapped by the 
+        try block, logged to standard error (usually the console), and the EC 
+        state will be set to ECState.FAILED.
+        The invocation of _notify immediately after, forces the processing
+        loop in the _process method, to wake up if it was blocked waiting for new 
+        tasks to arrived, and to check the EC state.
+        As the EC is in FAILED state, the processing loop exits and the 
+        'finally' block is invoked. In the 'finally' block, the 'sync' method
+        of the PR is invoked, which forces the PR to raise any unchecked errors
+        that might have been raised by the workers.
+
         """
         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
@@ -619,6 +674,7 @@ class ExperimentController(object):
 
             self._state = ECState.FAILED
         finally:   
+            self._logger.info("Exiting the task processing loop ... ")
             runner.sync()
 
     def _execute(self, task):