8 from neco.execution import scheduler, tasks
9 from neco.util import guid
10 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
11 from neco.util.parallel import ParallelRun
# Relative delay (parsed by strfvalid) after which a task whose callback
# returned TaskStatus.RETRY is executed again.
_reschedule_delay = "0.1s"
15 class ExperimentController(object):
def __init__(self, root_dir = "/tmp", loglevel = 'error'):
    """Create the controller and start its task-processing thread.

    root_dir    root directory to store files.
    loglevel    name of a ``logging`` level (case-insensitive) applied to
                the "neco.execution.ec" logger.
    """
    super(ExperimentController, self).__init__()
    # root directory to store files
    self._root_dir = root_dir

    # generator of globally unique ids
    self._guid_generator = guid.GuidGenerator()

    # Scheduler holding the pending tasks
    self._scheduler = scheduler.HeapScheduler()

    # Tasks handed out by schedule(), indexed by task id (read by task_info)
    self._tasks = dict()

    # Resources, indexed by guid (read by resource)
    self._resources = dict()

    # Logging is configured before the processing thread exists: the thread
    # reports errors through self._logger, and an invalid loglevel now fails
    # before any thread has been started.
    self._logger = logging.getLogger("neco.execution.ec")
    self._logger.setLevel(getattr(logging, loglevel.upper()))

    # Event processing thread. _process_tasks polls self._stop, so it must
    # exist before the thread runs; the thread is started last so it never
    # sees a partially initialised controller.
    self._stop = False
    self._cond = threading.Condition()
    self._thread = threading.Thread(target = self._process_tasks)
    self._thread.start()
def resource(self, guid):
    """Look up a registered resource by its guid.

    Returns the resource object, or None when ``guid`` is not registered.
    """
    registry = self._resources
    if guid in registry:
        return registry[guid]
    return None
# NOTE(review): the `def` line and the rest of the method this guard belongs
# to are missing here; only the check on the processing thread survives.
# TODO confirm against version control and restore the full method.
51 if self._thread.is_alive():
def task_info(self, tid):
    """Return the (status, result) pair of the task with id ``tid``.

    Unknown task ids yield (None, None) instead of raising AttributeError.
    """
    task = self._tasks.get(tid)
    if task is None:
        return (None, None)
    return (task.status, task.result)
def schedule(self, date, callback, args = None, kwargs = None):
    r"""Schedule ``callback`` to be executed at ``date``.

    date        string containing execution time for the task.
                It can be expressed as an absolute time, using
                timestamp format, or as a relative time matching
                ^\d+.\d+(h|m|s|ms|us)$

    callback    code to be executed for the task. Must be a
                Python function, and receives args and kwargs
                as arguments.
                The callback will always be invoked passing a
                weak reference to the controller as first
                argument.
                The callback must return a (status, result)
                tuple where status is one of :

                    tasks.TaskStatus.SUCCESS,
                    tasks.TaskStatus.RETRY    (the same task runs again
                                               after a short delay),
                    tasks.TaskStatus.RECYCLE  (a new task is scheduled at
                                               the date given as result)

                If the callback raises, the controller marks the task
                with tasks.TaskStatus.FAIL.

    args        positional arguments for callback (default: none).
    kwargs      keyword arguments for callback (default: none).

    Returns the id of the new task, which can be passed to task_info().
    """
    timestamp = strfvalid(date)

    # Normalise missing arguments: the callback is later invoked as
    # callback(ec, *args, **kwargs), which fails on None.
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}

    task = tasks.Task(timestamp, callback, args, kwargs)
    task = self._schedule(task)

    self._tasks[task.id] = task

    return task.id
###########################################################################
# Internal (non-public) methods
###########################################################################
def _schedule(self, task):
    """Push ``task`` into the scheduler and wake up the processing thread.

    Returns the task as returned by the scheduler; schedule() reads the
    task id from this return value.
    """
    # Condition.notify() must be called with the lock held. Inserting under
    # the same lock also serialises access to the scheduler's queue for any
    # code that takes this lock.
    with self._cond:
        task = self._scheduler.schedule(task)
        # Notify condition to wake up the processing thread
        self._cond.notify()
    return task
def _process_tasks(self):
    """Body of the event-processing thread.

    Repeatedly takes the next task from the scheduler and, once its timestamp
    is due, hands it to a pool of worker threads that run _execute_task.
    Loops until ``self._stop`` is set; an unexpected error is logged and
    ends the loop.
    """
    import traceback

    runner = ParallelRun(maxthreads = 50)
    # NOTE(review): assumes ParallelRun workers must be started explicitly
    # before put(); the pool is also never synced/destroyed on exit -- confirm.
    runner.start()

    try:
        while not self._stop:
            with self._cond:
                task = self._scheduler.next()

                if not task:
                    # If there are no tasks in the queue we need to wait
                    # until a call to schedule wakes us up. Checking the
                    # queue (and the stop flag) and waiting under a single
                    # lock hold means a notify() issued in between is not lost.
                    if not self._stop:
                        self._cond.wait()
                    continue

                # If the task timestamp is in the future the thread needs to
                # wait until time elapses or until another task is scheduled
                now = strfnow()
                if now < task.timestamp:
                    # Calculate time difference in seconds
                    timeout = strfdiff(task.timestamp, now)
                    # Re-schedule task with the same timestamp
                    self._scheduler.schedule(task)
                    # Sleep until timeout or until a new task awakes the condition
                    self._cond.wait(timeout)
                    continue

            # Process due tasks in parallel, outside the lock
            runner.put(self._execute_task, task)
    except Exception:
        err = traceback.format_exc()
        self._logger.error("Error while processing tasks in the EC: %s" % err)
def _execute_task(self, task):
    """Run ``task``'s callback and act on the status it reports.

    The callback receives a weak reference to this controller followed by
    the task's args and kwargs, and returns (status, result).  A raising
    callback marks the task FAIL and stores the traceback as its result.
    RETRY re-queues the same task after _reschedule_delay; RECYCLE schedules
    a new task at the date given by the callback's result.
    """
    import traceback

    # Callbacks always receive a weak reference to the controller
    ec = weakref.ref(self)
    try:
        (task.status, task.result) = task.callback(ec, *task.args, **task.kwargs)
    except Exception:
        err = traceback.format_exc()
        self._logger.error("Error while executing event: %s" % err)

        # task marked as FAIL; keep the traceback so task_info() reports it
        task.status = tasks.TaskStatus.FAIL
        task.result = err

    if task.status == tasks.TaskStatus.RETRY:
        # Re-schedule same task in the near future. Updating the timestamp
        # alone is not enough: the task must be pushed back to the scheduler.
        task.timestamp = strfvalid(_reschedule_delay)
        self._schedule(task)
    elif task.status == tasks.TaskStatus.RECYCLE:
        # Schedule the callback again as a new task, at the date returned
        # by the callback as its result
        timestamp = strfvalid(task.result)
        self.schedule(timestamp, task.callback, task.args, task.kwargs)