Added Linux Application
[nepi.git] / src / neco / execution / ec.py
index 01a6aba..07a4ee6 100644 (file)
 import logging
 import os
 import sys
-import threading
 import time
-import weakref
+import threading
 
-from neco.execution import scheduler, tasks
 from neco.util import guid
-from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
 from neco.util.parallel import ParallelRun
+from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
+from neco.execution.resource import ResourceFactory, ResourceAction, \
+        ResourceState
+from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
+from neco.execution.trace import TraceAttr
 
-_reschedule_delay = "0.1s"
+# TODO: use multiprocessing instead of threading
 
 class ExperimentController(object):
-    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
+    def __init__(self, exp_id = None, root_dir = "/tmp"): 
         super(ExperimentController, self).__init__()
         # root directory to store files
         self._root_dir = root_dir
 
+        # experiment identifier given by the user
+        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
+
         # generator of globally unique ids
         self._guid_generator = guid.GuidGenerator()
         
+        # Resource managers
+        self._resources = dict()
+
+        # Resource managers
+        self._group = dict()
+
         # Scheduler
-        self._scheduler = scheduler.HeapScheduler()
+        self._scheduler = HeapScheduler()
 
         # Tasks
         self._tasks = dict()
-        # Resources
-        self._resources = dict()
-       
+
         # Event processing thread
-        self._cond = threading.Condition()
         self._stop = False
-        self._thread = threading.Thread(target = self._process_tasks)
+        self._cond = threading.Condition()
+        self._thread = threading.Thread(target = self._process)
         self._thread.start()
-       
+
         # Logging
         self._logger = logging.getLogger("neco.execution.ec")
-        self._logger.setLevel(getattr(logging, loglevel.upper()))
 
-    def resource(self, guid):
+    @property
+    def logger(self):
+        return self._logger
+
+    @property
+    def exp_id(self):
+        exp_id = self._exp_id
+        if not exp_id.startswith("nepi-"):
+            exp_id = "nepi-" + exp_id
+        return exp_id
+
+    def get_task(self, tid):
+        return self._tasks.get(tid)
+
+    def get_resource(self, guid):
         return self._resources.get(guid)
 
-    def terminate(self):
+    @property
+    def resources(self):
+        return self._resources.keys()
+
+    def register_resource(self, rtype, guid = None):
+        # Get next available guid
+        guid = self._guid_generator.next(guid)
+        
+        # Instantiate RM
+        rm = ResourceFactory.create(rtype, self, guid)
+
+        # Store RM
+        self._resources[guid] = rm
+
+        return guid
+
+    def register_group(self, group):
+        guid = self._guid_generator.next()
+
+        if not isinstance(group, list):
+            group = [group] 
+
+        self._groups[guid] = group
+
+        return guid
+
+    def get_attributes(self, guid):
+        rm = self.get_resource(guid)
+        return rm.get_attributes()
+
+    def get_filters(self, guid):
+        rm = self.get_resource(guid)
+        return rm.get_filters()
+
+    def register_connection(self, guid1, guid2):
+        rm1 = self.get_resource(guid1)
+        rm2 = self.get_resource(guid2)
+
+        rm1.connect(guid2)
+        rm2.connect(guid1)
+
+    def register_condition(self, group1, action, group2, state,
+            time = None):
+        """ Registers an action START or STOP for all RM on group1 to occur 
+            time 'time' after all elements in group2 reached state 'state'.
+
+            :param group1: List of guids of RMs subjected to action
+            :type group1: list
+
+            :param action: Action to register (either START or STOP)
+            :type action: ResourceAction
+
+            :param group2: List of guids of RMs to we waited for
+            :type group2: list
+
+            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+            :type state: ResourceState
+
+            :param time: Time to wait after group2 has reached status 
+            :type time: string
+
+        """
+        if isinstance(group1, int):
+            group1 = [group1]
+        if isinstance(group2, int):
+            group2 = [group2]
+
+        for guid1 in group1:
+            rm = self.get_resource(guid1)
+            rm.register_condition(action, group2, state, time)
+
+    def register_trace(self, guid, name):
+        """ Enable trace
+
+        :param name: Name of the trace
+        :type name: str
+        """
+        rm = self.get_resource(guid)
+        rm.register_trace(name)
+
+    def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
+        """ Get information on collected trace
+
+        :param name: Name of the trace
+        :type name: str
+
+        :param attr: Can be one of:
+                         - TraceAttr.ALL (complete trace content), 
+                         - TraceAttr.STREAM (block in bytes to read starting at offset), 
+                         - TraceAttr.PATH (full path to the trace file),
+                         - TraceAttr.SIZE (size of trace file). 
+        :type attr: str
+
+        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
+        :type name: int
+
+        :rtype: str
+        """
+        rm = self.get_resource(guid)
+        return rm.trace(name, attr, block, offset)
+
+    def discover(self, guid, filters):
+        rm = self.get_resource(guid)
+        return rm.discover(filters)
+
+    def provision(self, guid, filters):
+        rm = self.get_resource(guid)
+        return rm.provision(filters)
+
+    def get(self, guid, name):
+        rm = self.get_resource(guid)
+        return rm.get(name)
+
+    def set(self, guid, name, value):
+        rm = self.get_resource(guid)
+        return rm.set(name, value)
+
+    def state(self, guid):
+        rm = self.get_resource(guid)
+        return rm.state
+
+    def stop(self, guid):
+        rm = self.get_resource(guid)
+        return rm.stop()
+
+    def start(self, guid):
+        rm = self.get_resource(guid)
+        return rm.start()
+
+    def set_with_conditions(self, name, value, group1, group2, state,
+            time = None):
+        """ Set value 'value' on attribute with name 'name' on all RMs of
+            group1 when 'time' has elapsed since all elements in group2 
+            have reached state 'state'.
+
+            :param name: Name of attribute to set in RM
+            :type name: string
+
+            :param value: Value of attribute to set in RM
+            :type name: string
+
+            :param group1: List of guids of RMs subjected to action
+            :type group1: list
+
+            :param action: Action to register (either START or STOP)
+            :type action: ResourceAction
+
+            :param group2: List of guids of RMs to we waited for
+            :type group2: list
+
+            :param state: State to wait for on RMs (STARTED, STOPPED, etc)
+            :type state: ResourceState
+
+            :param time: Time to wait after group2 has reached status 
+            :type time: string
+
+        """
+        if isinstance(group1, int):
+            group1 = [group1]
+        if isinstance(group2, int):
+            group2 = [group2]
+
+        for guid1 in group1:
+            rm = self.get_resource(guid)
+            rm.set_with_conditions(name, value, group2, state, time)
+
+    def stop_with_conditions(self, guid):
+        rm = self.get_resource(guid)
+        return rm.stop_with_conditions()
+
+    def start_with_conditions(self, guid):
+        rm = self.get_resource(guid)
+        return rm.start_with_condition()
+
+    def deploy(self, group = None, wait_all_ready = True):
+        """ Deploy all resource manager in group
+
+        :param group: List of guids of RMs to deploy
+        :type group: list
+
+        :param wait_all_ready: Wait until all RMs are ready in
+            order to start the RMs
+        :type guid: int
+
+        """
+        self.logger.debug(" ------- DEPLOY START ------ ")
+
+        def steps(rm):
+            rm.deploy()
+            rm.start_with_conditions()
+
+            # Only if the RM has STOP consitions we
+            # schedule a stop. Otherwise the RM will stop immediately
+            if rm.conditions.get(ResourceAction.STOP):
+                rm.stop_with_conditions()
+
+        if not group:
+            group = self.resources
+
+        threads = []
+        for guid in group:
+            rm = self.get_resource(guid)
+
+            if wait_all_ready:
+                towait = list(group)
+                towait.remove(guid)
+                self.register_condition(guid, ResourceAction.START, 
+                        towait, ResourceState.READY)
+
+            thread = threading.Thread(target = steps, args = (rm,))
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+    def release(self, group = None):
+        if not group:
+            group = self.resources
+
+        threads = []
+        for guid in group:
+            rm = self.get_resource(guid)
+            thread = threading.Thread(target=rm.release)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+    def shutdown(self):
+        self.release()
+        
         self._stop = True
         self._cond.acquire()
         self._cond.notify()
@@ -51,14 +307,9 @@ class ExperimentController(object):
         if self._thread.is_alive():
            self._thread.join()
 
-    def task_info(self, tid):
-        task = self._tasks.get(tid)
-        if not task:
-            return (None, None)
-        return (task.status, task.result)
+    def schedule(self, date, callback, track = False):
+        """ Schedule a callback to be executed at time date.
 
-    def schedule(self, date, callback, args = None, kwargs = None):
-        """
             date    string containing execution time for the task.
                     It can be expressed as an absolute time, using
                     timestamp format, or as a relative time matching
@@ -67,42 +318,26 @@ class ExperimentController(object):
             callback    code to be executed for the task. Must be a
                         Python function, and receives args and kwargs
                         as arguments.
-                        The callback will always be invoked passing a 
-                        week reference to the controller as first 
-                        argument.
-                        The callback must return a (status, result) 
-                        tuple where status is one of : 
-                        task.TaskStatus.FAIL, 
-                        task.TaskStatus.SUCCESS, 
-                        task.TaskStatus.RETRY, 
-                        task.TaskStatus.RECYCLE 
+
+            track   if set to True, the task will be retrivable with
+                    the get_task() method
         """
         timestamp = strfvalid(date)
         
-        args = args or []
-        kwargs = kwargs or {}
-
-        task = tasks.Task(timestamp, callback, args, kwargs)
-        task = self._schedule(task)
-
-        self._tasks[task.id] = task
-
-        return task.id
-
-    ###########################################################################
-    #### Internal methods
-    ###########################################################################
-
-    def _schedule(self, task):
+        task = Task(timestamp, callback)
         task = self._scheduler.schedule(task)
 
+        if track:
+            self._tasks[task.id] = task
+  
         # Notify condition to wake up the processing thread
         self._cond.acquire()
         self._cond.notify()
         self._cond.release()
-        return task
+
+        return task.id
      
-    def _process_tasks(self):
+    def _process(self):
         runner = ParallelRun(maxthreads = 50)
         runner.start()
 
@@ -133,32 +368,23 @@ class ExperimentController(object):
                         self._cond.release()
                     else:
                         # Process tasks in parallel
-                        runner.put(self._execute_task, task)
+                        runner.put(self._execute, task)
         except:  
             import traceback
             err = traceback.format_exc()
             self._logger.error("Error while processing tasks in the EC: %s" % err)
-    def _execute_task(self, task):
+
+    def _execute(self, task):
         # Invoke callback
-        ec = weakref.ref(self)
+        task.status = TaskStatus.DONE
+
         try:
-            (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
+            task.result = task.callback()
         except:
             import traceback
             err = traceback.format_exc()
             self._logger.error("Error while executing event: %s" % err)
 
-            # task marked as FAIL
-            task.status = tasks.TaskStatus.FAIL
             task.result = err
-
-        if task.status == tasks.TaskStatus.RETRY:
-            # Re-schedule same task in the near future
-            task.timestamp = strfvalid(_reschedule_delay)
-            self._schedule(task)
-        elif task.status == tasks.TaskStatus.RECYCLE:
-            # Re-schedule t in the future
-            timestamp = strfvalid(task.result)
-            self.schedule(timestamp, task.callback, task.args, task.kwargs)
+            task.status = TaskStatus.ERROR