7 from neco.util import guid
8 from neco.util.parallel import ParallelRun
9 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
10 from neco.execution.resource import ResourceFactory, ResourceAction, \
12 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
13 from neco.execution.trace import TraceAttr
15 # TODO: use multiprocessing instead of threading
16 # TODO: Improve speed. Too slow... !!
18 class ECState(object):
23 class ExperimentController(object):
24 def __init__(self, exp_id = None, root_dir = "/tmp"):
25 super(ExperimentController, self).__init__()
26 # root directory to store files
27 self._root_dir = root_dir
29 # experiment identifier given by the user
30 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
32 # generator of globally unique ids
33 self._guid_generator = guid.GuidGenerator()
36 self._resources = dict()
39 self._scheduler = HeapScheduler()
44 # Event processing thread
45 self._cond = threading.Condition()
46 self._thread = threading.Thread(target = self._process)
47 self._thread.setDaemon(True)
51 self._state = ECState.RUNNING
54 self._logger = logging.getLogger("ExperimentController")
67 if not exp_id.startswith("nepi-"):
68 exp_id = "nepi-" + exp_id
73 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
75 def wait_finished(self, guids):
76 while not all([self.state(guid) == ResourceState.FINISHED \
77 for guid in guids]) and not self.finished:
def get_task(self, tid):
    """ Look up a tracked task by its identifier

    :param tid: Key of the task in ``self._tasks``

    :return: The result of ``self._tasks.get(tid)`` for that key
    """
    tracked = self._tasks
    return tracked.get(tid)
def get_resource(self, guid):
    """ Look up a registered resource manager (RM) by guid

    :param guid: Guid used as key in ``self._resources``

    :return: The result of ``self._resources.get(guid)`` for that key
    """
    registry = self._resources
    return registry.get(guid)
88 return self._resources.keys()
90 def register_resource(self, rtype, guid = None):
91 # Get next available guid
92 guid = self._guid_generator.next(guid)
95 rm = ResourceFactory.create(rtype, self, guid)
98 self._resources[guid] = rm
def get_attributes(self, guid):
    """ Return the attributes reported by the RM registered under guid

    :param guid: Guid of the RM to query

    :return: Whatever the RM's ``get_attributes()`` returns
    """
    return self.get_resource(guid).get_attributes()
def get_filters(self, guid):
    """ Return the filters reported by the RM registered under guid

    :param guid: Guid of the RM to query

    :return: Whatever the RM's ``get_filters()`` returns
    """
    return self.get_resource(guid).get_filters()
110 def register_connection(self, guid1, guid2):
111 rm1 = self.get_resource(guid1)
112 rm2 = self.get_resource(guid2)
117 def register_condition(self, group1, action, group2, state,
119 """ Registers an action START or STOP for all RM on group1 to occur
120 time 'time' after all elements in group2 reached state 'state'.
122 :param group1: List of guids of RMs subjected to action
125 :param action: Action to register (either START or STOP)
126 :type action: ResourceAction
128 :param group2: List of guids of RMs to be waited for
131 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
132 :type state: ResourceState
134 :param time: Time to wait after group2 has reached status
138 if isinstance(group1, int):
140 if isinstance(group2, int):
144 rm = self.get_resource(guid1)
145 rm.register_condition(action, group2, state, time)
147 def register_trace(self, guid, name):
150 :param name: Name of the trace
153 rm = self.get_resource(guid)
154 rm.register_trace(name)
156 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
157 """ Get information on collected trace
159 :param name: Name of the trace
162 :param attr: Can be one of:
163 - TraceAttr.ALL (complete trace content),
164 - TraceAttr.STREAM (block in bytes to read starting at offset),
165 - TraceAttr.PATH (full path to the trace file),
166 - TraceAttr.SIZE (size of trace file).
169 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
172 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
177 rm = self.get_resource(guid)
178 return rm.trace(name, attr, block, offset)
def discover(self, guid, filters):
    """ Forward a discover request to the RM registered under guid

    :param guid: Guid of the RM
    :param filters: Passed unchanged to the RM's ``discover``

    :return: Whatever the RM's ``discover(filters)`` returns
    """
    return self.get_resource(guid).discover(filters)
def provision(self, guid, filters):
    """ Forward a provision request to the RM registered under guid

    :param guid: Guid of the RM
    :param filters: Passed unchanged to the RM's ``provision``

    :return: Whatever the RM's ``provision(filters)`` returns
    """
    return self.get_resource(guid).provision(filters)
188 def get(self, guid, name):
189 rm = self.get_resource(guid)
def set(self, guid, name, value):
    """ Set an attribute on the RM registered under guid

    :param guid: Guid of the RM
    :param name: Name of the attribute to set
    :param value: Value to assign to the attribute

    :return: Whatever the RM's ``set(name, value)`` returns
    """
    return self.get_resource(guid).set(name, value)
196 def state(self, guid):
197 rm = self.get_resource(guid)
200 def stop(self, guid):
201 rm = self.get_resource(guid)
204 def start(self, guid):
205 rm = self.get_resource(guid)
208 def set_with_conditions(self, name, value, group1, group2, state,
210 """ Set value 'value' on attribute with name 'name' on all RMs of
211 group1 when 'time' has elapsed since all elements in group2
212 have reached state 'state'.
214 :param name: Name of attribute to set in RM
217 :param value: Value of attribute to set in RM
220 :param group1: List of guids of RMs subjected to action
223 :param action: Action to register (either START or STOP)
224 :type action: ResourceAction
226 :param group2: List of guids of RMs to be waited for
229 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
230 :type state: ResourceState
232 :param time: Time to wait after group2 has reached status
236 if isinstance(group1, int):
238 if isinstance(group2, int):
242 rm = self.get_resource(guid)
243 rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
    """ Ask the RM registered under guid to stop, honouring its conditions

    :param guid: Guid of the RM

    :return: Whatever the RM's ``stop_with_conditions()`` returns
    """
    return self.get_resource(guid).stop_with_conditions()
def start_with_conditions(self, guid):
    """ Ask the RM registered under guid to start, honouring its conditions

    :param guid: Guid of the RM

    :return: Whatever the RM's ``start_with_conditions()`` returns
    """
    rm = self.get_resource(guid)
    # The RM method is plural, as used by deploy() and mirrored by
    # stop_with_conditions(); the singular spelling raised AttributeError.
    return rm.start_with_conditions()
253 def deploy(self, group = None, wait_all_ready = True):
254 """ Deploy all resource manager in group
256 :param group: List of guids of RMs to deploy
259 :param wait_all_ready: Wait until all RMs are ready in
260 order to start the RMs
264 self.logger.debug(" ------- DEPLOY START ------ ")
268 rm.start_with_conditions()
270 # Only if the RM has STOP conditions we
271 # schedule a stop. Otherwise the RM will stop immediately
272 if rm.conditions.get(ResourceAction.STOP):
273 rm.stop_with_conditions()
276 group = self.resources
280 rm = self.get_resource(guid)
285 self.register_condition(guid, ResourceAction.START,
286 towait, ResourceState.READY)
288 thread = threading.Thread(target = steps, args = (rm,))
289 threads.append(thread)
290 thread.setDaemon(True)
293 while list(threads) and not self.finished:
295 # Time out after 5 seconds to check EC not terminated
297 if not thread.is_alive():
298 threads.remove(thread)
300 def release(self, group = None):
302 group = self.resources
306 rm = self.get_resource(guid)
307 thread = threading.Thread(target=rm.release)
308 threads.append(thread)
309 thread.setDaemon(True)
312 while list(threads) and not self.finished:
314 # Time out after 5 seconds to check EC not terminated
316 if not thread.is_alive():
317 threads.remove(thread)
319 self._state = ECState.TERMINATED
328 if self._thread.is_alive():
331 def schedule(self, date, callback, track = False):
332 """ Schedule a callback to be executed at time date.
334 date string containing execution time for the task.
335 It can be expressed as an absolute time, using
336 timestamp format, or as a relative time matching
337 ^\d+.\d+(h|m|s|ms|us)$
339 callback code to be executed for the task. Must be a
340 Python function, and receives args and kwargs
343 track if set to True, the task will be retrievable with
344 the get_task() method
346 timestamp = strfvalid(date)
348 task = Task(timestamp, callback)
349 task = self._scheduler.schedule(task)
352 self._tasks[task.id] = task
354 # Notify condition to wake up the processing thread
362 runner = ParallelRun(maxthreads = 50)
366 while not self.finished:
368 task = self._scheduler.next()
372 # If there are no tasks in the task queue we need to
373 # wait until a call to schedule wakes us up
378 # If the task timestamp is in the future the thread needs to wait
379 # until time elapse or until another task is scheduled
381 if now < task.timestamp:
382 # Calculate time difference in seconds
383 timeout = strfdiff(task.timestamp, now)
384 # Re-schedule task with the same timestamp
385 self._scheduler.schedule(task)
386 # Sleep until timeout or until a new task awakes the condition
388 self._cond.wait(timeout)
391 # Process tasks in parallel
392 runner.put(self._execute, task)
396 err = traceback.format_exc()
397 self._logger.error("Error while processing tasks in the EC: %s" % err)
399 self._state = ECState.FAILED
402 # Mark EC state as terminated
403 if self.ecstate == ECState.RUNNING:
404 self._state = ECState.TERMINATED
406 def _execute(self, task):
408 task.status = TaskStatus.DONE
411 task.result = task.callback()
414 err = traceback.format_exc()
416 task.status = TaskStatus.ERROR
418 self._logger.error("Error occurred while executing task: %s" % err)
420 # Mark the EC as failed
421 self._state = ECState.FAILED
423 # Wake up the EC in case it was sleeping
428 # Propagate error to the ParallelRunner