7 from neco.util import guid
8 from neco.util.parallel import ParallelRun
9 from neco.util.timefuncs import strfnow, strfdiff, strfvalid
10 from neco.execution.resource import ResourceFactory, ResourceAction, \
12 from neco.execution.scheduler import HeapScheduler, Task, TaskStatus
13 from neco.execution.trace import TraceAttr
15 # TODO: use multiprocessing instead of threading
# Controller for one experiment: creates and stores resource managers (RMs),
# forwards configuration/trace/lifecycle calls to them, deploys and releases
# them in worker threads, and runs timed callbacks on a background thread.
17 class ExperimentController(object):
# exp_id: optional user-supplied experiment id; root_dir: base directory kept
# in self._root_dir (how it is used is not visible in this chunk).
18 def __init__(self, exp_id = None, root_dir = "/tmp"):
19 super(ExperimentController, self).__init__()
20 # root directory to store files
21 self._root_dir = root_dir
23 # experiment identifier given by the user
# Falls back to "nepi-exp-" followed by 16 hex chars from 8 random bytes.
# NOTE(review): str.encode('hex') only exists on Python 2.
24 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
26 # generator of globally unique ids
27 self._guid_generator = guid.GuidGenerator()
# guid -> RM instance, filled by register_resource()
30 self._resources = dict()
# Pending timed tasks, consumed by the processing thread in _process()
36 self._scheduler = HeapScheduler()
41 # Event processing thread
# Condition the processing thread waits on; schedule() wakes it after
# queuing a task.
43 self._cond = threading.Condition()
44 self._thread = threading.Thread(target = self._process)
48 self._logger = logging.getLogger("neco.execution.ec")
# NOTE(review): self._groups, self._tasks and self._stop are used elsewhere in
# this class, but their initialisation (and the start of self._thread) is not
# visible in this chunk; presumably they are in the elided lines.
# Normalise the experiment id so it always carries the "nepi-" prefix.
# NOTE(review): the enclosing definition (presumably an exp_id property) and
# its return statement are not visible in this chunk.
57 if not exp_id.startswith("nepi-"):
58 exp_id = "nepi-" + exp_id
def get_task(self, tid):
    """Return the tracked task whose id is `tid`, or None if it is unknown."""
    tracked = self._tasks
    return tracked.get(tid)
def get_resource(self, guid):
    """Return the RM registered under `guid`, or None if there is none."""
    registry = self._resources
    return registry.get(guid)
# Returns the guids of all registered RMs (the keys of the guid -> RM map).
# NOTE(review): the enclosing definition (presumably a `resources` property,
# since deploy() and release() read self.resources) is not visible here.
69 return self._resources.keys()
# Create an RM of type `rtype` through ResourceFactory and store it under a
# guid obtained from the guid generator. The optional `guid` argument is passed
# to GuidGenerator.next(); its exact semantics are not visible here.
# NOTE(review): the method's return statement (presumably `return guid`) is
# not visible in this chunk.
71 def register_resource(self, rtype, guid = None):
72 # Get next available guid
73 guid = self._guid_generator.next(guid)
# The factory receives this controller and the guid along with the type.
76 rm = ResourceFactory.create(rtype, self, guid)
79 self._resources[guid] = rm
# Store `group` under a freshly generated guid in self._groups.
# NOTE(review): the branch taken when `group` is not a list (original lines
# 87-88) and the return statement are not visible in this chunk.
83 def register_group(self, group):
84 guid = self._guid_generator.next()
86 if not isinstance(group, list):
89 self._groups[guid] = group
def get_attributes(self, guid):
    """Return what get_attributes() reports on the RM registered as `guid`."""
    return self.get_resource(guid).get_attributes()
def get_filters(self, guid):
    """Return what get_filters() reports on the RM registered as `guid`."""
    return self.get_resource(guid).get_filters()
# Register a connection between the RMs registered as guid1 and guid2.
# NOTE(review): the statements that use rm1/rm2 (original lines 104-106) are
# not visible in this chunk; presumably each RM is told about the other.
101 def register_connection(self, guid1, guid2):
102 rm1 = self.get_resource(guid1)
103 rm2 = self.get_resource(guid2)
108 def register_condition(self, group1, action, group2, state,
110 """ Registers an action START or STOP for all RM on group1 to occur
111 time 'time' after all elements in group2 reached state 'state'.
113 :param group1: List of guids of RMs subjected to action
116 :param action: Action to register (either START or STOP)
117 :type action: ResourceAction
119 :param group2: List of guids of RMs to be waited for
122 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
123 :type state: ResourceState
125 :param time: Time to wait after group2 has reached status
# Both groups accept a single int guid (the wrapping lines are elided here).
129 if isinstance(group1, int):
131 if isinstance(group2, int):
# The condition is delegated to each RM; the loop header binding guid1
# (presumably `for guid1 in group1:`) is not visible in this chunk.
135 rm = self.get_resource(guid1)
136 rm.register_condition(action, group2, state, time)
# Register trace `name` with the RM registered as `guid`.
138 def register_trace(self, guid, name):
141 :param name: Name of the trace
# Pure delegation: the controller keeps no trace state of its own here.
144 rm = self.get_resource(guid)
145 rm.register_trace(name)
147 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
148 """ Get information on collected trace
150 :param name: Name of the trace
153 :param attr: Can be one of:
154 - TraceAttr.ALL (complete trace content),
155 - TraceAttr.STREAM (block in bytes to read starting at offset),
156 - TraceAttr.PATH (full path to the trace file),
157 - TraceAttr.SIZE (size of trace file).
160 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
163 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
# attr/block/offset are interpreted by the RM; the controller only forwards.
168 rm = self.get_resource(guid)
169 return rm.trace(name, attr, block, offset)
def discover(self, guid, filters):
    """Forward `filters` to discover() on the RM registered as `guid`.

    The RM's result is returned unchanged.
    """
    return self.get_resource(guid).discover(filters)
def provision(self, guid, filters):
    """Forward `filters` to provision() on the RM registered as `guid`.

    The RM's result is returned unchanged.
    """
    return self.get_resource(guid).provision(filters)
# Read attribute `name` from the RM registered as `guid`.
# NOTE(review): the statement after the lookup (original line 181, presumably
# `return rm.get(name)`) is not visible in this chunk.
179 def get(self, guid, name):
180 rm = self.get_resource(guid)
def set(self, guid, name, value):
    """Assign `value` to attribute `name` on the RM registered as `guid`.

    Whatever the RM's set() returns is passed back to the caller.
    """
    return self.get_resource(guid).set(name, value)
# Query the state of the RM registered as `guid`.
# NOTE(review): the statement after the lookup (original line 189, presumably
# returning the RM's state) is not visible in this chunk.
187 def state(self, guid):
188 rm = self.get_resource(guid)
# Stop the RM registered as `guid`.
# NOTE(review): the statement after the lookup (original line 193, presumably
# a call to the RM's stop()) is not visible in this chunk.
191 def stop(self, guid):
192 rm = self.get_resource(guid)
# Start the RM registered as `guid`.
# NOTE(review): the statement after the lookup (original line 197, presumably
# a call to the RM's start()) is not visible in this chunk.
195 def start(self, guid):
196 rm = self.get_resource(guid)
199 def set_with_conditions(self, name, value, group1, group2, state,
201 """ Set value 'value' on attribute with name 'name' on all RMs of
202 group1 when 'time' has elapsed since all elements in group2
203 have reached state 'state'.
205 :param name: Name of attribute to set in RM
208 :param value: Value of attribute to set in RM
211 :param group1: List of guids of RMs subjected to action
214 :param action: Action to register (either START or STOP)
215 :type action: ResourceAction
217 :param group2: List of guids of RMs to be waited for
220 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
221 :type state: ResourceState
223 :param time: Time to wait after group2 has reached status
# NOTE(review): the docstring documents an `action` parameter, but the body
# never uses one (it looks copied from register_condition) — confirm and drop.
# Both groups accept a single int guid (the wrapping lines are elided here).
227 if isinstance(group1, int):
229 if isinstance(group2, int):
# NOTE(review): `guid` is not among the visible parameters. Unless the elided
# loop header (original line 232) binds `guid`, this raises NameError;
# register_condition uses `guid1` for the same loop pattern.
233 rm = self.get_resource(guid)
234 rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
    """Invoke stop_with_conditions() on the RM registered as `guid`.

    The RM's return value is passed through to the caller.
    """
    return self.get_resource(guid).stop_with_conditions()
def start_with_conditions(self, guid):
    """Invoke start_with_conditions() on the RM registered as `guid`.

    :param guid: guid of a registered RM.
    :returns: whatever the RM's start_with_conditions() returns.
    """
    rm = self.get_resource(guid)
    # The RM method is start_with_conditions (plural), the same name deploy()
    # calls on each RM; the singular spelling raised AttributeError.
    return rm.start_with_conditions()
244 def deploy(self, group = None, wait_all_ready = True):
245 """ Deploy all resource manager in group
247 :param group: List of guids of RMs to deploy
250 :param wait_all_ready: Wait until all RMs are ready in
251 order to start the RMs
# NOTE(review): this uses self.logger while __init__ stores self._logger;
# presumably a `logger` property exists in elided lines — confirm.
255 self.logger.debug(" ------- DEPLOY START ------ ")
# Per-RM deployment steps (presumably the body of the nested `steps(rm)`
# function that each worker thread below runs; its header is elided).
259 rm.start_with_conditions()
261 # Only if the RM has STOP conditions we
262 # schedule a stop. Otherwise the RM will stop immediately
263 if rm.conditions.get(ResourceAction.STOP):
264 rm.stop_with_conditions()
# Default to every registered RM (the guarding `if` is elided here).
267 group = self.resources
271 rm = self.get_resource(guid)
# Presumably only when wait_all_ready is set: each RM's START is conditioned
# on the RMs in `towait` (computed in elided lines) reaching READY.
276 self.register_condition(guid, ResourceAction.START,
277 towait, ResourceState.READY)
# One worker thread per RM, each running steps(rm).
279 thread = threading.Thread(target = steps, args = (rm,))
280 threads.append(thread)
# The thread start/join statements are elided in this chunk.
283 for thread in threads:
# Release the RMs in `group` by running each RM's release() in its own thread.
286 def release(self, group = None):
# Default to every registered RM (the guarding `if` is elided here).
288 group = self.resources
292 rm = self.get_resource(guid)
293 thread = threading.Thread(target=rm.release)
294 threads.append(thread)
# The thread start/join statements are elided in this chunk.
297 for thread in threads:
# Fragment of a teardown routine whose definition is elided: the following
# (elided) statements only run while the processing thread is still alive.
307 if self._thread.is_alive():
310 def schedule(self, date, callback, track = False):
311 """ Schedule a callback to be executed at time date.
313 date string containing execution time for the task.
314 It can be expressed as an absolute time, using
315 timestamp format, or as a relative time matching
316 ^\d+.\d+(h|m|s|ms|us)$
318 callback code to be executed for the task. Must be a
319 Python function, and receives args and kwargs
322 track if set to True, the task will be retrievable with
323 the get_task() method
# NOTE(review): _execute calls task.callback() with no arguments, which
# contradicts "receives args and kwargs" above — confirm the intended contract.
# Convert the absolute or relative `date` into a timestamp.
325 timestamp = strfvalid(date)
327 task = Task(timestamp, callback)
# The task object returned by the scheduler is the one that gets tracked.
328 task = self._scheduler.schedule(task)
# Presumably guarded by `if track:` (elided line) per the docstring.
331 self._tasks[task.id] = task
333 # Notify condition to wake up the processing thread
# Body of the processing-thread loop; the enclosing def (presumably _process,
# the target of self._thread) is elided. Due tasks are handed to ParallelRun
# with maxthreads = 50 and executed via _execute.
341 runner = ParallelRun(maxthreads = 50)
345 while not self._stop:
347 task = self._scheduler.next()
351 # If there are no tasks in the queue we need to
352 # wait until a call to schedule wakes us up
357 # If the task timestamp is in the future the thread needs to wait
358 # until the timeout elapses or until another task is scheduled
360 if now < task.timestamp:
361 # Calculate time difference in seconds
362 timeout = strfdiff(task.timestamp, now)
363 # Re-schedule task with the same timestamp
364 self._scheduler.schedule(task)
365 # Sleep until timeout or until a new task awakes the condition
367 self._cond.wait(timeout)
370 # Process tasks in parallel
371 runner.put(self._execute, task)
# Errors in the loop are logged with a full traceback (handler lines elided).
374 err = traceback.format_exc()
375 self._logger.error("Error while processing tasks in the EC: %s" % err)
# Run a task's callback, store its return value in task.result, and log and
# mark the task ERROR if the callback raises (try/except lines are elided).
377 def _execute(self, task):
# NOTE(review): status becomes DONE before the callback runs, so a caller
# polling get_task() from another thread may see DONE while it is still
# executing — confirm this ordering is intended.
379 task.status = TaskStatus.DONE
382 task.result = task.callback()
385 err = traceback.format_exc()
386 self._logger.error("Error while executing event: %s" % err)
389 task.status = TaskStatus.ERROR