7 from neco.util import guid
8 from neco.util.parallel import ParallelRun
9 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
10 from neco.execution.resource import ResourceFactory, ResourceAction, \
12 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
13 from neco.execution.trace import TraceAttr
15 # TODO: use multiprocessing instead of threading
16 # TODO: Improve speed. Too slow... !!
# Lifecycle states of the ExperimentController. The rest of this file
# references ECState.RUNNING, ECState.FAILED and ECState.TERMINATED.
class ECState(object):
class ExperimentController(object):
    """ Orchestrates an experiment: keeps a registry of resource
    managers (RMs) indexed by guid, schedules tasks on a heap scheduler,
    and processes them on a daemon thread until the experiment
    terminates or fails.
    """

    def __init__(self, exp_id = None, root_dir = "/tmp"):
        """
        :param exp_id: Experiment identifier; when None a random
            "nepi-exp-..." id is generated.
        :param root_dir: Root directory to store files
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # experiment identifier given by the user
        # NOTE(review): str.encode('hex') is Python 2 only.
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # guid -> ResourceManager instance
        self._resources = dict()

        # pending tasks ordered by timestamp
        self._scheduler = HeapScheduler()

        # Event processing thread; the condition both guards the
        # scheduler and is used to wake the loop when work arrives.
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.setDaemon(True)

        # controller lifecycle state (see ECState)
        self._state = ECState.RUNNING

        self._logger = logging.getLogger("ExperimentController")
        # Ensure the exposed experiment id always carries the "nepi-"
        # prefix, whatever the user supplied.
        if not exp_id.startswith("nepi-"):
            exp_id = "nepi-" + exp_id
        # The EC is finished once it can make no further progress.
        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
    def wait_finished(self, guids):
        """ Block until every RM in *guids* reaches state FINISHED, or
        until the EC itself finishes (FAILED or TERMINATED).

        :param guids: List of guids of RMs to wait for
        """
        while not all([self.state(guid) == ResourceState.FINISHED \
                for guid in guids]) and not self.finished:
            # We keep the sleep as large as possible to
            # decrease the number of RM state requests
82 def get_task(self, tid):
83 return self._tasks.get(tid)
85 def get_resource(self, guid):
86 return self._resources.get(guid)
        # Guids of every registered RM.
        return self._resources.keys()
    def register_resource(self, rtype, guid = None):
        """ Instantiate a resource manager of type *rtype* and register
        it under a globally unique id.

        :param rtype: Type name of the RM understood by ResourceFactory
        :param guid: Optional guid hint passed to the guid generator
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # The factory builds the RM bound to this EC and its guid.
        rm = ResourceFactory.create(rtype, self, guid)

        self._resources[guid] = rm
104 def get_attributes(self, guid):
105 rm = self.get_resource(guid)
106 return rm.get_attributes()
108 def get_filters(self, guid):
109 rm = self.get_resource(guid)
110 return rm.get_filters()
    def register_connection(self, guid1, guid2):
        """ Register a connection between the RMs identified by *guid1*
        and *guid2*.

        :param guid1: Guid of the first RM
        :param guid2: Guid of the second RM
        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)
    def register_condition(self, group1, action, group2, state,
        """ Registers an action START or STOP for all RM on group1 to occur
        time 'time' after all elements in group2 reached state 'state'.

        :param group1: List of guids of RMs subjected to action
        :type group1: list

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to be waited for
        :type group2: list

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        :type time: string
        """
        # Accept a single guid as well as a list for either group.
        if isinstance(group1, int):
        if isinstance(group2, int):

        # Delegate the condition to each RM in group1.
        rm = self.get_resource(guid1)
        rm.register_condition(action, group2, state, time)
    def register_trace(self, guid, name):
        """ Enable collection of the trace *name* on the RM *guid*.

        :param guid: Guid of the RM
        :param name: Name of the trace
        :type name: string
        """
        rm = self.get_resource(guid)
        rm.register_trace(name)
    def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Get information on collected trace

        :param guid: Guid of the RM that owns the trace
        :param name: Name of the trace
        :type name: string

        :param attr: Can be one of:
            - TraceAttr.ALL (complete trace content),
            - TraceAttr.STREAM (block in bytes to read starting at offset),
            - TraceAttr.PATH (full path to the trace file),
            - TraceAttr.SIZE (size of trace file).

        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
        :type block: int

        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
        :type offset: int
        """
        rm = self.get_resource(guid)
        return rm.trace(name, attr, block, offset)
182 def discover(self, guid, filters):
183 rm = self.get_resource(guid)
184 return rm.discover(filters)
186 def provision(self, guid, filters):
187 rm = self.get_resource(guid)
188 return rm.provision(filters)
190 def get(self, guid, name):
191 rm = self.get_resource(guid)
194 def set(self, guid, name, value):
195 rm = self.get_resource(guid)
196 return rm.set(name, value)
    def state(self, guid):
        """ Return the current state of the RM registered under *guid*.

        :param guid: Guid of the RM
        """
        rm = self.get_resource(guid)
        # NOTE(review): callers such as wait_finished() compare
        # self.state(guid) against ResourceState.FINISHED, so the RM
        # state must be returned here — confirm whether the RM exposes
        # it as a property (rm.state) or a method (rm.state()).
202 def stop(self, guid):
203 rm = self.get_resource(guid)
206 def start(self, guid):
207 rm = self.get_resource(guid)
    def set_with_conditions(self, name, value, group1, group2, state,
        """ Set value 'value' on attribute with name 'name' on all RMs of
        group1 when 'time' has elapsed since all elements in group2
        have reached state 'state'.

        :param name: Name of attribute to set in RM
        :type name: string

        :param value: Value of attribute to set in RM
        :type value: string

        :param group1: List of guids of RMs subjected to action
        :type group1: list

        :param group2: List of guids of RMs to be waited for
        :type group2: list

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        :type time: string
        """
        # Accept a single guid as well as a list for either group.
        if isinstance(group1, int):
        if isinstance(group2, int):

        # Delegate the conditional set to each RM in group1.
        rm = self.get_resource(guid)
        rm.set_with_conditions(name, value, group2, state, time)
247 def stop_with_conditions(self, guid):
248 rm = self.get_resource(guid)
249 return rm.stop_with_conditions()
251 def start_with_conditions(self, guid):
252 rm = self.get_resource(guid)
253 return rm.start_with_condition()
    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource manager in group

        :param group: List of guids of RMs to deploy
        :type group: list

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs
        :type wait_all_ready: bool
        """
        self.logger.debug(" ------- DEPLOY START ------ ")

            # Per-RM deployment steps, run on a worker thread: start the
            # RM as soon as its START conditions hold.
            rm.start_with_conditions()

            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            if rm.conditions.get(ResourceAction.STOP):
                rm.stop_with_conditions()

            # Default to deploying every registered resource.
            group = self.resources

            rm = self.get_resource(guid)

            # When wait_all_ready is set, make each RM's START wait for
            # the rest of the group to reach READY.
            self.register_condition(guid, ResourceAction.START,
                    towait, ResourceState.READY)

            # One daemon thread per RM so deployments progress in parallel.
            thread = threading.Thread(target = steps, args = (rm,))
            threads.append(thread)
            thread.setDaemon(True)

        # Poll the workers, bailing out early if the EC finishes first.
        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
            if not thread.is_alive():
                threads.remove(thread)
    def release(self, group = None):
        """ Release every RM in *group* in parallel, then mark the EC
        as TERMINATED.

        :param group: List of guids of RMs to release; defaults to all
            registered resources
        """
            group = self.resources

            rm = self.get_resource(guid)
            # One daemon thread per RM release.
            thread = threading.Thread(target=rm.release)
            threads.append(thread)
            thread.setDaemon(True)

        # Poll the workers, bailing out early if the EC finishes first.
        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
            if not thread.is_alive():
                threads.remove(thread)

        self._state = ECState.TERMINATED
        # Only join/stop the processing thread if it is still running.
        if self._thread.is_alive():
    def schedule(self, date, callback, track = False):
        """ Schedule a callback to be executed at time date.

        :param date: string containing execution time for the task.
            It can be expressed as an absolute time, using
            timestamp format, or as a relative time matching
            ^\d+.\d+(h|m|s|ms|us)$

        :param callback: code to be executed for the task. Must be a
            Python function, and receives args and kwargs

        :param track: if set to True, the task will be retrievable with
            the get_task() method
        """
        # Normalize/validate the date into a concrete timestamp.
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)
        # The scheduler assigns the task its id and queues it.
        task = self._scheduler.schedule(task)

            # Tracked tasks stay retrievable through get_task().
            self._tasks[task.id] = task

        # Notify condition to wake up the processing thread
        # Worker pool used to run task callbacks off the event loop.
        runner = ParallelRun(maxthreads = 50)

            # Main event loop: runs until the EC fails or terminates.
            while not self.finished:

                task = self._scheduler.next()

                # If there are no tasks in the tasks queue we need to
                # wait until a call to schedule wakes us up

                # If the task timestamp is in the future the thread needs to wait
                # until time elapse or until another task is scheduled

                    if now < task.timestamp:
                        # Calculate time difference in seconds
                        timeout = strfdiff(task.timestamp, now)
                        # Re-schedule task with the same timestamp
                        self._scheduler.schedule(task)
                        # Sleep until timeout or until a new task awakes the condition
                        self._cond.wait(timeout)

                        # Process tasks in parallel
                        runner.put(self._execute, task)

            # Any exception escaping the loop aborts the experiment.
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s" % err)

            self._state = ECState.FAILED

        # Mark EC state as terminated
        if self.ecstate == ECState.RUNNING:
            self._state = ECState.TERMINATED
    def _execute(self, task):
        """ Invoke *task*'s callback, recording its result and status;
        on error, mark the task ERROR and the EC FAILED.

        :param task: Task whose callback will be invoked
        """
        # Optimistically mark done; overwritten below on error.
        task.status = TaskStatus.DONE

            task.result = task.callback()

            err = traceback.format_exc()

            task.status = TaskStatus.ERROR

            self._logger.error("Error occurred while executing task: %s" % err)

            # Mark the EC as failed
            self._state = ECState.FAILED

            # Wake up the EC in case it was sleeping

            # Propagate error to the ParallelRunner