X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fexecution%2Fec.py;h=1357e4f7575525a59c5387ce2c40a6243f817957;hb=bac63fdc5983e2ade1902f711c1e7899d82ca4ae;hp=cf0e0bb701167cb810a07a589c95c38b77dbfa85;hpb=6be239a7c874733f31bd0d1085f5d52072ee411d;p=nepi.git diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index cf0e0bb7..1357e4f7 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -24,6 +24,8 @@ from nepi.execution.resource import ResourceFactory, ResourceAction, \ ResourceState, ResourceState2str from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus from nepi.execution.trace import TraceAttr +from nepi.util.serializer import ECSerializer, SFormats +from nepi.util.plotter import ECPlotter, PFormats # TODO: use multiprocessing instead of threading # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode) @@ -32,6 +34,7 @@ import functools import logging import os import sys +import tempfile import time import threading import weakref @@ -67,7 +70,7 @@ class FailureManager(object): def eval_failure(self, guid): if self._failure_level == FailureLevel.OK: - rm = self.get_resource(guid) + rm = self.ec.get_resource(guid) state = rm.state critical = rm.get("critical") @@ -148,10 +151,31 @@ class ExperimentController(object): exp_id, which can be re-used in different ExperimentController, and the run_id, which is unique to one ExperimentController instance, and is automatically generated by NEPI. - + """ - def __init__(self, exp_id = None): + @classmethod + def load(cls, filepath, format = SFormats.XML): + serializer = ECSerializer() + ec = serializer.load(filepath) + return ec + + def __init__(self, exp_id = None, local_dir = None, persist = False): + """ ExperimentController entity to model an execute a network experiment. + + :param exp_id: Human readable name to identify the experiment + :type name: str + + :param local_dir: Path to local directory where to store experiment + related files + :type name: str + + :param persist: Save an XML description of the experiment after + completion at local_dir + :type name: bool + + """ + super(ExperimentController, self).__init__() # Logging @@ -169,6 +193,17 @@ class ExperimentController(object): # resources used, etc) self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex') + # Local path where to store experiment related files (results, etc) + if not local_dir: + local_dir = tempfile.mkdtemp() + + self._local_dir = local_dir + self._exp_dir = os.path.join(local_dir, self.exp_id) + self._run_dir = os.path.join(self.exp_dir, self.run_id) + + # If True persist the experiment controller in XML format, after completion + self._persist = persist + # generator of globally unique ids self._guid_generator = guid.GuidGenerator() @@ -200,8 +235,8 @@ class ExperimentController(object): # The runner is a pool of threads used to parallelize # execution of tasks - nthreads = int(os.environ.get("NEPI_NTHREADS", "20")) - self._runner = ParallelRun(maxthreads = nthreads) + self._nthreads = 20 + self._runner = None # Event processing thread self._cond = threading.Condition() @@ -246,6 +281,43 @@ class ExperimentController(object): """ return self._run_id + @property + def nthreads(self): + """ Returns the number of processing nthreads used + + """ + return self._nthreads + + @property + def local_dir(self): + """ Root local directory for experiment files + + """ + return self._local_dir + + @property + def exp_dir(self): + """ Local directory to store results and other files related to the + experiment. + + """ + return self._exp_dir + + @property + def run_dir(self): + """ Local directory to store results and other files related to the + experiment run. + + """ + return self._run_dir + + @property + def persist(self): + """ If Trie persist the ExperimentController to XML format upon completion + + """ + return self._persist + @property def abort(self): """ Returns True if the experiment has failed and should be interrupted, @@ -365,7 +437,23 @@ class ExperimentController(object): guids.append(guid) time.sleep(0.5) - + + def plot(self, dirpath = None, format= PFormats.FIGURE, show = False): + plotter = ECPlotter() + fpath = plotter.plot(self, dirpath = dirpath, format= format, + show = show) + return fpath + + def serialize(self, format = SFormats.XML): + serializer = ECSerializer() + sec = serializer.load(self, format = format) + return sec + + def save(self, dirpath = None, format = SFormats.XML): + serializer = ECSerializer() + path = serializer.save(self, dirpath = None, format = format) + return path + def get_task(self, tid): """ Returns a task by its id @@ -380,7 +468,7 @@ class ExperimentController(object): def get_resource(self, guid): """ Returns a registered ResourceManager by its guid - :param guid: Id of the task + :param guid: Id of the resource :type guid: int :rtype: ResourceManager @@ -389,6 +477,21 @@ class ExperimentController(object): rm = self._resources.get(guid) return rm + def get_resources_by_type(self, rtype): + """ Returns a registered ResourceManager by its guid + + :param rtype: Resource type + :type rtype: string + + :rtype: list of ResourceManagers + + """ + rms = [] + for guid, rm in self._resources.iteritems(): + if rm.get_rtype() == type: + rms.append(rm) + return rms + def remove_resource(self, guid): del self._resources[guid] @@ -895,6 +998,9 @@ class ExperimentController(object): self.wait_released(guids) + if self.persist: + self.save(dirpath = self.run_dir) + for guid in guids: if self.get(guid, "hardRelease"): self.remove_resource(guid) @@ -1000,6 +1106,8 @@ class ExperimentController(object): """ + self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads))) + self._runner = ParallelRun(maxthreads = self.nthreads) self._runner.start() while not self._stop: