8 from neco.util import guid
9 from neco.util.parallel import ParallelRun
10 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
11 from neco.execution.resource import ResourceFactory, ResourceAction, \
13 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
14 from neco.execution.trace import TraceAttr
16 # TODO: use multiprocessing instead of threading
17 # TODO: Improve speed. Too slow... !!
18 # TODO: When something fails during deployment NECO leaves scp and ssh processes running behind!!
20 class ECState(object):
25 class ExperimentController(object):
26 def __init__(self, exp_id = None, root_dir = "/tmp"):
27 super(ExperimentController, self).__init__()
28 # root directory to store files
29 self._root_dir = root_dir
31 # experiment identifier given by the user
32 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
34 # generator of globally unique ids
35 self._guid_generator = guid.GuidGenerator()
38 self._resources = dict()
41 self._scheduler = HeapScheduler()
46 # Event processing thread
47 self._cond = threading.Condition()
48 self._thread = threading.Thread(target = self._process)
49 self._thread.setDaemon(True)
53 self._state = ECState.RUNNING
56 self._logger = logging.getLogger("ExperimentController")
69 if not exp_id.startswith("nepi-"):
70 exp_id = "nepi-" + exp_id
75 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
77 def wait_finished(self, guids):
78 while not all([self.state(guid) == ResourceState.FINISHED \
79 for guid in guids]) and not self.finished:
80 # We keep the sleep as large as possible to
81 # decrease the number of RM state requests
84 def get_task(self, tid):
85 return self._tasks.get(tid)
87 def get_resource(self, guid):
88 return self._resources.get(guid)
92 return self._resources.keys()
94 def register_resource(self, rtype, guid = None):
95 # Get next available guid
96 guid = self._guid_generator.next(guid)
99 rm = ResourceFactory.create(rtype, self, guid)
102 self._resources[guid] = rm
106 def get_attributes(self, guid):
107 rm = self.get_resource(guid)
108 return rm.get_attributes()
110 def get_filters(self, guid):
111 rm = self.get_resource(guid)
112 return rm.get_filters()
114 def register_connection(self, guid1, guid2):
115 rm1 = self.get_resource(guid1)
116 rm2 = self.get_resource(guid2)
121 def register_condition(self, group1, action, group2, state,
123 """ Registers an action START or STOP for all RM on group1 to occur
124 time 'time' after all elements in group2 reached state 'state'.
126 :param group1: List of guids of RMs subjected to action
129 :param action: Action to register (either START or STOP)
130 :type action: ResourceAction
132 :param group2: List of guids of RMs to we waited for
135 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
136 :type state: ResourceState
138 :param time: Time to wait after group2 has reached status
142 if isinstance(group1, int):
144 if isinstance(group2, int):
148 rm = self.get_resource(guid1)
149 rm.register_condition(action, group2, state, time)
151 def register_trace(self, guid, name):
154 :param name: Name of the trace
157 rm = self.get_resource(guid)
158 rm.register_trace(name)
160 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
161 """ Get information on collected trace
163 :param name: Name of the trace
166 :param attr: Can be one of:
167 - TraceAttr.ALL (complete trace content),
168 - TraceAttr.STREAM (block in bytes to read starting at offset),
169 - TraceAttr.PATH (full path to the trace file),
170 - TraceAttr.SIZE (size of trace file).
173 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
176 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
181 rm = self.get_resource(guid)
182 return rm.trace(name, attr, block, offset)
184 def discover(self, guid, filters):
185 rm = self.get_resource(guid)
186 return rm.discover(filters)
188 def provision(self, guid, filters):
189 rm = self.get_resource(guid)
190 return rm.provision(filters)
192 def get(self, guid, name):
193 rm = self.get_resource(guid)
196 def set(self, guid, name, value):
197 rm = self.get_resource(guid)
198 return rm.set(name, value)
200 def state(self, guid):
201 rm = self.get_resource(guid)
204 def stop(self, guid):
205 rm = self.get_resource(guid)
208 def start(self, guid):
209 rm = self.get_resource(guid)
212 def set_with_conditions(self, name, value, group1, group2, state,
214 """ Set value 'value' on attribute with name 'name' on all RMs of
215 group1 when 'time' has elapsed since all elements in group2
216 have reached state 'state'.
218 :param name: Name of attribute to set in RM
221 :param value: Value of attribute to set in RM
224 :param group1: List of guids of RMs subjected to action
227 :param action: Action to register (either START or STOP)
228 :type action: ResourceAction
230 :param group2: List of guids of RMs to we waited for
233 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
234 :type state: ResourceState
236 :param time: Time to wait after group2 has reached status
240 if isinstance(group1, int):
242 if isinstance(group2, int):
246 rm = self.get_resource(guid)
247 rm.set_with_conditions(name, value, group2, state, time)
249 def stop_with_conditions(self, guid):
250 rm = self.get_resource(guid)
251 return rm.stop_with_conditions()
253 def start_with_conditions(self, guid):
254 rm = self.get_resource(guid)
255 return rm.start_with_condition()
257 def deploy(self, group = None, wait_all_ready = True):
258 """ Deploy all resource manager in group
260 :param group: List of guids of RMs to deploy
263 :param wait_all_ready: Wait until all RMs are ready in
264 order to start the RMs
268 self.logger.debug(" ------- DEPLOY START ------ ")
275 rm.start_with_conditions()
277 # Only if the RM has STOP conditions we
278 # schedule a stop. Otherwise the RM will stop immediately
279 if rm.conditions.get(ResourceAction.STOP):
280 rm.stop_with_conditions()
283 err = traceback.format_exc()
285 self._logger.error("Error occurred while deploying resources: %s" % err)
291 group = self.resources
293 # Before starting deployment we disorder the group list with the
294 # purpose of speeding up the whole deployment process.
295 # It is likely that the user inserted closely related
296 # resources in the 'group' list one after another (e.g. all
297 # applications connected to the same node can appear one after another).
298 # This can originate a slow down in the deployment since the N
299 # threads the parallel runner uses to processes tasks may all
300 # be taken up by the same family of resources waiting for the
302 # If we disorder the group list, this problem can be mitigated
303 random.shuffle(group)
307 rm = self.get_resource(guid)
312 self.register_condition(guid, ResourceAction.START,
313 towait, ResourceState.READY)
315 thread = threading.Thread(target = steps, args = (rm,))
316 threads.append(thread)
317 thread.setDaemon(True)
320 while list(threads) and not self.finished and not stop:
322 # Time out after 5 seconds to check EC not terminated
324 if not thread.is_alive():
325 threads.remove(thread)
329 self._stop_scheduler()
331 if self._thread.is_alive():
334 raise RuntimeError, "Error occurred, interrupting deployment "
336 def release(self, group = None):
338 group = self.resources
342 rm = self.get_resource(guid)
343 thread = threading.Thread(target=rm.release)
344 threads.append(thread)
345 thread.setDaemon(True)
348 while list(threads) and not self.finished:
350 # Time out after 5 seconds to check EC not terminated
352 if not thread.is_alive():
353 threads.remove(thread)
358 self._stop_scheduler()
360 if self._thread.is_alive():
363 def schedule(self, date, callback, track = False):
364 """ Schedule a callback to be executed at time date.
366 date string containing execution time for the task.
367 It can be expressed as an absolute time, using
368 timestamp format, or as a relative time matching
369 ^\d+.\d+(h|m|s|ms|us)$
371 callback code to be executed for the task. Must be a
372 Python function, and receives args and kwargs
375 track if set to True, the task will be retrivable with
376 the get_task() method
378 timestamp = strfvalid(date)
380 task = Task(timestamp, callback)
381 task = self._scheduler.schedule(task)
384 self._tasks[task.id] = task
386 # Notify condition to wake up the processing thread
394 runner = ParallelRun(maxthreads = 50)
398 while not self.finished:
400 task = self._scheduler.next()
404 # It there are not tasks in the tasks queue we need to
405 # wait until a call to schedule wakes us up
410 # If the task timestamp is in the future the thread needs to wait
411 # until time elapse or until another task is scheduled
413 if now < task.timestamp:
414 # Calculate time difference in seconds
415 timeout = strfdiff(task.timestamp, now)
416 # Re-schedule task with the same timestamp
417 self._scheduler.schedule(task)
418 # Sleep until timeout or until a new task awakes the condition
420 self._cond.wait(timeout)
423 # Process tasks in parallel
424 runner.put(self._execute, task)
428 err = traceback.format_exc()
429 self._logger.error("Error while processing tasks in the EC: %s" % err)
431 self._state = ECState.FAILED
435 # Mark EC state as terminated
436 if self.ecstate == ECState.RUNNING:
437 self._state = ECState.TERMINATED
439 def _execute(self, task):
441 task.status = TaskStatus.DONE
444 task.result = task.callback()
447 err = traceback.format_exc()
449 task.status = TaskStatus.ERROR
451 self._logger.error("Error occurred while executing task: %s" % err)
453 self._stop_scheduler()
455 # Propagate the error to the ParallelRunner
458 def _stop_scheduler(self):
459 # Mark the EC as failed
460 self._state = ECState.FAILED
462 # Wake up the EC in case it was sleeping