import threading
from neco.util import guid
+from neco.util.parallel import ParallelRun
from neco.util.timefuncs import strfnow, strfdiff, strfvalid
from neco.execution.resource import ResourceFactory, ResourceAction, \
ResourceState
from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
-from neco.util.parallel import ParallelRun
+from neco.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
class ExperimentController(object):
- def __init__(self, root_dir = "/tmp"):
+ def __init__(self, exp_id = None, root_dir = "/tmp"):
super(ExperimentController, self).__init__()
# root directory to store files
self._root_dir = root_dir
+ # experiment identifier given by the user
+ self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+
# generator of globally unique ids
self._guid_generator = guid.GuidGenerator()
self._thread.start()
# Logging
- self._logger = logging.getLogger("neco.execution.ec")
+ self._logger = logging.getLogger("ExperimentController")
@property
def logger(self):
return self._logger
+ @property
+ def exp_id(self):
+ exp_id = self._exp_id
+ if not exp_id.startswith("nepi-"):
+ exp_id = "nepi-" + exp_id
+ return exp_id
def get_task(self, tid):
return self._tasks.get(tid)
rm = self.get_resource(guid1)
rm.register_condition(action, group2, state, time)
+ def register_trace(self, guid, name):
+ """ Enable trace
+
+ :param name: Name of the trace
+ :type name: str
+ """
+ rm = self.get_resource(guid)
+ rm.register_trace(name)
+
+ def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+ """ Get information on collected trace
+
+ :param name: Name of the trace
+ :type name: str
+
+ :param attr: Can be one of:
+ - TraceAttr.ALL (complete trace content),
+ - TraceAttr.STREAM (block in bytes to read starting at offset),
+ - TraceAttr.PATH (full path to the trace file),
+ - TraceAttr.SIZE (size of trace file).
+ :type attr: str
+
+ :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
+ :type name: int
+
+ :rtype: str
+ """
+ rm = self.get_resource(guid)
+ return rm.trace(name, attr, block, offset)
+
def discover(self, guid, filters):
rm = self.get_resource(guid)
return rm.discover(filters)
rm = self.get_resource(guid)
return rm.start_with_condition()
- def deploy(self, group = None, wait_all_deployed = True):
+ def deploy(self, group = None, wait_all_ready = True):
""" Deploy all resource manager in group
:param group: List of guids of RMs to deploy
:type group: list
- :param wait_all_deployed: Wait until all RMs are deployed in
+ :param wait_all_ready: Wait until all RMs are ready in
order to start the RMs
:type guid: int
for guid in group:
rm = self.get_resource(guid)
- if wait_all_deployed:
+ if wait_all_ready:
towait = list(group)
towait.remove(guid)
self.register_condition(guid, ResourceAction.START,