Adding CCN RMs for Linux backend
[nepi.git] / src / nepi / execution / ec.py
index ba573c4..b5663c0 100644 (file)
@@ -48,16 +48,27 @@ class ExperimentController(object):
     """
     .. class:: Class Args :
       
     """
     .. class:: Class Args :
       
-        :param exp_id: Id of the experiment
+        :param exp_id: Human readable identifier for the experiment. 
+                        It will be used in the name of the directory 
+                        where experiment related information is stored
         :type exp_id: int
         :type exp_id: int
-        :param root_dir: Root directory of the experiment
+
+        :param root_dir: Root directory where experiment specific folder
+                         will be created to store experiment information
         :type root_dir: str
 
     .. note::
         :type root_dir: str
 
     .. note::
+        The ExperimentController (EC), is the entity responsible for 
+        managing a single experiment. 
+        Through the EC interface the user can create ResourceManagers (RMs),
+        configure them and interconnect them, in order to describe the experiment.
+        
+        Only when the 'deploy()' method is invoked, the EC will take actions
+        to transform the 'described' experiment into a 'running' experiment.
 
 
-       This class is the only one used by the User. Indeed, the user "talks"
-       only with the Experiment Controller and this latter forward to 
-       the different Resources Manager the order provided by the user.
+        While the experiment is running, it is possible to continue to
+        create/configure/connect RMs, and to deploy them to involve new
+        resources in the experiment.
 
     """
 
 
     """
 
@@ -127,22 +138,40 @@ class ExperimentController(object):
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
 
     def wait_finished(self, guids):
         """ Blocking method that wait until all the RM from the 'guid' list 
-            reach the state FINISHED
+            reached the state FINISHED
 
 
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids)
+
+    def wait_started(self, guids):
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reached the state STARTED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
+
+    def wait(self, guids, states = [ResourceState.FINISHED]):
+        """ Blocking method that waits until all the RM from the 'guid' list 
+            reached state 'state' or until a failure occurs
+            
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
         :param guids: List of guids
         :type guids: list
         """
         if isinstance(guids, int):
             guids = [guids]
 
-        while not all([self.state(guid) in [ResourceState.FINISHED, 
-            ResourceState.STOPPED, 
-            ResourceState.FAILED] \
-                for guid in guids]) and not self.finished:
-            # We keep the sleep as large as possible to 
-            # decrese the number of RM state requests
+        while not all([self.state(guid) in states for guid in guids]) and \
+                not any([self.state(guid) in [
+                        ResourceState.STOPPED, 
+                        ResourceState.FAILED] for guid in guids]) and \
+                not self.finished:
+            # We keep the sleep big to decrease the number of RM state queries
             time.sleep(2)
             time.sleep(2)
-    
+   
     def get_task(self, tid):
         """ Get a specific task
 
     def get_task(self, tid):
         """ Get a specific task
 
@@ -165,7 +194,8 @@ class ExperimentController(object):
     def resources(self):
         """ Returns the list of all the Resource Manager Id
 
     def resources(self):
         """ Returns the list of all the Resource Manager Id
 
-        :rtype:  set
+        :rtype: set
+
         """
         return self._resources.keys()
 
         """
         return self._resources.keys()
 
@@ -175,8 +205,8 @@ class ExperimentController(object):
 
         :param rtype: Type of the RM
         :type rtype: str
 
         :param rtype: Type of the RM
         :type rtype: str
-        :return : Id of the RM
-        :rtype:  int
+        :return: Id of the RM
+        :rtype: int
         """
         # Get next available guid
         guid = self._guid_generator.next(guid)
         """
         # Get next available guid
         guid = self._guid_generator.next(guid)
@@ -194,7 +224,7 @@ class ExperimentController(object):
 
         :param guid: Guid of the RM
         :type guid: int
 
         :param guid: Guid of the RM
         :type guid: int
-        :return : List of attributes
+        :return: List of attributes
         :rtype: list
         """
         rm = self.get_resource(guid)
         :rtype: list
         """
         rm = self.get_resource(guid)
@@ -209,7 +239,6 @@ class ExperimentController(object):
 
             :param guid2: Second guid to connect
             :type guid: ResourceManager
 
             :param guid2: Second guid to connect
             :type guid: ResourceManager
-
         """
         rm1 = self.get_resource(guid1)
         rm2 = self.get_resource(guid2)
         """
         rm1 = self.get_resource(guid1)
         rm2 = self.get_resource(guid2)
@@ -341,10 +370,12 @@ class ExperimentController(object):
 
         """
         rm = self.get_resource(guid)
 
         """
         rm = self.get_resource(guid)
+        state = rm.state
+
         if hr:
         if hr:
-            return ResourceState2str.get(rm.state)
+            return ResourceState2str.get(state)
 
 
-        return rm.state
+        return state
 
     def stop(self, guid):
         """ Stop a specific RM defined by its 'guid'
 
     def stop(self, guid):
         """ Stop a specific RM defined by its 'guid'
@@ -437,8 +468,13 @@ class ExperimentController(object):
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not group:
         self.logger.debug(" ------- DEPLOY START ------ ")
 
         if not group:
-            group = self.resources
-
+            # By default, if not deployment group is indicated, 
+            # all RMs that are undeployed will be deployed
+            group = []
+            for guid in self.resources:
+                if self.state(guid) == ResourceState.NEW:
+                    group.append(guid)
+                
         if isinstance(group, int):
             group = [group]
 
         if isinstance(group, int):
             group = [group]
 
@@ -453,13 +489,12 @@ class ExperimentController(object):
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
-        #random.shuffle(group)
+        random.shuffle(group)
 
         def wait_all_and_start(group):
             reschedule = False
             for guid in group:
 
         def wait_all_and_start(group):
             reschedule = False
             for guid in group:
-                rm = self.get_resource(guid)
-                if rm.state < ResourceState.READY:
+                if self.state(guid) < ResourceState.READY:
                     reschedule = True
                     break
 
                     reschedule = True
                     break
 
@@ -492,7 +527,6 @@ class ExperimentController(object):
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
                 # schedule a stop. Otherwise the RM will stop immediately
                 self.schedule("2s", rm.stop_with_conditions)
 
-
     def release(self, group = None):
         """ Release the elements of the list 'group' or 
         all the resources if any group is specified
     def release(self, group = None):
         """ Release the elements of the list 'group' or 
         all the resources if any group is specified
@@ -559,7 +593,7 @@ class ExperimentController(object):
 
         if track:
             self._tasks[task.id] = task
 
         if track:
             self._tasks[task.id] = task
-  
+
         # Notify condition to wake up the processing thread
         self._notify()
 
         # Notify condition to wake up the processing thread
         self._notify()
 
@@ -568,17 +602,39 @@ class ExperimentController(object):
     def _process(self):
         """ Process scheduled tasks.
 
     def _process(self):
         """ Process scheduled tasks.
 
+        .. note::
+
         The _process method is executed in an independent thread held by the 
         ExperimentController for as long as the experiment is running.
         
         Tasks are scheduled by invoking the schedule method with a target callback. 
         The _process method is executed in an independent thread held by the 
         ExperimentController for as long as the experiment is running.
         
         Tasks are scheduled by invoking the schedule method with a target callback. 
-        The schedule method is givedn a execution time which controls the
+        The schedule method is given a execution time which controls the
         order in which tasks are processed. 
 
         Tasks are processed in parallel using multithreading. 
         The environmental variable NEPI_NTHREADS can be used to control
         the number of threads used to process tasks. The default value is 50.
 
         order in which tasks are processed. 
 
         Tasks are processed in parallel using multithreading. 
         The environmental variable NEPI_NTHREADS can be used to control
         the number of threads used to process tasks. The default value is 50.
 
+        Exception handling:
+
+        To execute tasks in parallel, an ParallelRunner (PR) object, holding
+        a pool of threads (workers), is used.
+        For each available thread in the PR, the next task popped from 
+        the scheduler queue is 'put' in the PR.
+        Upon receiving a task to execute, each PR worker (thread) invokes the 
+        _execute method of the EC, passing the task as argument. 
+        This method, calls task.callback inside a try/except block. If an 
+        exception is raised by the tasks.callback, it will be trapped by the 
+        try block, logged to standard error (usually the console), and the EC 
+        state will be set to ECState.FAILED.
+        The invocation of _notify immediately after, forces the processing
+        loop in the _process method, to wake up if it was blocked waiting for new 
+        tasks to arrived, and to check the EC state.
+        As the EC is in FAILED state, the processing loop exits and the 
+        'finally' block is invoked. In the 'finally' block, the 'sync' method
+        of the PR is invoked, which forces the PR to raise any unchecked errors
+        that might have been raised by the workers.
+
         """
         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
         """
         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
 
@@ -588,50 +644,55 @@ class ExperimentController(object):
         try:
             while not self.finished:
                 self._cond.acquire()
         try:
             while not self.finished:
                 self._cond.acquire()
+
                 task = self._scheduler.next()
                 task = self._scheduler.next()
-                self._cond.release()
                 
                 if not task:
                 
                 if not task:
-                    # It there are not tasks in the tasks queue we need to 
-                    # wait until a call to schedule wakes us up
-                    self._cond.acquire()
+                    # No task to execute. Wait for a new task to be scheduled.
                     self._cond.wait()
                     self._cond.wait()
-                    self._cond.release()
-                else: 
-                    # If the task timestamp is in the future the thread needs to wait
-                    # until time elapse or until another task is scheduled
+                else:
+                    # The task timestamp is in the future. Wait for timeout 
+                    # or until another task is scheduled.
                     now = strfnow()
                     if now < task.timestamp:
                     now = strfnow()
                     if now < task.timestamp:
-                        # Calculate time difference in seconds
+                        # Calculate timeout in seconds
                         timeout = strfdiff(task.timestamp, now)
                         timeout = strfdiff(task.timestamp, now)
+
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
                         # Re-schedule task with the same timestamp
                         self._scheduler.schedule(task)
-                        # Sleep until timeout or until a new task awakes the condition
-                        self._cond.acquire()
+                        
+                        task = None
+
+                        # Wait timeout or until a new task awakes the condition
                         self._cond.wait(timeout)
                         self._cond.wait(timeout)
-                        self._cond.release()
-                    else:
-                        # Process tasks in parallel
-                        runner.put(self._execute, task)
+               
+                self._cond.release()
+
+                if task:
+                    # Process tasks in parallel
+                    runner.put(self._execute, task)
         except: 
             import traceback
             err = traceback.format_exc()
         except: 
             import traceback
             err = traceback.format_exc()
-            self._logger.error("Error while processing tasks in the EC: %s" % err)
+            self.logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
         finally:   
 
             self._state = ECState.FAILED
         finally:   
+            self.logger.debug("Exiting the task processing loop ... ")
             runner.sync()
 
     def _execute(self, task):
         """ Executes a single task. 
 
             runner.sync()
 
     def _execute(self, task):
         """ Executes a single task. 
 
-            If the invokation of the task callback raises an
-            exception, the processing thread of the ExperimentController
-            will be stopped and the experiment will be aborted.
-
             :param task: Object containing the callback to execute
             :type task: Task
 
             :param task: Object containing the callback to execute
             :type task: Task
 
+        .. note::
+
+        If the invokation of the task callback raises an
+        exception, the processing thread of the ExperimentController
+        will be stopped and the experiment will be aborted.
+
         """
         # Invoke callback
         task.status = TaskStatus.DONE
         """
         # Invoke callback
         task.status = TaskStatus.DONE
@@ -644,7 +705,7 @@ class ExperimentController(object):
             task.result = err
             task.status = TaskStatus.ERROR
             
             task.result = err
             task.status = TaskStatus.ERROR
             
-            self._logger.error("Error occurred while executing task: %s" % err)
+            self.logger.error("Error occurred while executing task: %s" % err)
 
             # Set the EC to FAILED state (this will force to exit the task
             # processing thread)
 
             # Set the EC to FAILED state (this will force to exit the task
             # processing thread)