7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
9 from neco.execution.resource import ResourceFactory, ResourceAction, \
11 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
12 from neco.util.parallel import ParallelRun
14 # TODO: use multiprocessing instead of threading
16 class ExperimentController(object):
17 def __init__(self, root_dir = "/tmp"):
18 super(ExperimentController, self).__init__()
19 # root directory to store files
20 self._root_dir = root_dir
22 # generator of globally unique ids
23 self._guid_generator = guid.GuidGenerator()
26 self._resources = dict()
32 self._scheduler = HeapScheduler()
37 # Event processing thread
39 self._cond = threading.Condition()
40 self._thread = threading.Thread(target = self._process)
44 self._logger = logging.getLogger("neco.execution.ec")
def get_task(self, tid):
    """ Look up a task in the controller's task table.

    :param tid: Id of the task to look up (the table is keyed by
        task id)

    :return: Whatever ``self._tasks.get(tid)`` yields; presumably None
        for an unknown id (assumes a dict-like table -- TODO confirm)
    """
    tracked = self._tasks
    return tracked.get(tid)
def get_resource(self, guid):
    """ Return the resource manager (RM) stored under 'guid'.

    :param guid: Guid of the RM to look up

    :return: The entry of the resources dictionary for 'guid', or
        None when nothing is registered under that guid
    """
    registry = self._resources
    return registry.get(guid)
59 return self._resources.keys()
61 def register_resource(self, rtype, guid = None):
62 # Get next available guid
63 guid = self._guid_generator.next(guid)
66 rm = ResourceFactory.create(rtype, self, guid)
69 self._resources[guid] = rm
73 def register_group(self, group):
74 guid = self._guid_generator.next()
76 if not isinstance(group, list):
79 self._groups[guid] = group
def get_attributes(self, guid):
    """ Return the result of get_attributes() on the RM with guid 'guid'.

    :param guid: Guid of the RM to query
    """
    # Plain delegation: fetch the RM, then forward the call to it
    return self.get_resource(guid).get_attributes()
def get_filters(self, guid):
    """ Return the result of get_filters() on the RM with guid 'guid'.

    :param guid: Guid of the RM to query
    """
    # Plain delegation: fetch the RM, then forward the call to it
    return self.get_resource(guid).get_filters()
91 def register_connection(self, guid1, guid2):
92 rm1 = self.get_resource(guid1)
93 rm2 = self.get_resource(guid2)
98 def register_condition(self, group1, action, group2, state,
100 """ Registers an action START or STOP for all RM on group1 to occur
101 time 'time' after all elements in group2 reached state 'state'.
103 :param group1: List of guids of RMs subjected to action
106 :param action: Action to register (either START or STOP)
107 :type action: ResourceAction
109 :param group2: List of guids of RMs to be waited for
112 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
113 :type state: ResourceState
115 :param time: Time to wait after group2 has reached status
119 if isinstance(group1, int):
121 if isinstance(group2, int):
125 rm = self.get_resource(guid1)
126 rm.register_condition(action, group2, state, time)
def discover(self, guid, filters):
    """ Forward a discover request to the RM with guid 'guid'.

    :param guid: Guid of the RM that receives the request
    :param filters: Passed unchanged to the RM's discover()

    :return: Whatever the RM's discover() returns
    """
    return self.get_resource(guid).discover(filters)
def provision(self, guid, filters):
    """ Forward a provision request to the RM with guid 'guid'.

    :param guid: Guid of the RM that receives the request
    :param filters: Passed unchanged to the RM's provision()

    :return: Whatever the RM's provision() returns
    """
    return self.get_resource(guid).provision(filters)
136 def get(self, guid, name):
137 rm = self.get_resource(guid)
def set(self, guid, name, value):
    """ Set attribute 'name' to 'value' on the RM with guid 'guid'.

    :param guid: Guid of the RM to modify
    :param name: Name of the attribute, passed to the RM's set()
    :param value: Value of the attribute, passed to the RM's set()

    :return: Whatever the RM's set() returns
    """
    return self.get_resource(guid).set(name, value)
144 def state(self, guid):
145 rm = self.get_resource(guid)
148 def stop(self, guid):
149 rm = self.get_resource(guid)
152 def start(self, guid):
153 rm = self.get_resource(guid)
156 def set_with_conditions(self, name, value, group1, group2, state,
158 """ Set value 'value' on attribute with name 'name' on all RMs of
159 group1 when 'time' has elapsed since all elements in group2
160 have reached state 'state'.
162 :param name: Name of attribute to set in RM
165 :param value: Value of attribute to set in RM
168 :param group1: List of guids of RMs subjected to action
171 :param action: Action to register (either START or STOP)
172 :type action: ResourceAction
174 :param group2: List of guids of RMs to be waited for
177 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
178 :type state: ResourceState
180 :param time: Time to wait after group2 has reached status
184 if isinstance(group1, int):
186 if isinstance(group2, int):
190 rm = self.get_resource(guid)
191 rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
    """ Invoke stop_with_conditions() on the RM with guid 'guid'.

    :param guid: Guid of the RM to act on

    :return: Whatever the RM's stop_with_conditions() returns
    """
    return self.get_resource(guid).stop_with_conditions()
def start_with_conditions(self, guid):
    """ Invoke start_with_conditions() on the RM with guid 'guid'.

    :param guid: Guid of the RM to act on

    :return: Whatever the RM's start_with_conditions() returns
    """
    rm = self.get_resource(guid)
    # The RM method is named start_with_conditions (plural), matching
    # how deploy() invokes it and the stop_with_conditions counterpart;
    # the previous singular spelling raised AttributeError.
    return rm.start_with_conditions()
201 def deploy(self, group = None, wait_all_deployed = True):
202 """ Deploy all resource manager in group
204 :param group: List of guids of RMs to deploy
207 :param wait_all_deployed: Wait until all RMs are deployed in
208 order to start the RMs
212 self.logger.debug(" ------- DEPLOY START ------ ")
216 rm.start_with_conditions()
218 # Only if the RM has STOP conditions we
219 # schedule a stop. Otherwise the RM will stop immediately
220 if rm.conditions.get(ResourceAction.STOP):
221 rm.stop_with_conditions()
224 group = self.resources
228 rm = self.get_resource(guid)
230 if wait_all_deployed:
233 self.register_condition(guid, ResourceAction.START,
234 towait, ResourceState.READY)
236 thread = threading.Thread(target = steps, args = (rm,))
237 threads.append(thread)
240 for thread in threads:
243 def release(self, group = None):
245 group = self.resources
249 rm = self.get_resource(guid)
250 thread = threading.Thread(target=rm.release)
251 threads.append(thread)
254 for thread in threads:
264 if self._thread.is_alive():
267 def schedule(self, date, callback, track = False):
268 """ Schedule a callback to be executed at time date.
270 date string containing execution time for the task.
271 It can be expressed as an absolute time, using
272 timestamp format, or as a relative time matching
273 ^\d+.\d+(h|m|s|ms|us)$
275 callback code to be executed for the task. Must be a
276 Python function, and receives args and kwargs
279 track if set to True, the task will be retrievable with
280 the get_task() method
282 timestamp = strfvalid(date)
284 task = Task(timestamp, callback)
285 task = self._scheduler.schedule(task)
288 self._tasks[task.id] = task
290 # Notify condition to wake up the processing thread
298 runner = ParallelRun(maxthreads = 50)
302 while not self._stop:
304 task = self._scheduler.next()
308 # If there are no tasks in the tasks queue we need to
309 # wait until a call to schedule wakes us up
314 # If the task timestamp is in the future the thread needs to wait
315 # until time elapse or until another task is scheduled
317 if now < task.timestamp:
318 # Calculate time difference in seconds
319 timeout = strfdiff(task.timestamp, now)
320 # Re-schedule task with the same timestamp
321 self._scheduler.schedule(task)
322 # Sleep until timeout or until a new task awakes the condition
324 self._cond.wait(timeout)
327 # Process tasks in parallel
328 runner.put(self._execute, task)
331 err = traceback.format_exc()
332 self._logger.error("Error while processing tasks in the EC: %s" % err)
334 def _execute(self, task):
336 task.status = TaskStatus.DONE
339 task.result = task.callback()
342 err = traceback.format_exc()
343 self._logger.error("Error while executing event: %s" % err)
346 task.status = TaskStatus.ERROR