Added Linux Application
[nepi.git] / src / neco / execution / ec.py
index a7a33d8..07a4ee6 100644 (file)
@@ -5,20 +5,24 @@ import time
 import threading
 
 from neco.util import guid
+from neco.util.parallel import ParallelRun
 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
 from neco.execution.resource import ResourceFactory, ResourceAction, \
         ResourceState
 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
-from neco.util.parallel import ParallelRun
+from neco.execution.trace import TraceAttr
 
 # TODO: use multiprocessing instead of threading
 
 class ExperimentController(object):
-    def __init__(self, root_dir = "/tmp", loglevel = 'error'): 
+    def __init__(self, exp_id = None, root_dir = "/tmp"): 
         super(ExperimentController, self).__init__()
         # root directory to store files
         self._root_dir = root_dir
 
+        # experiment identifier given by the user
+        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
@@ -42,7 +46,17 @@ class ExperimentController(object):
 
         # Logging
         self._logger = logging.getLogger("neco.execution.ec")
-        self._logger.setLevel(getattr(logging, loglevel.upper()))
+
+    @property
+    def logger(self):
+        return self._logger
+
+    @property
+    def exp_id(self):
+        exp_id = self._exp_id
+        if not exp_id.startswith("nepi-"):
+            exp_id = "nepi-" + exp_id
+        return exp_id
 
     def get_task(self, tid):
         return self._tasks.get(tid)
@@ -66,15 +80,15 @@ class ExperimentController(object):
 
         return guid
 
-    def create_group(self, *args):
-        guid = self._guid_generator.next(guid)
+    def register_group(self, group):
+        guid = self._guid_generator.next()
 
-        grp = [arg for arg in args]
+        if not isinstance(group, list):
+            group = [group] 
 
-        self._resources[guid] = grp
+        self._groups[guid] = group
 
         return guid
 
     def get_attributes(self, guid):
         rm = self.get_resource(guid)
@@ -121,6 +135,39 @@ class ExperimentController(object):
             rm = self.get_resource(guid1)
             rm.register_condition(action, group2, state, time)
 
+    def register_trace(self, guid, name):
+        """ Enable trace
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        rm = self.get_resource(guid)
+        rm.register_trace(name)
+
+    def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        """ Get information on collected trace
+
+        :param name: Name of the trace
+        :type name: str
+
+        :param attr: Can be one of:
+                         - TraceAttr.ALL (complete trace content), 
+                         - TraceAttr.STREAM (block in bytes to read starting at offset), 
+                         - TraceAttr.PATH (full path to the trace file),
+                         - TraceAttr.SIZE (size of trace file). 
+        :type attr: str
+
+        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :rtype: str
+        """
+        rm = self.get_resource(guid)
+        return rm.trace(name, attr, block, offset)
+
     def discover(self, guid, filters):
         rm = self.get_resource(guid)
         return rm.discover(filters)
@@ -194,17 +241,19 @@ class ExperimentController(object):
         rm = self.get_resource(guid)
         return rm.start_with_condition()
 
-    def deploy(self, group = None, wait_all_deployed = True):
+    def deploy(self, group = None, wait_all_ready = True):
         """ Deploy all resource manager in group
 
         :param group: List of guids of RMs to deploy
         :type group: list
 
-        :param wait_all_deployed: Wait until all RMs are deployed in
+        :param wait_all_ready: Wait until all RMs are ready in
             order to start the RMs
         :type guid: int
 
         """
+        self.logger.debug(" ------- DEPLOY START ------ ")
+
         def steps(rm):
             rm.deploy()
             rm.start_with_conditions()
@@ -221,11 +270,11 @@ class ExperimentController(object):
         for guid in group:
             rm = self.get_resource(guid)
 
-            if wait_all_deployed:
+            if wait_all_ready:
                 towait = list(group)
                 towait.remove(guid)
                 self.register_condition(guid, ResourceAction.START, 
-                        towait, ResourceState.DEPLOYED)
+                        towait, ResourceState.READY)
 
             thread = threading.Thread(target = steps, args = (rm,))
             threads.append(thread)