7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
9 from neco.execution.resource import ResourceFactory, ResourceAction, \
11 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
12 from neco.util.parallel import ParallelRun
14 # TODO: use multiprocessing instead of threading
16 class ExperimentController(object):
17 def __init__(self, root_dir = "/tmp", loglevel = 'error'):
18 super(ExperimentController, self).__init__()
19 # root directory to store files
20 self._root_dir = root_dir
22 # generator of globally unique ids
23 self._guid_generator = guid.GuidGenerator()
26 self._resources = dict()
32 self._scheduler = HeapScheduler()
37 # Event processing thread
39 self._cond = threading.Condition()
40 self._thread = threading.Thread(target = self._process)
44 self._logger = logging.getLogger("neco.execution.ec")
45 self._logger.setLevel(getattr(logging, loglevel.upper()))
def get_task(self, tid):
    """Look up a tracked task by its id.

    Only tasks scheduled with ``track=True`` are meant to be retrievable
    here (see ``schedule``), since they are stored in ``self._tasks``.

    :param tid: id of the task (``task.id``)
    :returns: the Task object, or None when ``tid`` is not tracked
    """
    tracked = self._tasks
    return tracked.get(tid, None)
def get_resource(self, guid):
    """Return whatever is registered under ``guid``, or None if nothing is.

    Entries are normally resource managers stored by ``register_resource``,
    but ``create_group`` stores a plain list of member guids here as well.
    """
    registry = self._resources
    return registry.get(guid, None)
55 return self._resources.keys()
57 def register_resource(self, rtype, guid = None):
58 # Get next available guid
59 guid = self._guid_generator.next(guid)
62 rm = ResourceFactory.create(rtype, self, guid)
65 self._resources[guid] = rm
def create_group(self, *args):
    """Register a group of RM guids under a freshly generated guid.

    The group is stored in the same registry as resource managers, so
    ``get_resource(group_guid)`` returns the list of member guids.

    :param args: guids of the RMs that belong to the group
    :returns: the guid assigned to the new group
    """
    # Ask the generator for the next free guid. The previous code passed
    # the local ``guid`` here before it had been assigned, which raised
    # UnboundLocalError on every call. ``None`` is the value
    # register_resource passes when no explicit guid is requested.
    guid = self._guid_generator.next(None)

    # Copy the members into a list (same content as the old comprehension).
    self._resources[guid] = list(args)

    return guid
def get_attributes(self, guid):
    """Return the attributes reported by the RM registered under ``guid``.

    The call is delegated to the RM's own ``get_attributes()``. For an
    unknown ``guid``, ``get_resource`` returns None, so this raises
    AttributeError.
    """
    return self.get_resource(guid).get_attributes()
def get_filters(self, guid):
    """Return the filters reported by the RM registered under ``guid``.

    The call is delegated to the RM's own ``get_filters()``. For an
    unknown ``guid``, ``get_resource`` returns None, so this raises
    AttributeError.
    """
    return self.get_resource(guid).get_filters()
87 def register_connection(self, guid1, guid2):
88 rm1 = self.get_resource(guid1)
89 rm2 = self.get_resource(guid2)
94 def register_condition(self, group1, action, group2, state,
96 """ Registers an action START or STOP for all RM on group1 to occur
97 time 'time' after all elements in group2 reached state 'state'.
99 :param group1: List of guids of RMs subjected to action
102 :param action: Action to register (either START or STOP)
103 :type action: ResourceAction
105 :param group2: List of guids of RMs to we waited for
108 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
109 :type state: ResourceState
111 :param time: Time to wait after group2 has reached status
115 if isinstance(group1, int):
117 if isinstance(group2, int):
121 rm = self.get_resource(guid1)
122 rm.register_condition(action, group2, state, time)
def discover(self, guid, filters):
    """Delegate ``discover(filters)`` to the RM registered under ``guid``.

    :param guid: guid of the target RM
    :param filters: passed unchanged to the RM
    :returns: whatever the RM's ``discover`` returns
    """
    manager = self.get_resource(guid)
    result = manager.discover(filters)
    return result
def provision(self, guid, filters):
    """Delegate ``provision(filters)`` to the RM registered under ``guid``.

    :param guid: guid of the target RM
    :param filters: passed unchanged to the RM
    :returns: whatever the RM's ``provision`` returns
    """
    manager = self.get_resource(guid)
    result = manager.provision(filters)
    return result
132 def get(self, guid, name):
133 rm = self.get_resource(guid)
def set(self, guid, name, value):
    """Set attribute ``name`` to ``value`` on the RM registered under ``guid``.

    The work is delegated to the RM's ``set(name, value)``.

    :returns: whatever the RM's ``set`` returns
    """
    target = self.get_resource(guid)
    outcome = target.set(name, value)
    return outcome
140 def state(self, guid):
141 rm = self.get_resource(guid)
144 def stop(self, guid):
145 rm = self.get_resource(guid)
148 def start(self, guid):
149 rm = self.get_resource(guid)
152 def set_with_conditions(self, name, value, group1, group2, state,
154 """ Set value 'value' on attribute with name 'name' on all RMs of
155 group1 when 'time' has elapsed since all elements in group2
156 have reached state 'state'.
158 :param name: Name of attribute to set in RM
161 :param value: Value of attribute to set in RM
164 :param group1: List of guids of RMs subjected to action
167 :param action: Action to register (either START or STOP)
168 :type action: ResourceAction
170 :param group2: List of guids of RMs to we waited for
173 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
174 :type state: ResourceState
176 :param time: Time to wait after group2 has reached status
180 if isinstance(group1, int):
182 if isinstance(group2, int):
186 rm = self.get_resource(guid)
187 rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
    """Trigger the conditional stop of the RM registered under ``guid``.

    The call is delegated to the RM's ``stop_with_conditions()``. deploy()
    calls that RM method only for RMs that have STOP conditions
    registered.

    :returns: whatever the RM returns
    """
    return self.get_resource(guid).stop_with_conditions()
def start_with_conditions(self, guid):
    """Trigger the conditional start of the RM registered under ``guid``.

    The call is delegated to the RM's ``start_with_conditions()``, the
    same method deploy() invokes on each RM.

    :returns: whatever the RM returns
    """
    rm = self.get_resource(guid)
    # Use the plural method name, matching deploy() and
    # stop_with_conditions; the previous singular ``start_with_condition``
    # did not match the name the RMs are called with elsewhere.
    return rm.start_with_conditions()
197 def deploy(self, group = None, wait_all_deployed = True):
198 """ Deploy all resource manager in group
200 :param group: List of guids of RMs to deploy
203 :param wait_all_deployed: Wait until all RMs are deployed in
204 order to start the RMs
210 rm.start_with_conditions()
212 # Only if the RM has STOP consitions we
213 # schedule a stop. Otherwise the RM will stop immediately
214 if rm.conditions.get(ResourceAction.STOP):
215 rm.stop_with_conditions()
218 group = self.resources
222 rm = self.get_resource(guid)
224 if wait_all_deployed:
227 self.register_condition(guid, ResourceAction.START,
228 towait, ResourceState.DEPLOYED)
230 thread = threading.Thread(target = steps, args = (rm,))
231 threads.append(thread)
234 for thread in threads:
237 def release(self, group = None):
239 group = self.resources
243 rm = self.get_resource(guid)
244 thread = threading.Thread(target=rm.release)
245 threads.append(thread)
248 for thread in threads:
258 if self._thread.is_alive():
261 def schedule(self, date, callback, track = False):
262 """ Schedule a callback to be executed at time date.
264 date string containing execution time for the task.
265 It can be expressed as an absolute time, using
266 timestamp format, or as a relative time matching
267 ^\d+.\d+(h|m|s|ms|us)$
269 callback code to be executed for the task. Must be a
270 Python function, and receives args and kwargs
273 track if set to True, the task will be retrivable with
274 the get_task() method
276 timestamp = strfvalid(date)
278 task = Task(timestamp, callback)
279 task = self._scheduler.schedule(task)
282 self._tasks[task.id] = task
284 # Notify condition to wake up the processing thread
292 runner = ParallelRun(maxthreads = 50)
296 while not self._stop:
298 task = self._scheduler.next()
302 # It there are not tasks in the tasks queue we need to
303 # wait until a call to schedule wakes us up
308 # If the task timestamp is in the future the thread needs to wait
309 # until time elapse or until another task is scheduled
311 if now < task.timestamp:
312 # Calculate time difference in seconds
313 timeout = strfdiff(task.timestamp, now)
314 # Re-schedule task with the same timestamp
315 self._scheduler.schedule(task)
316 # Sleep until timeout or until a new task awakes the condition
318 self._cond.wait(timeout)
321 # Process tasks in parallel
322 runner.put(self._execute, task)
325 err = traceback.format_exc()
326 self._logger.error("Error while processing tasks in the EC: %s" % err)
328 def _execute(self, task):
330 task.status = TaskStatus.DONE
333 task.result = task.callback()
336 err = traceback.format_exc()
337 self._logger.error("Error while executing event: %s" % err)
340 task.status = TaskStatus.ERROR