7 from neco.util import guid
8 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
9 from neco.execution.resource import ResourceFactory, ResourceAction, \
11 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
12 from neco.util.parallel import ParallelRun
14 # TODO: use multiprocessing instead of threading
16 class ExperimentController(object):
17 def __init__(self, root_dir = "/tmp", loglevel = 'error'):
18 super(ExperimentController, self).__init__()
19 # root directory to store files
20 self._root_dir = root_dir
22 # generator of globally unique ids
23 self._guid_generator = guid.GuidGenerator()
26 self._resources = dict()
29 self._scheduler = HeapScheduler()
34 # Event processing thread
36 self._cond = threading.Condition()
37 self._thread = threading.Thread(target = self._process)
41 self._logger = logging.getLogger("neco.execution.ec")
42 self._logger.setLevel(getattr(logging, loglevel.upper()))
def get_task(self, tid):
    """Look up a tracked task by its id.

    Delegates to ``self._tasks.get(tid)``, so the result for an unknown
    id is whatever that container's ``get`` returns (``None`` for a
    plain dict).
    """
    tracked = self._tasks
    return tracked.get(tid)
def get_resource(self, guid):
    """Return the resource manager registered under ``guid``.

    The lookup uses ``self._resources.get``, so an unregistered guid
    yields ``None`` instead of raising.
    """
    registry = self._resources
    return registry.get(guid)
52 return self._resources.keys()
54 def register_resource(self, rtype, guid = None, creds = None):
55 # Get next available guid
56 guid = self._guid_generator.next(guid)
59 rm = ResourceFactory.create(rtype, self, guid, creds)
62 self._resources[guid] = rm
def get_attributes(self, guid):
    """Return ``get_attributes()`` of the resource manager for ``guid``.

    The manager is resolved through ``get_resource``; no check is made
    here, so a guid without a registered manager fails on the method
    access.
    """
    return self.get_resource(guid).get_attributes()
def get_filters(self, guid):
    """Return ``get_filters()`` of the resource manager for ``guid``.

    The manager is obtained via ``get_resource`` and its result is
    passed back unchanged.
    """
    manager = self.get_resource(guid)
    filters = manager.get_filters()
    return filters
74 def register_connection(self, guid1, guid2):
75 rm1 = self.get_resource(guid1)
76 rm2 = self.get_resource(guid2)
81 def register_condition(self, group1, action, group2, state,
83 """ Registers an action START or STOP for all RM on group1 to occur
84 time 'time' after all elements in group2 reached state 'state'.
86 :param group1: List of guids of RMs subjected to action
89 :param action: Action to register (either START or STOP)
90 :type action: ResourceAction
92 :param group2: List of guids of RMs to be waited for
95 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
96 :type state: ResourceState
98 :param time: Time to wait after group2 has reached status
# Accept a bare guid as shorthand for a one-element group.
# NOTE: `list[x]` subscripts the `list` type itself (TypeError on older
# Pythons, a generic alias on 3.9+); a list literal is what is needed.
if isinstance(group1, int):
    group1 = [group1]
if isinstance(group2, int):
    group2 = [group2]
108 rm = self.get_resource(guid)
109 rm.register_condition(action, group2, state, time)
def discover(self, guid, filters):
    """Run ``discover(filters)`` on the resource manager for ``guid``.

    :param guid: Guid of the resource manager to act on
    :param filters: Passed through unchanged to the manager's ``discover``

    Returns whatever the manager's ``discover`` returns.
    """
    return self.get_resource(guid).discover(filters)
def provision(self, guid, filters):
    """Run ``provision(filters)`` on the resource manager for ``guid``.

    :param guid: Guid of the resource manager to act on
    :param filters: Passed through unchanged to the manager's ``provision``

    Returns whatever the manager's ``provision`` returns.
    """
    manager = self.get_resource(guid)
    outcome = manager.provision(filters)
    return outcome
119 def get(self, guid, name):
120 rm = self.get_resource(guid)
def set(self, guid, name, value):
    """Set attribute ``name`` to ``value`` on the manager for ``guid``.

    :param guid: Guid of the resource manager to act on
    :param name: Attribute name, forwarded to the manager's ``set``
    :param value: Attribute value, forwarded to the manager's ``set``

    Returns whatever the manager's ``set`` returns.
    """
    return self.get_resource(guid).set(name, value)
127 def state(self, guid):
128 rm = self.get_resource(guid)
131 def stop(self, guid):
132 rm = self.get_resource(guid)
135 def start(self, guid):
136 rm = self.get_resource(guid)
139 def set_with_conditions(self, name, value, group1, group2, state,
141 """ Set value 'value' on attribute with name 'name' on all RMs of
142 group1 when 'time' has elapsed since all elements in group2
143 have reached state 'state'.
145 :param name: Name of attribute to set in RM
148 :param value: Value of attribute to set in RM
151 :param group1: List of guids of RMs subjected to action
154 :param action: Action to register (either START or STOP)
155 :type action: ResourceAction
157 :param group2: List of guids of RMs to be waited for
160 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
161 :type state: ResourceState
163 :param time: Time to wait after group2 has reached status
# Accept a bare guid as shorthand for a one-element group.
# NOTE: `list[x]` subscripts the `list` type itself (TypeError on older
# Pythons, a generic alias on 3.9+); a list literal is what is needed.
if isinstance(group1, int):
    group1 = [group1]
if isinstance(group2, int):
    group2 = [group2]
173 rm = self.get_resource(guid)
174 rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
    """Call ``stop_with_conditions()`` on the manager for ``guid``.

    The manager is resolved via ``get_resource``; its return value is
    passed back unchanged.
    """
    return self.get_resource(guid).stop_with_conditions()
def start_with_conditions(self, guid):
    """Call ``start_with_conditions()`` on the manager for ``guid``.

    The manager is resolved via ``get_resource``; its return value is
    passed back unchanged.
    """
    rm = self.get_resource(guid)
    # The manager method is spelled with a trailing "s", as it is where
    # deploy() calls it and as in the stop_with_conditions counterpart;
    # the previous spelling `start_with_condition` did not match.
    return rm.start_with_conditions()
184 def deploy(self, group = None, wait_all_ready = True):
185 """ Deploy all resource manager in group
187 :param group: List of guids of RMs to deploy
190 :param wait_all_ready: Wait until all RMs are deployed in
191 order to start the RMs
197 rm.start_with_conditions()
199 # Only if the RM has STOP conditions we
200 # schedule a stop. Otherwise the RM will stop immediately
201 if rm.conditions.get(ResourceAction.STOP):
202 rm.stop_with_conditions()
205 group = self.resources
209 rm = self.get_resource(guid)
214 self.register_condition(guid, ResourceAction.START,
215 towait, ResourceState.DEPLOYED)
217 thread = threading.Thread(target = steps, args = (rm))
218 threads.append(thread)
221 for thread in threads:
224 def release(self, group = None):
226 group = self.resources
230 rm = self.get_resource(guid)
231 thread = threading.Thread(target=rm.release)
232 threads.append(thread)
235 for thread in threads:
245 if self._thread.is_alive():
248 def schedule(self, date, callback, track = False):
249 """ Schedule a callback to be executed at time date.
251 date string containing execution time for the task.
252 It can be expressed as an absolute time, using
253 timestamp format, or as a relative time matching
254 ^\d+.\d+(h|m|s|ms|us)$
256 callback code to be executed for the task. Must be a
257 Python function, and receives args and kwargs
260 track if set to True, the task will be retrievable with
261 the get_task() method
263 timestamp = strfvalid(date)
265 task = Task(timestamp, callback)
266 task = self._scheduler.schedule(task)
269 self._tasks[task.id] = task
271 # Notify condition to wake up the processing thread
279 runner = ParallelRun(maxthreads = 50)
283 while not self._stop:
285 task = self._scheduler.next()
289 # If there are no tasks in the tasks queue we need to
290 # wait until a call to schedule wakes us up
295 # If the task timestamp is in the future the thread needs to wait
296 # until the time elapses or until another task is scheduled
298 if now < task.timestamp:
299 # Calculate time difference in seconds
300 timeout = strfdiff(task.timestamp, now)
301 # Re-schedule task with the same timestamp
302 self._scheduler.schedule(task)
303 # Sleep until timeout or until a new task awakes the condition
305 self._cond.wait(timeout)
308 # Process tasks in parallel
309 runner.put(self._execute, task)
312 err = traceback.format_exc()
313 self._logger.error("Error while processing tasks in the EC: %s" % err)
315 def _execute(self, task):
317 task.status = TaskStatus.DONE
320 task.result = task.callback()
323 err = traceback.format_exc()
324 self._logger.error("Error while executing event: %s" % err)
327 task.status = TaskStatus.ERROR