8 from neco.util import guid
9 from neco.util.parallel import ParallelRun
10 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
11 from neco.execution.resource import ResourceFactory, ResourceAction, \
13 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
14 from neco.execution.trace import TraceAttr
16 # TODO: use multiprocessing instead of threading
17 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
# Lifecycle states of the ExperimentController.
# NOTE(review): the state constants themselves are not visible in this
# view; RUNNING, FAILED and TERMINATED are referenced elsewhere in this
# file.
class ECState(object):
class ExperimentController(object):
    """Orchestrates an experiment: holds the registered resource
    managers (RMs), schedules tasks and processes them on a background
    thread.
    """

    def __init__(self, exp_id = None, root_dir = "/tmp"):
        """
        :param exp_id: experiment identifier; when None a random
            "nepi-exp-<hex>" id is generated
        :param root_dir: root directory to store files (default "/tmp")
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # experiment identifier given by the user
        # (Python 2 idiom: str.encode('hex'))
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # guid -> ResourceManager registry
        self._resources = dict()

        # heap-based scheduler holding pending tasks
        self._scheduler = HeapScheduler()

        # NOTE(review): some initialization statements are not visible
        # in this view (e.g. the _tasks dict used by get_task() and
        # schedule(), and the thread start call).

        # Event processing thread
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.setDaemon(True)

        # controller state; starts as RUNNING
        self._state = ECState.RUNNING

        self._logger = logging.getLogger("ExperimentController")
# NOTE(review): the enclosing method header is not visible in this view.
# Ensures a user-supplied experiment id carries the "nepi-" prefix.
if not exp_id.startswith("nepi-"):
    exp_id = "nepi-" + exp_id
# NOTE(review): the enclosing def/property header is not visible in this
# view; self.finished is used elsewhere in this file, so this is
# presumably the body of a "finished" predicate — True once the EC has
# failed or terminated.
return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
def wait_finished(self, guids):
    """Block until every RM in *guids* reaches FINISHED, or until the
    EC itself finishes (fails or terminates).

    :param guids: guids of the resource managers to wait on
    """
    while not all([self.state(guid) == ResourceState.FINISHED \
            for guid in guids]) and not self.finished:
        # We keep the sleep as large as possible to
        # decrease the number of RM state requests
        # NOTE(review): the sleep statement itself is not visible in
        # this view.
def get_task(self, tid):
    """Look up a tracked task by its identifier.

    :param tid: task id assigned when the task was scheduled
    :return: the matching Task, or None when no such task is tracked
    """
    try:
        return self._tasks[tid]
    except KeyError:
        return None
def get_resource(self, guid):
    """Look up a registered resource manager by guid.

    :param guid: globally unique id of the resource manager
    :return: the matching ResourceManager, or None when unregistered
    """
    try:
        return self._resources[guid]
    except KeyError:
        return None
# NOTE(review): the enclosing def/property header is not visible in this
# view (presumably a "resources" accessor — self.resources is used
# elsewhere in this file as a guid collection).
return self._resources.keys()
def register_resource(self, rtype, guid = None):
    """Create a ResourceManager of type *rtype* and register it.

    :param rtype: resource type name known to the ResourceFactory
    :param guid: preferred guid; when None the next free one is used
    """
    # Get next available guid
    guid = self._guid_generator.next(guid)
    rm = ResourceFactory.create(rtype, self, guid)
    self._resources[guid] = rm
    # NOTE(review): trailing statements (presumably returning the guid)
    # are not visible in this view.
def get_attributes(self, guid):
    """Return the attributes of the RM identified by *guid*."""
    return self.get_resource(guid).get_attributes()
def get_filters(self, guid):
    """Return the filters of the RM identified by *guid*."""
    return self.get_resource(guid).get_filters()
def register_connection(self, guid1, guid2):
    """Connect the two RMs identified by *guid1* and *guid2*.

    NOTE(review): the statements that actually perform the connection
    on rm1/rm2 are not visible in this view.
    """
    rm1 = self.get_resource(guid1)
    rm2 = self.get_resource(guid2)
# NOTE(review): the tail of this signature (a trailing 'time' parameter,
# judging by the docstring and the delegation below) is not visible in
# this view.
def register_condition(self, group1, action, group2, state,
    """ Registers an action START or STOP for all RM on group1 to occur
    time 'time' after all elements in group2 reached state 'state'.

    :param group1: List of guids of RMs subjected to action
    :type group1: list

    :param action: Action to register (either START or STOP)
    :type action: ResourceAction

    :param group2: List of guids of RMs we waited for
    :type group2: list

    :param state: State to wait for on RMs (STARTED, STOPPED, etc)
    :type state: ResourceState

    :param time: Time to wait after group2 has reached status
    """
    # Normalize a single guid into a one-element list.
    if isinstance(group1, int):
    if isinstance(group2, int):
    # NOTE(review): the guard bodies and the loop header binding guid1
    # are not visible in this view.
    rm = self.get_resource(guid1)
    rm.register_condition(action, group2, state, time)
def register_trace(self, guid, name):
    """Enable collection of the trace *name* on the RM *guid*.

    :param guid: guid of the resource manager
    :param name: Name of the trace
    """
    self.get_resource(guid).register_trace(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """Get information on a collected trace.

    :param guid: guid of the RM owning the trace
    :param name: Name of the trace
    :param attr: Can be one of:
        - TraceAttr.ALL (complete trace content),
        - TraceAttr.STREAM (block in bytes to read starting at offset),
        - TraceAttr.PATH (full path to the trace file),
        - TraceAttr.SIZE (size of trace file).
    :param block: Number of bytes to retrieve from trace, when attr is
        TraceAttr.STREAM
    :param offset: Number of 'blocks' to skip, when attr is
        TraceAttr.STREAM
    """
    return self.get_resource(guid).trace(name, attr, block, offset)
def discover(self, guid, filters):
    """Ask the RM identified by *guid* to discover resources matching *filters*."""
    return self.get_resource(guid).discover(filters)
def provision(self, guid, filters):
    """Ask the RM identified by *guid* to provision resources matching *filters*."""
    return self.get_resource(guid).provision(filters)
def get(self, guid, name):
    """Read attribute *name* from the RM identified by *guid*."""
    rm = self.get_resource(guid)
    # NOTE(review): the delegating return statement (presumably
    # rm.get(name)) is not visible in this view.
def set(self, guid, name, value):
    """Set attribute *name* to *value* on the RM identified by *guid*."""
    return self.get_resource(guid).set(name, value)
def state(self, guid):
    """Return the current state of the RM identified by *guid*."""
    rm = self.get_resource(guid)
    # NOTE(review): the delegating return statement is not visible in
    # this view.
def stop(self, guid):
    """Stop the RM identified by *guid*."""
    rm = self.get_resource(guid)
    # NOTE(review): the delegating statement is not visible in this
    # view.
def start(self, guid):
    """Start the RM identified by *guid*."""
    rm = self.get_resource(guid)
    # NOTE(review): the delegating statement is not visible in this
    # view.
# NOTE(review): the tail of this signature (a trailing 'time' parameter,
# judging by the docstring and the delegation below) is not visible in
# this view.
def set_with_conditions(self, name, value, group1, group2, state,
    """ Set value 'value' on attribute with name 'name' on all RMs of
    group1 when 'time' has elapsed since all elements in group2
    have reached state 'state'.

    :param name: Name of attribute to set in RM

    :param value: Value of attribute to set in RM

    :param group1: List of guids of RMs subjected to action
    :type group1: list

    :param group2: List of guids of RMs we waited for
    :type group2: list

    :param state: State to wait for on RMs (STARTED, STOPPED, etc)
    :type state: ResourceState

    :param time: Time to wait after group2 has reached status
    """
    # Normalize a single guid into a one-element list.
    if isinstance(group1, int):
    if isinstance(group2, int):
    # NOTE(review): the guard bodies and the loop header binding guid
    # are not visible in this view.
    rm = self.get_resource(guid)
    rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
    """Stop the RM *guid*, honoring its registered STOP conditions."""
    return self.get_resource(guid).stop_with_conditions()
def start_with_conditions(self, guid):
    """Start the RM *guid*, honoring its registered START conditions.

    :param guid: guid of the resource manager to start
    """
    rm = self.get_resource(guid)
    # BUG FIX: was rm.start_with_condition() (missing trailing 's').
    # deploy() calls rm.start_with_conditions() on the same RM objects,
    # and the stop counterpart above delegates to
    # rm.stop_with_conditions(), so the singular form would raise
    # AttributeError.
    return rm.start_with_conditions()
def deploy(self, group = None, wait_all_ready = True):
    """ Deploy all resource manager in group

    :param group: List of guids of RMs to deploy
    :param wait_all_ready: Wait until all RMs are ready in
        order to start the RMs

    NOTE(review): several statements of this method are not visible in
    this view; the visible code below is annotated accordingly.
    """
    self.logger.debug(" ------- DEPLOY START ------ ")
    # NOTE(review): header of the per-RM worker (named 'steps', used as
    # the thread target below) is not visible in this view.
    rm.start_with_conditions()

    # Only if the RM has STOP conditions we
    # schedule a stop. Otherwise the RM will stop immediately
    if rm.conditions.get(ResourceAction.STOP):
        rm.stop_with_conditions()
    # NOTE(review): except-handler header is not visible in this view.
    err = traceback.format_exc()
    self._logger.error("Error occurred while deploying resources: %s" % err)

    # Default: deploy every registered resource
    # (NOTE(review): the guard around this default is not visible).
    group = self.resources

    # Before starting deployment we disorder the group list with the
    # purpose of speeding up the whole deployment process.
    # It is likely that the user inserted in the 'group' list closely
    # related resources one after another (e.g. all applications
    # connected to the same node can likely appear one after another).
    # This can originate a slow down in the deployment since the N
    # threads the parallel runner uses to process tasks may all
    # be taken up by the same family of resources waiting for the
    # others to finish.
    # If we disorder the group list, this problem can be mitigated
    random.shuffle(group)

    # NOTE(review): the loop header iterating the group and binding
    # guid/towait is not visible in this view.
    rm = self.get_resource(guid)
    # wait_all_ready: gate each RM's START on the group being READY
    self.register_condition(guid, ResourceAction.START,
            towait, ResourceState.READY)

    thread = threading.Thread(target = steps, args = (rm,))
    threads.append(thread)
    thread.setDaemon(True)

    # Poll worker threads, timing out so EC termination is noticed.
    while list(threads) and not self.finished and not stop:
        # Time out after 5 seconds to check EC not terminated
        if not thread.is_alive():
            threads.remove(thread)

    self._stop_scheduler()

    if self._thread.is_alive():
        # NOTE(review): Python 2 raise syntax.
        raise RuntimeError, "Error occurred, interrupting deployment "
def release(self, group = None):
    """Release the RMs in *group*, each on its own daemon thread.

    :param group: List of guids of RMs to release; defaults to every
        registered resource
    """
    # Default: release everything
    # (NOTE(review): the guard around this default is not visible).
    group = self.resources

    # NOTE(review): the loop header iterating the group and binding
    # guid is not visible in this view.
    rm = self.get_resource(guid)
    thread = threading.Thread(target=rm.release)
    threads.append(thread)
    thread.setDaemon(True)

    # Poll worker threads, timing out so EC termination is noticed.
    while list(threads) and not self.finished:
        # Time out after 5 seconds to check EC not terminated
        if not thread.is_alive():
            threads.remove(thread)

    self._stop_scheduler()

    # NOTE(review): the body of this guard (presumably joining the
    # processing thread) is not visible in this view.
    if self._thread.is_alive():
def schedule(self, date, callback, track = False):
    """ Schedule a callback to be executed at time date.

    :param date: string containing execution time for the task.
        It can be expressed as an absolute time, using
        timestamp format, or as a relative time matching
        ^\d+.\d+(h|m|s|ms|us)$

    :param callback: code to be executed for the task. Must be a
        Python function, and receives args and kwargs

    :param track: if set to True, the task will be retrievable with
        the get_task() method
    """
    # Validate/normalize the date into a concrete timestamp.
    timestamp = strfvalid(date)

    task = Task(timestamp, callback)
    task = self._scheduler.schedule(task)

    # NOTE(review): the guard on 'track' is not visible in this view.
    self._tasks[task.id] = task

    # Notify condition to wake up the processing thread
    # NOTE(review): the notification statements are not visible in this
    # view.
# NOTE(review): the enclosing def header is not visible in this view;
# per __init__ (Thread(target = self._process)), this is the body of
# _process, the event-processing thread target.
runner = ParallelRun(maxthreads = 50)

while not self.finished:
    task = self._scheduler.next()

    # If there are no tasks in the tasks queue we need to
    # wait until a call to schedule wakes us up

    # If the task timestamp is in the future the thread needs to wait
    # until time elapses or until another task is scheduled.
    # NOTE(review): the binding of 'now' is not visible in this view
    # (presumably strfnow(), imported above).
    if now < task.timestamp:
        # Calculate time difference in seconds
        timeout = strfdiff(task.timestamp, now)
        # Re-schedule task with the same timestamp
        self._scheduler.schedule(task)
        # Sleep until timeout or until a new task awakes the condition
        self._cond.wait(timeout)

    # Process tasks in parallel
    runner.put(self._execute, task)
# NOTE(review): except-handler header is not visible in this view.
err = traceback.format_exc()
self._logger.error("Error while processing tasks in the EC: %s" % err)

self._state = ECState.FAILED

# Mark EC state as terminated
if self.ecstate == ECState.RUNNING:
    self._state = ECState.TERMINATED
def _execute(self, task):
    """Run *task*'s callback, recording its result and status; on error
    mark the task failed, log the traceback and stop the scheduler.
    """
    # NOTE(review): the try/except structure and the re-raise in the
    # handler are not visible in this view.
    task.status = TaskStatus.DONE
    task.result = task.callback()
    err = traceback.format_exc()
    task.status = TaskStatus.ERROR
    self._logger.error("Error occurred while executing task: %s" % err)
    self._stop_scheduler()
    # Propagate error to the ParallelRunner
def _stop_scheduler(self):
    """Flag the EC as FAILED and wake the processing thread."""
    # Mark the EC as failed
    self._state = ECState.FAILED

    # Wake up the EC in case it was sleeping
    # NOTE(review): the condition notify statements are not visible in
    # this view (the method continues past this point).