# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# Standard-library modules referenced by the visible code below
# (os at the exp-id generation, threading/logging in __init__,
# random/functools in deploy, traceback in the task-processing paths).
import functools
import logging
import os
import random
import threading
import traceback

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
# NOTE(review): the continuation of this import was truncated in the
# source; ResourceState is restored because it is used below
# (wait_finished, deploy). Confirm against the complete file.
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class ECState(object):
    """Life-cycle states of the ExperimentController.

    NOTE(review): the constant definitions were missing from this chunk of
    the source; the names RUNNING, FAILED and TERMINATED are reconstructed
    from their uses elsewhere in this file, where they are only compared
    for equality/membership, so any three distinct values preserve that
    behavior. Confirm the original values against the complete file.
    """
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
class ExperimentController(object):
    """Orchestrates a network experiment.

    Registers resource managers (RMs), schedules their deployment, start
    and stop through a heap-based task scheduler, and processes scheduled
    tasks on a background thread.

    NOTE(review): this chunk of the source is truncated — several lines of
    __init__ are missing (e.g. the ``self._tasks`` dict used by schedule()
    and the call that starts the processing thread). Confirm against the
    complete file.
    """
    def __init__(self, exp_id = None, root_dir = "/tmp"):
        super(ExperimentController, self).__init__()

        # root directory to store files
        self._root_dir = root_dir

        # experiment identifier given by the user
        # NOTE(review): str.encode('hex') is Python 2 only — this module
        # targets Python 2 (the Python 3 equivalent is binascii.hexlify).
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # maps guid -> ResourceManager (see register_resource/get_resource)
        self._resources = dict()

        # heap-based scheduler holding pending Tasks ordered by timestamp
        self._scheduler = HeapScheduler()

        # Event processing thread: the condition variable wakes the thread
        # when a new task is scheduled (see schedule()).
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.setDaemon(True)

        # EC life-cycle state (see ECState)
        self._state = ECState.RUNNING

        self._logger = logging.getLogger("ExperimentController")
    # NOTE(review): the following statements are orphaned in this chunk —
    # their enclosing ``def`` headers are missing from the source. The
    # first two look like the body of an ``exp_id`` accessor (normalizing
    # the user-given id to a "nepi-" prefix); the ``return`` looks like the
    # body of a ``finished`` predicate. Confirm against the complete file.
        if not exp_id.startswith("nepi-"):
            exp_id = "nepi-" + exp_id
        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
    def wait_finished(self, guids):
        """Block until every RM in `guids` reaches FINISHED, or until the
        EC itself stops running (FAILED or TERMINATED).
        """
        while not all([self.state(guid) == ResourceState.FINISHED \
                for guid in guids]) and not self.finished:
            # We keep the sleep as large as possible to
            # decrease the number of RM state requests
            # NOTE(review): the sleep call itself is missing from this
            # chunk of the source — presumably time.sleep(...); confirm.
103 def get_task(self, tid):
104 return self._tasks.get(tid)
106 def get_resource(self, guid):
107 return self._resources.get(guid)
    # NOTE(review): orphaned statement — the enclosing ``def`` (likely a
    # ``resources`` accessor returning all registered guids) is missing
    # from this chunk of the source. Confirm against the complete file.
        return self._resources.keys()
    def register_resource(self, rtype, guid = None):
        """Instantiate a ResourceManager of type `rtype` and register it.

        NOTE(review): the tail of this method (presumably ``return guid``)
        is missing from this chunk of the source — confirm upstream.
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate the RM through the factory and index it by guid
        rm = ResourceFactory.create(rtype, self, guid)

        self._resources[guid] = rm
125 def get_attributes(self, guid):
126 rm = self.get_resource(guid)
127 return rm.get_attributes()
129 def get_filters(self, guid):
130 rm = self.get_resource(guid)
131 return rm.get_filters()
    def register_connection(self, guid1, guid2):
        """Connect the RMs registered under `guid1` and `guid2`.

        NOTE(review): the statements performing the actual connection
        (presumably reciprocal ``register_connection`` calls on rm1/rm2)
        are missing from this chunk of the source — confirm upstream.
        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)
    def register_condition(self, group1, action, group2, state,
            # NOTE(review): the remainder of the signature (presumably
            # ``time = None):``) is missing from this chunk of the source.
        """ Registers an action START or STOP for all RM on group1 to occur
        time 'time' after all elements in group2 reached state 'state'.

        :param group1: List of guids of RMs subjected to action
        :type group1: list

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs we wait for
        :type group2: list

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        :type time: str
        """
        # Normalize single guids to lists; the branch bodies are missing
        # from this chunk (presumably ``group1 = [group1]`` etc.).
        if isinstance(group1, int):
        if isinstance(group2, int):
        # NOTE(review): the ``for guid1 in group1:`` loop header is
        # missing from this chunk of the source.
        rm = self.get_resource(guid1)
        rm.register_condition(action, group2, state, time)
170 def register_trace(self, guid, name):
173 :param name: Name of the trace
176 rm = self.get_resource(guid)
177 rm.register_trace(name)
179 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
180 """ Get information on collected trace
182 :param name: Name of the trace
185 :param attr: Can be one of:
186 - TraceAttr.ALL (complete trace content),
187 - TraceAttr.STREAM (block in bytes to read starting at offset),
188 - TraceAttr.PATH (full path to the trace file),
189 - TraceAttr.SIZE (size of trace file).
192 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
195 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
200 rm = self.get_resource(guid)
201 return rm.trace(name, attr, block, offset)
203 def discover(self, guid, filters):
204 rm = self.get_resource(guid)
205 return rm.discover(filters)
207 def provision(self, guid, filters):
208 rm = self.get_resource(guid)
209 return rm.provision(filters)
211 def get(self, guid, name):
212 rm = self.get_resource(guid)
215 def set(self, guid, name, value):
216 rm = self.get_resource(guid)
217 return rm.set(name, value)
219 def state(self, guid):
220 rm = self.get_resource(guid)
223 def stop(self, guid):
224 rm = self.get_resource(guid)
227 def start(self, guid):
228 rm = self.get_resource(guid)
    def set_with_conditions(self, name, value, group1, group2, state,
            # NOTE(review): the remainder of the signature (presumably
            # ``time = None):``) is missing from this chunk of the source.
        """ Set value 'value' on attribute with name 'name' on all RMs of
        group1 when 'time' has elapsed since all elements in group2
        have reached state 'state'.

        :param name: Name of attribute to set in RM
        :type name: str

        :param value: Value of attribute to set in RM
        :type value: str

        :param group1: List of guids of RMs subjected to action
        :type group1: list

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs we wait for
        :type group2: list

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        :type time: str
        """
        # Normalize single guids to lists; the branch bodies are missing
        # from this chunk (presumably ``group1 = [group1]`` etc.).
        if isinstance(group1, int):
        if isinstance(group2, int):
        # NOTE(review): the ``for guid in group1:`` loop header is missing
        # from this chunk of the source.
        rm = self.get_resource(guid)
        rm.set_with_conditions(name, value, group2, state, time)
268 def stop_with_conditions(self, guid):
269 rm = self.get_resource(guid)
270 return rm.stop_with_conditions()
272 def start_with_conditions(self, guid):
273 rm = self.get_resource(guid)
274 return rm.start_with_condition()
    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource managers in group

        :param group: List of guids of RMs to deploy
        :type group: list

        :param wait_all_ready: Wait until all RMs are READY in
            order to start the RMs
        :type wait_all_ready: bool

        NOTE(review): several control-flow lines of this method are
        missing from this chunk of the source (group defaulting, loop
        headers, the wait_all_ready branch); inline notes mark each gap.
        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        # NOTE(review): missing guard header (presumably ``if not group:``).
            group = self.resources

        # Before starting deployment we disorder the group list with the
        # purpose of speeding up the whole deployment process.
        # It is likely that the user inserted in the 'group' list closely
        # resources one after another (e.g. all applications
        # connected to the same node can likely appear one after another).
        # This can originate a slow down in the deployment since the N
        # threads the parallel runner uses to processes tasks may all
        # be taken up by the same family of resources waiting for the
        # same conditions (e.g. LinuxApplications running on a same
        # node share a single lock, so they will tend to be serialized).
        # If we disorder the group list, this problem can be mitigated.
        random.shuffle(group)

        def wait_all_and_start(group):
            # NOTE(review): the loop header iterating ``group`` is missing
            # from this chunk of the source.
                rm = self.get_resource(guid)
                if rm.state < ResourceState.READY:
                    # NOTE(review): the branch body is missing — presumably
                    # it records that a re-check must be scheduled.

            # NOTE(review): branch header missing — when some RM is not
            # yet READY, this check re-schedules itself one second later.
                callback = functools.partial(wait_all_and_start, group)
                self.schedule("1s", callback)

            # If all resources are ready, we schedule the start
                rm = self.get_resource(guid)
                self.schedule("0.01s", rm.start_with_conditions)

        # NOTE(review): ``if wait_all_ready:`` header missing here.
            # Schedule the function that will check all resources are
            # READY, and only then it will schedule the start.
            # This is aimed to reduce the number of tasks looping in the scheduler.
            # Instead of having N start tasks, we will have only one
            callback = functools.partial(wait_all_and_start, group)
            self.schedule("1s", callback)

        # NOTE(review): ``for guid in group:`` header missing here.
            rm = self.get_resource(guid)
            self.schedule("0.001s", rm.deploy)

            if not wait_all_ready:
                self.schedule("1s", rm.start_with_conditions)

            if rm.conditions.get(ResourceAction.STOP):
                # Only if the RM has STOP conditions we
                # schedule a stop. Otherwise the RM will stop immediately
                self.schedule("2s", rm.stop_with_conditions)
    def release(self, group = None):
        """Release every RM in `group` (default: all registered resources),
        running each release on its own daemon thread and waiting until
        they finish or the EC itself stops running.
        """
        # NOTE(review): missing guard header (presumably ``if not group:``).
            group = self.resources

        # NOTE(review): ``threads = []`` and the ``for guid in group:``
        # headers are missing from this chunk of the source.
            rm = self.get_resource(guid)
            thread = threading.Thread(target=rm.release)
            threads.append(thread)
            thread.setDaemon(True)

        # Wait for the release threads to drain; give up waiting when the
        # EC itself has finished (FAILED/TERMINATED).
        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
            # NOTE(review): the join-with-timeout call is missing here.
            if not thread.is_alive():
                threads.remove(thread)

    # NOTE(review): the statements below appear to belong to a separate
    # shutdown-style method whose ``def`` line is missing from this chunk.
        self._stop_scheduler()

        if self._thread.is_alive():
    def schedule(self, date, callback, track = False):
        """ Schedule a callback to be executed at time date.

        date    string containing execution time for the task.
                It can be expressed as an absolute time, using
                timestamp format, or as a relative time matching
                ^\d+.\d+(h|m|s|ms|us)$

        callback    code to be executed for the task. Must be a
                    Python function, and receives args and kwargs

        track   if set to True, the task will be retrievable with
                the get_task() method

        NOTE(review): the tail of this method (the ``if track:`` guard,
        waking the processing thread on self._cond, and presumably
        returning the task id) is partially missing from this chunk.
        """
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)
        task = self._scheduler.schedule(task)

        # NOTE(review): ``if track:`` header missing here.
            self._tasks[task.id] = task

        # Notify condition to wake up the processing thread
    # NOTE(review): the ``def _process(self):`` header and several lines of
    # the loop are missing from this chunk. The visible body is the EC's
    # background task loop: pop the next task from the scheduler, wait if
    # its timestamp is in the future, and hand it to a parallel runner.
        runner = ParallelRun(maxthreads = 50)
        # NOTE(review): runner start-up and the enclosing try block are
        # missing from this chunk of the source.

        while not self.finished:
            task = self._scheduler.next()

            # If there are no tasks in the tasks queue we need to
            # wait until a call to schedule wakes us up
            # NOTE(review): the empty-queue branch is missing here.

            # If the task timestamp is in the future the thread needs to wait
            # until time elapses or until another task is scheduled
            # NOTE(review): ``now`` is presumably assigned from strfnow()
            # on a missing line — confirm upstream.
            if now < task.timestamp:
                # Calculate time difference in seconds
                timeout = strfdiff(task.timestamp, now)
                # Re-schedule task with the same timestamp
                self._scheduler.schedule(task)
                # Sleep until timeout or until a new task awakes the condition
                # NOTE(review): acquiring/releasing self._cond around this
                # wait() is not visible in this chunk — confirm upstream.
                self._cond.wait(timeout)

            # Process tasks in parallel
            runner.put(self._execute, task)
        # NOTE(review): the ``except:`` header of the error path is missing.
            err = traceback.format_exc()
            self._logger.error("Error while processing tasks in the EC: %s" % err)
            self._state = ECState.FAILED

        # Mark EC state as terminated
        if self.ecstate == ECState.RUNNING:
            # Synchronize to get errors if occurred
            # NOTE(review): the runner synchronization call is missing here.
            self._state = ECState.TERMINATED
    def _execute(self, task):
        """Run `task.callback`, record its outcome on the task, and stop
        the scheduler when the callback fails.

        NOTE(review): the try/except structure around the callback is
        missing from this chunk of the source; the visible statements are
        reproduced with notes at each gap.
        """
        task.status = TaskStatus.DONE

        # Invoke the callback and store its result on the task
        task.result = task.callback()

        # NOTE(review): the ``except:`` header of the error path is
        # missing from this chunk of the source.
        err = traceback.format_exc()

        task.status = TaskStatus.ERROR

        self._logger.error("Error occurred while executing task: %s" % err)

        self._stop_scheduler()

        # Propagate error to the ParallelRunner
        # NOTE(review): the re-raise statement is missing from this chunk.
    def _stop_scheduler(self):
        """Flag the EC as FAILED so the processing loop exits.

        NOTE(review): the method continues past the end of this chunk
        (presumably notifying self._cond to wake the processing thread).
        """
        # Mark the EC as failed
        self._state = ECState.FAILED

        # Wake up the EC in case it was sleeping