Commiting improvements to Collector. Local_dir added to ExperimentController
[nepi.git] / src / nepi / execution / ec.py
index 066e6b4..1357e4f 100644 (file)
@@ -24,6 +24,8 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
+from nepi.util.plotter import ECPlotter, PFormats
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -31,8 +33,8 @@ from nepi.execution.trace import TraceAttr
 import functools
 import logging
 import os
-import random
 import sys
+import tempfile
 import time
 import threading
 import weakref
@@ -52,6 +54,7 @@ class FailureManager(object):
     def __init__(self, ec):
         self._ec = weakref.ref(ec)
         self._failure_level = FailureLevel.OK
+        self._abort = False
 
     @property
     def ec(self):
@@ -63,22 +66,23 @@ class FailureManager(object):
 
     @property
     def abort(self):
+        return self._abort
+
+    def eval_failure(self, guid):
         if self._failure_level == FailureLevel.OK:
-            for guid in self.ec.resources:
-                state = self.ec.state(guid)
-                critical = self.ec.get(guid, "critical")
-                if state == ResourceState.FAILED and critical:
-                    self._failure_level = FailureLevel.RM_FAILURE
-                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
-                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
-                    break
+            rm = self.ec.get_resource(guid)
+            state = rm.state
+            critical = rm.get("critical")
 
-        return self._failure_level != FailureLevel.OK
+            if state == ResourceState.FAILED and critical:
+                self._failure_level = FailureLevel.RM_FAILURE
+                self._abort = True
+                self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                    " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
 
-
 class ECState(object):
     """ Possible states for an ExperimentController
    
@@ -147,10 +151,31 @@ class ExperimentController(object):
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
-        
+   
     """
 
-    def __init__(self, exp_id = None): 
+    @classmethod
+    def load(cls, filepath, format = SFormats.XML):
+        serializer = ECSerializer()
+        ec = serializer.load(filepath)
+        return ec
+
+    def __init__(self, exp_id = None, local_dir = None, persist = False):
+        """ ExperimentController entity to model an execute a network experiment.
+        
+        :param exp_id: Human readable name to identify the experiment
+        :type name: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type name: str
+
+        :param persist: Save an XML description of the experiment after 
+        completion at local_dir
+        :type name: bool
+
+        """
+
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -168,6 +193,17 @@ class ExperimentController(object):
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
+        # Local path where to store experiment related files (results, etc)
+        if not local_dir:
+            local_dir = tempfile.mkdtemp()
+
+        self._local_dir = local_dir
+        self._exp_dir = os.path.join(local_dir, self.exp_id)
+        self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+        # If True persist the experiment controller in XML format, after completion
+        self._persist = persist
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -199,15 +235,15 @@ class ExperimentController(object):
 
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
-        self._runner = ParallelRun(maxthreads = nthreads)
+        self._nthreads = 20
+        self._runner = None
 
         # Event processing thread
         self._cond = threading.Condition()
         self._thread = threading.Thread(target = self._process)
         self._thread.setDaemon(True)
         self._thread.start()
-
+        
     @property
     def logger(self):
         """ Returns the logger instance of the Experiment Controller
@@ -215,6 +251,14 @@ class ExperimentController(object):
         """
         return self._logger
 
+    @property
+    def failure_level(self):
+        """ Returns the level of FAILURE of th experiment
+
+        """
+
+        return self._fm._failure_level
+
     @property
     def ecstate(self):
         """ Returns the state of the Experiment Controller
@@ -237,6 +281,43 @@ class ExperimentController(object):
         """
         return self._run_id
 
+    @property
+    def nthreads(self):
+        """ Returns the number of processing nthreads used
+
+        """
+        return self._nthreads
+
+    @property
+    def local_dir(self):
+        """ Root local directory for experiment files
+
+        """
+        return self._local_dir
+
+    @property
+    def exp_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment.
+
+        """
+        return self._exp_dir
+
+    @property
+    def run_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment run.
+
+        """
+        return self._run_dir
+
+    @property
+    def persist(self):
+        """ If Trie persist the ExperimentController to XML format upon completion
+
+        """
+        return self._persist
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -245,12 +326,22 @@ class ExperimentController(object):
         """
         return self._fm.abort
 
+    def inform_failure(self, guid):
+        """ Reports a failure in a RM to the EC for evaluation
+
+            :param guid: Resource id
+            :type guid: int
+
+        """
+
+        return self._fm.eval_failure(guid)
+
     def wait_finished(self, guids):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
         RELEASED ), or until a failure in the experiment occurs 
         (i.e. abort == True) 
-
+        
             :param guids: List of guids
             :type guids: list
 
@@ -266,7 +357,7 @@ class ExperimentController(object):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state >= STARTED, or until a failure in the 
         experiment occurs (i.e. abort == True) 
-
+        
             :param guids: List of guids
             :type guids: list
 
@@ -281,7 +372,7 @@ class ExperimentController(object):
     def wait_released(self, guids):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state == RELEASED, or until the EC fails 
-
+        
             :param guids: List of guids
             :type guids: list
 
@@ -297,7 +388,7 @@ class ExperimentController(object):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state >= READY, or until a failure in the 
         experiment occurs (i.e. abort == True) 
-
+        
             :param guids: List of guids
             :type guids: list
 
@@ -318,7 +409,6 @@ class ExperimentController(object):
             :type guids: list
         
         """
-        
         if isinstance(guids, int):
             guids = [guids]
 
@@ -332,23 +422,38 @@ class ExperimentController(object):
                 break
 
             # If a guid reached one of the target states, remove it from list
-            guid = guids[0]
-            rstate = self.state(guid)
+            guid = guids.pop()
+            rm = self.get_resource(guid)
+            rstate = rm.state
             
-            hrrstate = ResourceState2str.get(rstate)
-            hrstate = ResourceState2str.get(state)
-
             if rstate >= state:
-                guids.remove(guid)
-                rm = self.get_resource(guid)
                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
-                    rm.get_rtype(), guid, hrrstate, hrstate))
+                    rm.get_rtype(), guid, rstate, state))
             else:
                 # Debug...
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
-                    guid, hrrstate, hrstate))
+                    guid, rstate, state))
+
+                guids.append(guid)
+
                 time.sleep(0.5)
-  
+
+    def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
+        plotter = ECPlotter()
+        fpath = plotter.plot(self, dirpath = dirpath, format= format, 
+                show = show)
+        return fpath
+
+    def serialize(self, format = SFormats.XML):
+        serializer = ECSerializer()
+        sec = serializer.load(self, format = format)
+        return sec
+
+    def save(self, dirpath = None, format = SFormats.XML):
+        serializer = ECSerializer()
+        path = serializer.save(self, dirpath  = None, format = format)
+        return path
+
     def get_task(self, tid):
         """ Returns a task by its id
 
@@ -363,13 +468,32 @@ class ExperimentController(object):
     def get_resource(self, guid):
         """ Returns a registered ResourceManager by its guid
 
-            :param guid: Id of the task
+            :param guid: Id of the resource
             :type guid: int
             
             :rtype: ResourceManager
             
         """
-        return self._resources.get(guid)
+        rm = self._resources.get(guid)
+        return rm
+
+    def get_resources_by_type(self, rtype):
+        """ Returns a registered ResourceManager by its guid
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of ResourceManagers
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == type: 
+                rms.append(rm)
+        return rms
+
+    def remove_resource(self, guid):
+        del self._resources[guid]
 
     @property
     def resources(self):
@@ -379,7 +503,9 @@ class ExperimentController(object):
             :rtype: set
 
         """
-        return self._resources.keys()
+        keys = self._resources.keys()
+
+        return keys
 
     def register_resource(self, rtype, guid = None):
         """ Registers a new ResourceManager of type 'rtype' in the experiment
@@ -608,7 +734,39 @@ class ExperimentController(object):
 
         """
         rm = self.get_resource(guid)
-        return rm.set(name, value)
+        rm.set(name, value)
+
+    def get_global(self, rtype, name):
+        """ Returns the value of the global attribute with name 'name' on the
+        RMs of rtype 'rtype'.
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+            :param name: Name of the attribute 
+            :type name: str
+
+            :return: The value of the attribute with name 'name'
+
+        """
+        rclass = ResourceFactory.get_resource_type(rtype)
+        return rclass.get_global(name)
+
+    def set_global(self, rtype, name, value):
+        """ Modifies the value of the global attribute with name 'name' on the 
+        RMs of with rtype 'rtype'.
+
+            :param guid: Guid of the RM
+            :type guid: int
+
+            :param name: Name of the attribute
+            :type name: str
+
+            :param value: Value of the attribute
+
+        """
+        rclass = ResourceFactory.get_resource_type(rtype)
+        return rclass.set_global(name, value)
 
     def state(self, guid, hr = False):
         """ Returns the state of a resource
@@ -655,6 +813,41 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start()
 
+    def get_start_time(self, guid):
+        """ Returns the start time of the RM as a timestamp """
+        rm = self.get_resource(guid)
+        return rm.start_time
+
+    def get_stop_time(self, guid):
+        """ Returns the stop time of the RM as a timestamp """
+        rm = self.get_resource(guid)
+        return rm.stop_time
+
+    def get_discover_time(self, guid):
+        """ Returns the discover time of the RM as a timestamp """
+        rm = self.get_resource(guid)
+        return rm.discover_time
+
+    def get_provision_time(self, guid):
+        """ Returns the provision time of the RM as a timestamp """
+        rm = self.get_resource(guid)
+        return rm.provision_time
+
+    def get_ready_time(self, guid):
+        """ Returns the deployment time of the RM as a timestamp """
+        rm = self.get_resource(guid)
+        return rm.ready_time
+
+    def get_release_time(self, guid):
+        """ Returns the release time of the RM as a timestamp """
+        rm = self.get_resource(guid)
+        return rm.release_time
+
+    def get_failed_time(self, guid):
+        """ Returns the time failure occured for the RM as a timestamp """
+        rm = self.get_resource(guid)
+        return rm.failed_time
+
     def set_with_conditions(self, name, value, guids1, guids2, state,
             time = None):
         """ Modifies the value of attribute with name 'name' on all RMs 
@@ -714,8 +907,8 @@ class ExperimentController(object):
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
-            for guid in self.resources:
-                if self.state(guid) == ResourceState.NEW:
+            for guid, rm in self._resources.iteritems():
+                if rm.state == ResourceState.NEW:
                     guids.append(guid)
                 
         if isinstance(guids, int):
@@ -747,7 +940,6 @@ class ExperimentController(object):
                     break
 
             if reschedule:
-
                 callback = functools.partial(wait_all_and_start, group)
                 self.schedule("1s", callback)
             else:
@@ -756,6 +948,11 @@ class ExperimentController(object):
                     rm = self.get_resource(guid)
                     self.schedule("0s", rm.start_with_conditions)
 
+                    if rm.conditions.get(ResourceAction.STOP):
+                        # Only if the RM has STOP conditions we
+                        # schedule a stop. Otherwise the RM will stop immediately
+                        self.schedule("0s", rm.stop_with_conditions)
+
         if wait_all_ready and new_group:
             # Schedule a function to check that all resources are
             # READY, and only then schedule the start.
@@ -774,10 +971,10 @@ class ExperimentController(object):
             if not wait_all_ready:
                 self.schedule("0s", rm.start_with_conditions)
 
-            if rm.conditions.get(ResourceAction.STOP):
-                # Only if the RM has STOP conditions we
-                # schedule a stop. Otherwise the RM will stop immediately
-                self.schedule("0s", rm.stop_with_conditions)
+                if rm.conditions.get(ResourceAction.STOP):
+                    # Only if the RM has STOP conditions we
+                    # schedule a stop. Otherwise the RM will stop immediately
+                    self.schedule("0s", rm.stop_with_conditions)
 
     def release(self, guids = None):
         """ Releases all ResourceManagers in the guids list.
@@ -789,20 +986,24 @@ class ExperimentController(object):
             :type guids: list
 
         """
+        if isinstance(guids, int):
+            guids = [guids]
+
         if not guids:
             guids = self.resources
 
-        # Remove all pending tasks from the scheduler queue
-        for tid in list(self._scheduler.pending):
-            self._scheduler.remove(tid)
-
-        self._runner.empty()
-
         for guid in guids:
             rm = self.get_resource(guid)
             self.schedule("0s", rm.release)
 
         self.wait_released(guids)
+
+        if self.persist:
+            self.save(dirpath = self.run_dir)
+
+        for guid in guids:
+            if self.get(guid, "hardRelease"):
+                self.remove_resource(guid)
         
     def shutdown(self):
         """ Releases all resources and stops the ExperimentController
@@ -812,6 +1013,13 @@ class ExperimentController(object):
         if self._state == ECState.FAILED:
             raise RuntimeError("EC failure. Can not exit gracefully")
 
+        # Remove all pending tasks from the scheduler queue
+        for tid in list(self._scheduler.pending):
+            self._scheduler.remove(tid)
+
+        # Remove pending tasks from the workers queue
+        self._runner.empty()
+
         self.release()
 
         # Mark the EC state as TERMINATED
@@ -898,6 +1106,8 @@ class ExperimentController(object):
 
         """
 
+        self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+        self._runner = ParallelRun(maxthreads = self.nthreads)
         self._runner.start()
 
         while not self._stop:
@@ -953,11 +1163,10 @@ class ExperimentController(object):
             :type task: Task
 
         """
-        # Invoke callback
-        task.status = TaskStatus.DONE
-
         try:
+            # Invoke callback
             task.result = task.callback()
+            task.status = TaskStatus.DONE
         except:
             import traceback
             err = traceback.format_exc()