X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=cf41983859f7d31ed2201ac880cdd053877cd94a;hb=561719f7cb8a42f139905c33e75ec5071c321170;hp=257742eef99b14d69fdb9178b4a8b4cc4711132b;hpb=55ae6a7a02598cb490163bfc57a79f37f24d8974;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 257742ee..cf419838 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -24,6 +24,7 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \ ResourceState, ResourceState2str from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus from nepi.execution.trace import TraceAttr +from nepi.util.serializer import ECSerializer, SFormats # TODO: use multiprocessing instead of threading # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode) @@ -151,6 +152,12 @@ class ExperimentController(object): """ + @classmethod + def load(cls, path, format = SFormats.XML): + serializer = ECSerializer() + ec = serializer.load(path) + return ec + def __init__(self, exp_id = None): super(ExperimentController, self).__init__() @@ -200,8 +207,8 @@ class ExperimentController(object): # The runner is a pool of threads used to parallelize # execution of tasks - nthreads = int(os.environ.get("NEPI_NTHREADS", "20")) - self._runner = ParallelRun(maxthreads = nthreads) + self._nthreads = 20 + self._runner = None # Event processing thread self._cond = threading.Condition() @@ -246,6 +253,14 @@ class ExperimentController(object): """ return self._run_id + @property + def nthreads(self): + """ Returns the number of processing nthreads used + + """ + return self._nthreads + + @property def abort(self): """ Returns True if the experiment has failed and should be interrupted, @@ -365,7 +380,17 @@ class ExperimentController(object): guids.append(guid) time.sleep(0.5) - + + def serialize(self, format = SFormats.XML): + serializer = ECSerializer() + sec = serializer.load(self, format = format) + return sec + + def save(self, path, format = SFormats.XML): + serializer = ECSerializer() + path = serializer.save(self, path, format = format) + return path + def get_task(self, tid): """ Returns a task by its id @@ -380,7 +405,7 @@ class ExperimentController(object): def get_resource(self, guid): """ Returns a registered ResourceManager by its guid - :param guid: Id of the task + :param guid: Id of the resource :type guid: int :rtype: ResourceManager @@ -389,6 +414,21 @@ class ExperimentController(object): rm = self._resources.get(guid) return rm + def get_resources_by_type(self, rtype): + """ Returns a registered ResourceManager by its guid + + :param rtype: Resource type + :type rtype: string + + :rtype: list of ResourceManagers + + """ + rms = [] + for guid, rm in self._resources.iteritems(): + if rm.get_rtype() == type: + rms.append(rm) + return rms + def remove_resource(self, guid): del self._resources[guid] @@ -1000,6 +1040,8 @@ class ExperimentController(object): """ + self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads))) + self._runner = ParallelRun(maxthreads = self.nthreads) self._runner.start() while not self._stop: