ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
+from nepi.util.serializer import ECSerializer, SFormats
+from nepi.util.plotter import ECPlotter, PFormats
# TODO: use multiprocessing instead of threading
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
import logging
import os
import sys
+import tempfile
import time
import threading
import weakref
exp_id, which can be re-used in different ExperimentController,
and the run_id, which is unique to one ExperimentController instance, and
is automatically generated by NEPI.
-
+
"""
- def __init__(self, exp_id = None):
+ @classmethod
+ def load(cls, filepath, format = SFormats.XML):
+ serializer = ECSerializer()
+ ec = serializer.load(filepath)
+ return ec
+
+ def __init__(self, exp_id = None, local_dir = None, persist = False):
+ """ ExperimentController entity to model an execute a network experiment.
+
+ :param exp_id: Human readable name to identify the experiment
+ :type name: str
+
+ :param local_dir: Path to local directory where to store experiment
+ related files
+ :type name: str
+
+ :param persist: Save an XML description of the experiment after
+ completion at local_dir
+ :type name: bool
+
+ """
+
super(ExperimentController, self).__init__()
# Logging
# resources used, etc)
self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
+ # Local path where to store experiment related files (results, etc)
+ if not local_dir:
+ local_dir = tempfile.mkdtemp()
+
+ self._local_dir = local_dir
+ self._exp_dir = os.path.join(local_dir, self.exp_id)
+ self._run_dir = os.path.join(self.exp_dir, self.run_id)
+
+ # If True persist the experiment controller in XML format, after completion
+ self._persist = persist
+
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
# The runner is a pool of threads used to parallelize
# execution of tasks
- nthreads = int(os.environ.get("NEPI_NTHREADS", "20"))
- self._runner = ParallelRun(maxthreads = nthreads)
+ self._nthreads = 20
+ self._runner = None
# Event processing thread
self._cond = threading.Condition()
"""
return self._run_id
+ @property
+ def nthreads(self):
+ """ Returns the number of processing nthreads used
+
+ """
+ return self._nthreads
+
+ @property
+ def local_dir(self):
+ """ Root local directory for experiment files
+
+ """
+ return self._local_dir
+
+ @property
+ def exp_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment.
+
+ """
+ return self._exp_dir
+
+ @property
+ def run_dir(self):
+ """ Local directory to store results and other files related to the
+ experiment run.
+
+ """
+ return self._run_dir
+
+ @property
+ def persist(self):
+ """ If Trie persist the ExperimentController to XML format upon completion
+
+ """
+ return self._persist
+
@property
def abort(self):
""" Returns True if the experiment has failed and should be interrupted,
guids.append(guid)
time.sleep(0.5)
-
+
+ def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
+ plotter = ECPlotter()
+ fpath = plotter.plot(self, dirpath = dirpath, format= format,
+ show = show)
+ return fpath
+
+ def serialize(self, format = SFormats.XML):
+ serializer = ECSerializer()
+ sec = serializer.load(self, format = format)
+ return sec
+
+ def save(self, dirpath = None, format = SFormats.XML):
+ serializer = ECSerializer()
+ path = serializer.save(self, dirpath = None, format = format)
+ return path
+
def get_task(self, tid):
""" Returns a task by its id
def get_resource(self, guid):
""" Returns a registered ResourceManager by its guid
- :param guid: Id of the task
+ :param guid: Id of the resource
:type guid: int
:rtype: ResourceManager
rm = self._resources.get(guid)
return rm
+ def get_resources_by_type(self, rtype):
+ """ Returns a registered ResourceManager by its guid
+
+ :param rtype: Resource type
+ :type rtype: string
+
+ :rtype: list of ResourceManagers
+
+ """
+ rms = []
+ for guid, rm in self._resources.iteritems():
+ if rm.get_rtype() == type:
+ rms.append(rm)
+ return rms
+
def remove_resource(self, guid):
del self._resources[guid]
self.wait_released(guids)
+ if self.persist:
+ self.save(dirpath = self.run_dir)
+
for guid in guids:
if self.get(guid, "hardRelease"):
self.remove_resource(guid)
"""
+ self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
+ self._runner = ParallelRun(maxthreads = self.nthreads)
self._runner.start()
while not self._stop: