Added serialization to EC
[nepi.git] / src / nepi / execution / ec.py
index 257742e..cf41983 100644 (file)
@@ -24,6 +24,7 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState, ResourceState2str
 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
 from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
 
 # TODO: use multiprocessing instead of threading
 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
@@ -151,6 +152,12 @@ class ExperimentController(object):
         
     """
 
+    @classmethod
+    def load(cls, path, format = SFormats.XML):
+        serializer = ECSerializer()
+        ec = serializer.load(path)
+        return ec
+
     def __init__(self, exp_id = None): 
         super(ExperimentController, self).__init__()
 
@@ -200,8 +207,8 @@ class ExperimentController(object):
 
         # The runner is a pool of threads used to parallelize 
         # execution of tasks
-        nthreads = int(os.environ.get("NEPI_NTHREADS", "20"))
-        self._runner = ParallelRun(maxthreads = nthreads)
+        self._nthreads = 20
+        self._runner = None
 
         # Event processing thread
         self._cond = threading.Condition()
@@ -246,6 +253,14 @@ class ExperimentController(object):
         """
         return self._run_id
 
+    @property
+    def nthreads(self):
+        """ Returns the number of processing nthreads used
+
+        """
+        return self._nthreads
+
     @property
     def abort(self):
         """ Returns True if the experiment has failed and should be interrupted,
@@ -365,7 +380,17 @@ class ExperimentController(object):
                 guids.append(guid)
 
                 time.sleep(0.5)
-  
+
+    def serialize(self, format = SFormats.XML):
+        serializer = ECSerializer()
+        sec = serializer.load(self, format = format)
+        return sec
+
+    def save(self, path, format = SFormats.XML):
+        serializer = ECSerializer()
+        path = serializer.save(self, path, format = format)
+        return path
+
     def get_task(self, tid):
         """ Returns a task by its id
 
@@ -380,7 +405,7 @@ class ExperimentController(object):
     def get_resource(self, guid):
         """ Returns a registered ResourceManager by its guid
 
-            :param guid: Id of the task
+            :param guid: Id of the resource
             :type guid: int
             
             :rtype: ResourceManager
@@ -389,6 +414,21 @@ class ExperimentController(object):
         rm = self._resources.get(guid)
         return rm
 
+    def get_resources_by_type(self, rtype):
+        """ Returns a registered ResourceManager by its guid
+
+            :param rtype: Resource type
+            :type rtype: string
+            
+            :rtype: list of ResourceManagers
+            
+        """
+        rms = []
+        for guid, rm in self._resources.iteritems():
+            if rm.get_rtype() == type: 
+                rms.append(rm)
+        return rms
+
     def remove_resource(self, guid):
         del self._resources[guid]
 
@@ -1000,6 +1040,8 @@ class ExperimentController(object):
 
         """
 
+        self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+        self._runner = ParallelRun(maxthreads = self.nthreads)
         self._runner.start()
 
         while not self._stop: