Bugfixes for EC serialization and plotting
[nepi.git] / src / nepi / execution / ec.py
index 5584c68..2c6557f 100644 (file)
@@ -24,6 +24,8 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
+from nepi.util.plotter import ECPlotter, PFormats
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -51,6 +53,7 @@ class FailureManager(object):
     def __init__(self, ec):
         self._ec = weakref.ref(ec)
         self._failure_level = FailureLevel.OK
+        self._abort = False
 
     @property
     def ec(self):
@@ -62,23 +65,19 @@ class FailureManager(object):
 
     @property
     def abort(self):
+        return self._abort
+
+    def eval_failure(self, guid):
         if self._failure_level == FailureLevel.OK:
-            for guid in self.ec.resources:
-                try:
-                    state = self.ec.state(guid)
-                    critical = self.ec.get(guid, "critical")
-                    if state == ResourceState.FAILED and critical:
-                        self._failure_level = FailureLevel.RM_FAILURE
-                        self.ec.logger.debug("RM critical failure occurred on guid %d." \
-                                " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
-                        break
-                except:
-                    # An error might occure because a RM was deleted abruptly.
-                    # In this case the error should be ignored.
-                    if guid in self.ec._resources:
-                        raise
-
-        return self._failure_level != FailureLevel.OK
+            rm = self.ec.get_resource(guid)
+            state = rm.state
+            critical = rm.get("critical")
+
+            if state == ResourceState.FAILED and critical:
+                self._failure_level = FailureLevel.RM_FAILURE
+                self._abort = True
+                self.ec.logger.debug("RM critical failure occurred on guid %d." \
+                    " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
 
     def set_ec_failure(self):
         self._failure_level = FailureLevel.EC_FAILURE
@@ -154,6 +153,12 @@ class ExperimentController(object):
         
     """
 
+    @classmethod
+    def load(cls, filepath, format = SFormats.XML):
+        serializer = ECSerializer()
+        ec = serializer.load(filepath)
+        return ec
+
     def __init__(self, exp_id = None): 
         super(ExperimentController, self).__init__()
 
@@ -201,20 +206,10 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
-        # Blacklist file for PL nodes
-        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
-        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
-        if not os.path.exists(plblacklist_file):
-            if os.path.isdir(nepi_home):
-                open(plblacklist_file, 'w').close()
-            else:
-                os.makedirs(nepi_home)
-                open(plblacklist_file, 'w').close()
-                    
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
-        self._runner = ParallelRun(maxthreads = nthreads)
+        self._nthreads = 20
+        self._runner = None
 
         # Event processing thread
         self._cond = threading.Condition()
@@ -229,6 +224,14 @@ class ExperimentController(object):
         """
         return self._logger
 
+    @property
+    def failure_level(self):
+        """ Returns the level of FAILURE of th experiment
+
+        """
+
+        return self._fm._failure_level
+
     @property
     def ecstate(self):
         """ Returns the state of the Experiment Controller
@@ -251,6 +254,14 @@ class ExperimentController(object):
         """
         return self._run_id
 
+    @property
+    def nthreads(self):
+        """ Returns the number of processing nthreads used
+
+        """
+        return self._nthreads
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -259,6 +270,16 @@ class ExperimentController(object):
         """
         return self._fm.abort
 
+    def inform_failure(self, guid):
+        """ Reports a failure in a RM to the EC for evaluation
+
+            :param guid: Resource id
+            :type guid: int
+
+        """
+
+        return self._fm.eval_failure(guid)
+
     def wait_finished(self, guids):
         """ Blocking method that waits until all RMs in the 'guids' list 
         have reached a state >= STOPPED (i.e. STOPPED, FAILED or 
@@ -332,7 +353,6 @@ class ExperimentController(object):
             :type guids: list
         
         """
-        
         if isinstance(guids, int):
             guids = [guids]
 
@@ -346,23 +366,38 @@ class ExperimentController(object):
                 break
 
             # If a guid reached one of the target states, remove it from list
-            guid = guids[0]
-            rstate = self.state(guid)
+            guid = guids.pop()
+            rm = self.get_resource(guid)
+            rstate = rm.state
             
-            hrrstate = ResourceState2str.get(rstate)
-            hrstate = ResourceState2str.get(state)
-
             if rstate >= state:
-                guids.remove(guid)
-                rm = self.get_resource(guid)
                 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
-                    rm.get_rtype(), guid, hrrstate, hrstate))
+                    rm.get_rtype(), guid, rstate, state))
             else:
                 # Debug...
                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
-                    guid, hrrstate, hrstate))
+                    guid, rstate, state))
+
+                guids.append(guid)
+
                 time.sleep(0.5)
-  
+
+    def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
+        plotter = ECPlotter()
+        fpath = plotter.plot(self, dirpath = dirpath, format= format, 
+                show = show)
+        return fpath
+
+    def serialize(self, format = SFormats.XML):
+        serializer = ECSerializer()
+        sec = serializer.load(self, format = format)
+        return sec
+
+    def save(self, dirpath = None, format = SFormats.XML):
+        serializer = ECSerializer()
+        path = serializer.save(self, dirpath  = None, format = format)
+        return path
+
     def get_task(self, tid):
         """ Returns a task by its id
 
@@ -377,7 +412,7 @@ class ExperimentController(object):
     def get_resource(self, guid):
         """ Returns a registered ResourceManager by its guid
 
-            :param guid: Id of the task
+            :param guid: Id of the resource
             :type guid: int
             
             :rtype: ResourceManager
@@ -386,6 +421,21 @@ class ExperimentController(object):
         rm = self._resources.get(guid)
         return rm
 
+    def get_resources_by_type(self, rtype):
+        """ Returns a registered ResourceManager by its guid
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of ResourceManagers
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == type: 
+                rms.append(rm)
+        return rms
+
     def remove_resource(self, guid):
         del self._resources[guid]
 
@@ -801,8 +851,8 @@ class ExperimentController(object):
         if not guids:
             # If no guids list was passed, all 'NEW' RMs will be deployed
             guids = []
-            for guid in self.resources:
-                if self.state(guid) == ResourceState.NEW:
+            for guid, rm in self._resources.iteritems():
+                if rm.state == ResourceState.NEW:
                     guids.append(guid)
                 
         if isinstance(guids, int):
@@ -997,6 +1047,8 @@ class ExperimentController(object):
 
         """
 
+        self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+        self._runner = ParallelRun(maxthreads = self.nthreads)
         self._runner.start()
 
         while not self._stop: