Commiting improvements to Collector. Local_dir added to ExperimentController
[nepi.git] / src / nepi / execution / ec.py
index cf0e0bb..1357e4f 100644 (file)
@@ -24,6 +24,8 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
+from nepi.util.plotter import ECPlotter, PFormats
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -32,6 +34,7 @@ import functools
 import logging
 import os
 import sys
+import tempfile
 import time
 import threading
 import weakref
@@ -67,7 +70,7 @@ class FailureManager(object):
 
     def eval_failure(self, guid):
         if self._failure_level == FailureLevel.OK:
-            rm = self.get_resource(guid)
+            rm = self.ec.get_resource(guid)
             state = rm.state
             critical = rm.get("critical")
 
@@ -148,10 +151,31 @@ class ExperimentController(object):
     exp_id, which can be re-used in different ExperimentController,
     and the run_id, which is unique to one ExperimentController instance, and
     is automatically generated by NEPI.
-        
+   
     """
 
-    def __init__(self, exp_id = None): 
+    @classmethod
+    def load(cls, filepath, format = SFormats.XML):
+        serializer = ECSerializer()
+        ec = serializer.load(filepath)
+        return ec
+
+    def __init__(self, exp_id = None, local_dir = None, persist = False):
+        """ ExperimentController entity to model an execute a network experiment.
+        
+        :param exp_id: Human readable name to identify the experiment
+        :type name: str
+
+        :param local_dir: Path to local directory where to store experiment
+            related files
+        :type name: str
+
+        :param persist: Save an XML description of the experiment after 
+        completion at local_dir
+        :type name: bool
+
+        """
+
         super(ExperimentController, self).__init__()
 
         # Logging
@@ -169,6 +193,17 @@ class ExperimentController(object):
         # resources used, etc)
         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
 
+        # Local path where to store experiment related files (results, etc)
+        if not local_dir:
+            local_dir = tempfile.mkdtemp()
+
+        self._local_dir = local_dir
+        self._exp_dir = os.path.join(local_dir, self.exp_id)
+        self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+        # If True persist the experiment controller in XML format, after completion
+        self._persist = persist
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -200,8 +235,8 @@ class ExperimentController(object):
 
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "20"))
-        self._runner = ParallelRun(maxthreads = nthreads)
+        self._nthreads = 20
+        self._runner = None
 
         # Event processing thread
         self._cond = threading.Condition()
@@ -246,6 +281,43 @@ class ExperimentController(object):
         """
         return self._run_id
 
+    @property
+    def nthreads(self):
+        """ Returns the number of processing nthreads used
+
+        """
+        return self._nthreads
+
+    @property
+    def local_dir(self):
+        """ Root local directory for experiment files
+
+        """
+        return self._local_dir
+
+    @property
+    def exp_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment.
+
+        """
+        return self._exp_dir
+
+    @property
+    def run_dir(self):
+        """ Local directory to store results and other files related to the 
+        experiment run.
+
+        """
+        return self._run_dir
+
+    @property
+    def persist(self):
+        """ If Trie persist the ExperimentController to XML format upon completion
+
+        """
+        return self._persist
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -365,7 +437,23 @@ class ExperimentController(object):
                 guids.append(guid)
 
                 time.sleep(0.5)
-  
+
+    def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
+        plotter = ECPlotter()
+        fpath = plotter.plot(self, dirpath = dirpath, format= format, 
+                show = show)
+        return fpath
+
+    def serialize(self, format = SFormats.XML):
+        serializer = ECSerializer()
+        sec = serializer.load(self, format = format)
+        return sec
+
+    def save(self, dirpath = None, format = SFormats.XML):
+        serializer = ECSerializer()
+        path = serializer.save(self, dirpath  = None, format = format)
+        return path
+
     def get_task(self, tid):
         """ Returns a task by its id
 
@@ -380,7 +468,7 @@ class ExperimentController(object):
     def get_resource(self, guid):
         """ Returns a registered ResourceManager by its guid
 
-            :param guid: Id of the task
+            :param guid: Id of the resource
             :type guid: int
             
             :rtype: ResourceManager
@@ -389,6 +477,21 @@ class ExperimentController(object):
         rm = self._resources.get(guid)
         return rm
 
+    def get_resources_by_type(self, rtype):
+        """ Returns a registered ResourceManager by its guid
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of ResourceManagers
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == type: 
+                rms.append(rm)
+        return rms
+
     def remove_resource(self, guid):
         del self._resources[guid]
 
@@ -895,6 +998,9 @@ class ExperimentController(object):
 
         self.wait_released(guids)
 
+        if self.persist:
+            self.save(dirpath = self.run_dir)
+
         for guid in guids:
             if self.get(guid, "hardRelease"):
                 self.remove_resource(guid)
@@ -1000,6 +1106,8 @@ class ExperimentController(object):
 
         """
 
+        self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+        self._runner = ParallelRun(maxthreads = self.nthreads)
         self._runner.start()
 
         while not self._stop: