NECo: A tool to design and run experiments on arbitrary platforms.
[nepi.git] / src / neco / execution / ec.py
1 import logging
2 import os
3 import sys
4 import threading
5 import time
6 import weakref
7
8 from neco.execution import scheduler, tasks
9 from neco.util import guid
10 from neco.util.timefuncs import strfnow, strfdiff, strfvalid 
11 from neco.util.parallel import ParallelRun
12
# Date string passed to strfvalid() to compute the new timestamp of a task
# whose callback returned TaskStatus.RETRY (see
# ExperimentController._execute_task).
# NOTE(review): presumably a relative delay of 0.1 seconds -- confirm against
# neco.util.timefuncs.strfvalid.
_reschedule_delay = "0.1s"
14
class ExperimentController(object):
    """Schedule tasks and execute their callbacks from a background thread.

    Tasks are queued in a heap scheduler. A dedicated processing thread
    (started by the constructor) takes tasks from the scheduler when their
    timestamp is due and hands them to a ParallelRun runner for execution.

    All access to the scheduler and to the stop flag happens while holding
    ``self._cond``, so a notification can never slip in between "the queue
    is empty" and "go to sleep" (which would leave the thread sleeping
    forever and make terminate() hang).
    """

    def __init__(self, root_dir = "/tmp", loglevel = 'error'):
        """
            root_dir    root directory to store files.

            loglevel    name of a level of the logging module
                        (case insensitive), e.g. 'error' or 'debug'.
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # Scheduler
        self._scheduler = scheduler.HeapScheduler()

        # Tasks
        self._tasks = dict()

        # Resources
        self._resources = dict()

        # Logging. Must be configured before the processing thread is
        # started, because that thread uses self._logger to report errors.
        self._logger = logging.getLogger("neco.execution.ec")
        self._logger.setLevel(getattr(logging, loglevel.upper()))

        # Event processing thread
        self._cond = threading.Condition()
        self._stop = False
        self._thread = threading.Thread(target = self._process_tasks)
        self._thread.start()

    def resource(self, guid):
        """ Returns the resource registered under guid, or None """
        return self._resources.get(guid)

    def terminate(self):
        """ Asks the processing thread to stop and waits until it exits """
        # The flag is set while holding the condition so the processing
        # thread either sees it before going to sleep or is already waiting
        # and gets woken up by the notify.
        with self._cond:
            self._stop = True
            self._cond.notify()

        if self._thread.is_alive():
            self._thread.join()

    def task_info(self, tid):
        """ Returns the (status, result) tuple of the task with id tid,
        or (None, None) if no such task was registered by schedule() """
        task = self._tasks.get(tid)
        if not task:
            return (None, None)
        return (task.status, task.result)

    def schedule(self, date, callback, args = None, kwargs = None):
        r"""
            date    string containing execution time for the task.
                    It can be expressed as an absolute time, using
                    timestamp format, or as a relative time matching
                    ^\d+.\d+(h|m|s|ms|us)$

            callback    code to be executed for the task. Must be a
                        Python function, and receives args and kwargs
                        as arguments.
                        The callback will always be invoked passing a
                        weak reference to the controller as first
                        argument.
                        The callback must return a (status, result)
                        tuple where status is one of :
                        tasks.TaskStatus.FAIL,
                        tasks.TaskStatus.SUCCESS,
                        tasks.TaskStatus.RETRY,
                        tasks.TaskStatus.RECYCLE

            Returns the id of the scheduled task, which can be passed
            to task_info().
        """
        timestamp = strfvalid(date)

        args = args or []
        kwargs = kwargs or {}

        task = tasks.Task(timestamp, callback, args, kwargs)
        task = self._schedule(task)

        self._tasks[task.id] = task

        return task.id

    ###########################################################################
    #### Internal methods
    ###########################################################################

    def _schedule(self, task):
        """ Queues task in the scheduler and wakes up the processing thread.
        Returns the task as returned by the scheduler """
        # The scheduler is only ever touched while holding the condition,
        # since it is shared between the caller's thread, the processing
        # thread and the runner threads (through _execute_task).
        with self._cond:
            task = self._scheduler.schedule(task)
            # Notify condition to wake up the processing thread
            self._cond.notify()
        return task

    def _process_tasks(self):
        """ Main loop of the processing thread.

        Takes the next task from the scheduler; sleeps if there is none or
        if the next one is not due yet; otherwise hands the task to the
        runner. Exits when terminate() sets the stop flag, or when an
        unexpected error occurs (the error is logged).
        """
        runner = ParallelRun(maxthreads = 50)
        runner.start()

        try:
            while True:
                # Checking the stop flag, inspecting the queue and going to
                # sleep are done under a single hold of the condition, so a
                # notify() issued by _schedule() or terminate() can not be
                # lost between the check and the wait.
                with self._cond:
                    if self._stop:
                        break

                    task = self._scheduler.next()

                    if not task:
                        # If there are no tasks in the tasks queue we need to
                        # wait until a call to schedule wakes us up
                        self._cond.wait()
                        continue

                    # If the task timestamp is in the future the thread needs
                    # to wait until time elapse or until another task is
                    # scheduled
                    now = strfnow()
                    if now < task.timestamp:
                        # Calculate time difference in seconds
                        timeout = strfdiff(task.timestamp, now)
                        # Re-schedule task with the same timestamp
                        self._scheduler.schedule(task)
                        # Sleep until timeout or until a new task awakes
                        # the condition
                        self._cond.wait(timeout)
                        continue

                # Process tasks in parallel. This is done outside the lock:
                # callbacks executed by the runner call _schedule(), which
                # needs the lock, so holding it here could dead-lock if
                # put() ever blocks.
                runner.put(self._execute_task, task)
        except:
            # Deliberately broad: this is the top level of the thread and
            # any failure must be reported through the logger.
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s" % err)

    def _execute_task(self, task):
        """ Invokes the callback of task and stores its outcome in
        task.status and task.result.

        A callback that raises marks the task as FAIL, with the traceback
        as result. RETRY re-queues the same task after _reschedule_delay;
        RECYCLE schedules a new task, with the same callback and arguments,
        at the date given by task.result.
        """
        # Invoke callback
        ec = weakref.ref(self)
        try:
            (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
        except:
            # Deliberately broad: whatever the callback raises (or a bad
            # return value that can not be unpacked) is recorded as a
            # failure of the task.
            import traceback
            err = traceback.format_exc()
            self._logger.error("Error while executing event: %s" % err)

            # task marked as FAIL
            task.status = tasks.TaskStatus.FAIL
            task.result = err

        if task.status == tasks.TaskStatus.RETRY:
            # Re-schedule same task in the near future
            task.timestamp = strfvalid(_reschedule_delay)
            self._schedule(task)
        elif task.status == tasks.TaskStatus.RECYCLE:
            # Re-schedule a new task in the future, at the date returned
            # by the callback
            timestamp = strfvalid(task.result)
            self.schedule(timestamp, task.callback, task.args, task.kwargs)
164