Bugfixing LinuxNode and LinuxApplication
[nepi.git] / src / nepi / execution / ec.py
index d6fcf87..ba573c4 100644 (file)
@@ -1,21 +1,21 @@
-"""
-    NEPI, a framework to manage network experiments
-    Copyright (C) 2013 INRIA
-
-    This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
-    the Free Software Foundation, either version 3 of the License, or
-    (at your option) any later version.
-
-    This program is distributed in the hope that it will be useful,
-    but WITHOUT ANY WARRANTY; without even the implied warranty of
-    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
-
-    You should have received a copy of the GNU General Public License
-    along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-"""
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
 
 import functools
 import logging
@@ -29,7 +29,7 @@ from nepi.util import guid
 from nepi.util.parallel import ParallelRun
 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid 
 from nepi.execution.resource import ResourceFactory, ResourceAction, \
-        ResourceState
+        ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
 
@@ -37,11 +37,30 @@ from nepi.execution.trace import TraceAttr
 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
 
 class ECState(object):
+    """ State of the Experiment Controller
+   
+    """
     RUNNING = 1
     FAILED = 2
     TERMINATED = 3
 
 class ExperimentController(object):
+    """
+    .. class:: Class Args :
+      
+        :param exp_id: Id of the experiment
+        :type exp_id: int
+        :param root_dir: Root directory of the experiment
+        :type root_dir: str
+
+    .. note::
+
+       This class is the only one used by the User. Indeed, the user "talks"
+       only with the Experiment Controller and this latter forward to 
+       the different Resources Manager the order provided by the user.
+
+    """
+
     def __init__(self, exp_id = None, root_dir = "/tmp"): 
         super(ExperimentController, self).__init__()
         # root directory to store files
@@ -76,14 +95,23 @@ class ExperimentController(object):
 
     @property
     def logger(self):
+        """ Return the logger of the Experiment Controller
+
+        """
         return self._logger
 
     @property
     def ecstate(self):
+        """ Return the state of the Experiment Controller
+
+        """
         return self._state
 
     @property
     def exp_id(self):
+        """ Return the experiment ID
+
+        """
         exp_id = self._exp_id
         if not exp_id.startswith("nepi-"):
             exp_id = "nepi-" + exp_id
@@ -91,26 +119,65 @@ class ExperimentController(object):
 
     @property
     def finished(self):
+        """ Put the state of the Experiment Controller into a final state :
+            Either TERMINATED or FAILED
+
+        """
         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
 
     def wait_finished(self, guids):
-        while not all([self.state(guid) == ResourceState.FINISHED \
+        """ Blocking method that wait until all the RM from the 'guid' list 
+            reach the state FINISHED
+
+        :param guids: List of guids
+        :type guids: list
+        """
+        if isinstance(guids, int):
+            guids = [guids]
+
+        while not all([self.state(guid) in [ResourceState.FINISHED, 
+            ResourceState.STOPPED, 
+            ResourceState.FAILED] \
                 for guid in guids]) and not self.finished:
             # We keep the sleep as large as possible to 
             # decrese the number of RM state requests
             time.sleep(2)
     
     def get_task(self, tid):
+        """ Get a specific task
+
+        :param tid: Id of the task
+        :type tid: int
+        :rtype:  unknow
+        """
         return self._tasks.get(tid)
 
     def get_resource(self, guid):
+        """ Get a specific Resource Manager
+
+        :param guid: Id of the task
+        :type guid: int
+        :rtype:  ResourceManager
+        """
         return self._resources.get(guid)
 
     @property
     def resources(self):
+        """ Returns the list of all the Resource Manager Id
+
+        :rtype:  set
+        """
         return self._resources.keys()
 
     def register_resource(self, rtype, guid = None):
+        """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
+        for the RM of type 'rtype' and add it to the list of Resources.
+
+        :param rtype: Type of the RM
+        :type rtype: str
+        :return : Id of the RM
+        :rtype:  int
+        """
         # Get next available guid
         guid = self._guid_generator.next(guid)
         
@@ -123,14 +190,27 @@ class ExperimentController(object):
         return guid
 
     def get_attributes(self, guid):
-        rm = self.get_resource(guid)
-        return rm.get_attributes()
+        """ Return all the attibutes of a specific RM
 
-    def get_filters(self, guid):
+        :param guid: Guid of the RM
+        :type guid: int
+        :return : List of attributes
+        :rtype: list
+        """
         rm = self.get_resource(guid)
-        return rm.get_filters()
+        return rm.get_attributes()
 
     def register_connection(self, guid1, guid2):
+        """ Registers a guid1 with a guid2. 
+            The declaration order is not important
+
+            :param guid1: First guid to connect
+            :type guid1: ResourceManager
+
+            :param guid2: Second guid to connect
+            :type guid: ResourceManager
+
+        """
         rm1 = self.get_resource(guid1)
         rm2 = self.get_resource(guid2)
 
@@ -200,31 +280,89 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.trace(name, attr, block, offset)
 
-    def discover(self, guid, filters):
+    def discover(self, guid):
+        """ Discover a specific RM defined by its 'guid'
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+        """
         rm = self.get_resource(guid)
-        return rm.discover(filters)
+        return rm.discover()
+
+    def provision(self, guid):
+        """ Provision a specific RM defined by its 'guid'
+
+            :param guid: Guid of the RM
+            :type guid: int
 
-    def provision(self, guid, filters):
+        """
         rm = self.get_resource(guid)
-        return rm.provision(filters)
+        return rm.provision()
 
     def get(self, guid, name):
+        """ Get a specific attribute 'name' from the RM 'guid'
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+            :param name: attribute's name
+            :type name: str
+
+        """
         rm = self.get_resource(guid)
         return rm.get(name)
 
     def set(self, guid, name, value):
+        """ Set a specific attribute 'name' from the RM 'guid' 
+            with the value 'value' 
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+            :param name: attribute's name
+            :type name: str
+
+            :param value: attribute's value
+
+        """
         rm = self.get_resource(guid)
         return rm.set(name, value)
 
-    def state(self, guid):
+    def state(self, guid, hr = False):
+        """ Returns the state of a resource
+
+            :param guid: Resource guid
+            :type guid: integer
+
+            :param hr: Human readable. Forces return of a 
+                status string instead of a number 
+            :type hr: boolean
+
+        """
         rm = self.get_resource(guid)
+        if hr:
+            return ResourceState2str.get(rm.state)
+
         return rm.state
 
     def stop(self, guid):
+        """ Stop a specific RM defined by its 'guid'
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.stop()
 
     def start(self, guid):
+        """ Start a specific RM defined by its 'guid'
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.start()
 
@@ -266,10 +404,22 @@ class ExperimentController(object):
             rm.set_with_conditions(name, value, group2, state, time)
 
     def stop_with_conditions(self, guid):
+        """ Stop a specific RM defined by its 'guid' only if all the conditions are true
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.stop_with_conditions()
 
     def start_with_conditions(self, guid):
+        """ Start a specific RM defined by its 'guid' only if all the conditions are true
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+        """
         rm = self.get_resource(guid)
         return rm.start_with_condition()
 
@@ -289,6 +439,9 @@ class ExperimentController(object):
         if not group:
             group = self.resources
 
+        if isinstance(group, int):
+            group = [group]
+
         # Before starting deployment we disorder the group list with the
         # purpose of speeding up the whole deployment process.
         # It is likely that the user inserted in the 'group' list closely
@@ -300,7 +453,7 @@ class ExperimentController(object):
         # same conditions (e.g. LinuxApplications running on a same 
         # node share a single lock, so they will tend to be serialized).
         # If we disorder the group list, this problem can be mitigated.
-        random.shuffle(group)
+        #random.shuffle(group)
 
         def wait_all_and_start(group):
             reschedule = False
@@ -317,7 +470,7 @@ class ExperimentController(object):
                 # If all resources are read, we schedule the start
                 for guid in group:
                     rm = self.get_resource(guid)
-                    self.schedule("0.01s", rm.start_with_conditions)
+                    self.schedule("0s", rm.start_with_conditions)
 
         if wait_all_ready:
             # Schedule the function that will check all resources are
@@ -329,7 +482,7 @@ class ExperimentController(object):
 
         for guid in group:
             rm = self.get_resource(guid)
-            self.schedule("0.001s", rm.deploy)
+            self.schedule("0s", rm.deploy)
 
             if not wait_all_ready:
                 self.schedule("1s", rm.start_with_conditions)
@@ -341,6 +494,13 @@ class ExperimentController(object):
 
 
     def release(self, group = None):
+        """ Release the elements of the list 'group' or 
+        all the resources if any group is specified
+
+            :param group: List of RM
+            :type group: list
+
+        """
         if not group:
             group = self.resources
 
@@ -360,9 +520,17 @@ class ExperimentController(object):
                 threads.remove(thread)
         
     def shutdown(self):
+        """ Shutdown the Experiment Controller. 
+        Releases all the resources and stops task processing thread
+
+        """
         self.release()
 
-        self._stop_scheduler()
+        # Mark the EC state as TERMINATED
+        self._state = ECState.TERMINATED
+
+        # Notify condition to wake up the processing thread
+        self._notify()
         
         if self._thread.is_alive():
            self._thread.join()
@@ -370,17 +538,19 @@ class ExperimentController(object):
     def schedule(self, date, callback, track = False):
         """ Schedule a callback to be executed at time date.
 
-            date    string containing execution time for the task.
+            :param date: string containing execution time for the task.
                     It can be expressed as an absolute time, using
                     timestamp format, or as a relative time matching
                     ^\d+.\d+(h|m|s|ms|us)$
 
-            callback    code to be executed for the task. Must be a
+            :param callback: code to be executed for the task. Must be a
                         Python function, and receives args and kwargs
                         as arguments.
 
-            track   if set to True, the task will be retrivable with
+            :param track: if set to True, the task will be retrivable with
                     the get_task() method
+
+            :return : The Id of the task
         """
         timestamp = strfvalid(date)
         
@@ -391,14 +561,28 @@ class ExperimentController(object):
             self._tasks[task.id] = task
   
         # Notify condition to wake up the processing thread
-        self._cond.acquire()
-        self._cond.notify()
-        self._cond.release()
+        self._notify()
 
         return task.id
      
     def _process(self):
-        runner = ParallelRun(maxthreads = 50)
+        """ Process scheduled tasks.
+
+        The _process method is executed in an independent thread held by the 
+        ExperimentController for as long as the experiment is running.
+        
+        Tasks are scheduled by invoking the schedule method with a target callback. 
+        The schedule method is givedn a execution time which controls the
+        order in which tasks are processed. 
+
+        Tasks are processed in parallel using multithreading. 
+        The environmental variable NEPI_NTHREADS can be used to control
+        the number of threads used to process tasks. The default value is 50.
+
+        """
+        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
+
+        runner = ParallelRun(maxthreads = nthreads)
         runner.start()
 
         try:
@@ -435,14 +619,20 @@ class ExperimentController(object):
             self._logger.error("Error while processing tasks in the EC: %s" % err)
 
             self._state = ECState.FAILED
-   
-        # Mark EC state as terminated
-        if self.ecstate == ECState.RUNNING:
-            # Synchronize to get errors if occurred
+        finally:   
             runner.sync()
-            self._state = ECState.TERMINATED
 
     def _execute(self, task):
+        """ Executes a single task. 
+
+            If the invokation of the task callback raises an
+            exception, the processing thread of the ExperimentController
+            will be stopped and the experiment will be aborted.
+
+            :param task: Object containing the callback to execute
+            :type task: Task
+
+        """
         # Invoke callback
         task.status = TaskStatus.DONE
 
@@ -456,18 +646,21 @@ class ExperimentController(object):
             
             self._logger.error("Error occurred while executing task: %s" % err)
 
-            self._stop_scheduler()
+            # Set the EC to FAILED state (this will force to exit the task
+            # processing thread)
+            self._state = ECState.FAILED
+
+            # Notify condition to wake up the processing thread
+            self._notify()
 
             # Propage error to the ParallelRunner
             raise
 
-    def _stop_scheduler(self):
-        # Mark the EC as failed
-        self._state = ECState.FAILED
-
-        # Wake up the EC in case it was sleeping
+    def _notify(self):
+        """ Awakes the processing thread in case it is blocked waiting
+        for a new task to be scheduled.
+        """
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
 
-