2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
27 from nepi.util import guid
28 from nepi.util.parallel import ParallelRun
29 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
30 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
33 from nepi.execution.trace import TraceAttr
35 # TODO: use multiprocessing instead of threading
36 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
# States of the ExperimentController lifecycle.
# NOTE(review): the class body is missing from this excerpt; the constants
# RUNNING, FAILED and TERMINATED are referenced later in this file, so they
# are presumably defined here.
class ECState(object):
# Controller in charge of a network experiment: it registers resource
# managers (RMs), schedules tasks and processes them on a background thread.
class ExperimentController(object):
    def __init__(self, exp_id = None, root_dir = "/tmp"):
        """ Initialize the controller.

        :param exp_id: Experiment identifier given by the user; a random
            "nepi-exp-<hex>" id is generated when None
        :param root_dir: Root directory to store experiment files
        """
        # NOTE(review): several __init__ lines are missing from this
        # excerpt (e.g. the task registry `self._tasks` read elsewhere,
        # and the thread start call); comments describe only what is
        # visible.
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # experiment identifier given by the user
        # (Python 2 idiom: str.encode('hex') does not exist in Python 3)
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # guid -> resource manager registry
        self._resources = dict()

        # orders tasks by execution timestamp
        self._scheduler = HeapScheduler()

        # Event processing thread
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        # daemon thread: must not keep the interpreter alive on exit
        self._thread.setDaemon(True)

        # controller state: RUNNING until failure or termination
        self._state = ECState.RUNNING

        self._logger = logging.getLogger("ExperimentController")

    # NOTE(review): the lines below look like fragments of other members
    # (likely an `exp_id` property normalizing the id prefix, and a
    # `finished` property) whose `def`/decorator lines are missing from
    # this excerpt.
        if not exp_id.startswith("nepi-"):
            exp_id = "nepi-" + exp_id
        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
    def wait_finished(self, guids):
        """ Block until every RM in guids reaches state FINISHED, or
        until the EC itself fails or terminates.

        :param guids: List of guids of the RMs to wait for
        """
        while not all([self.state(guid) == ResourceState.FINISHED \
                for guid in guids]) and not self.finished:
            # We keep the sleep as large as possible to
            # decrease the number of RM state requests
            # NOTE(review): the actual sleep call is missing from this
            # excerpt
102 def get_task(self, tid):
103 return self._tasks.get(tid)
105 def get_resource(self, guid):
106 return self._resources.get(guid)
        # NOTE(review): tail of what appears to be a `resources` property
        # whose `def`/decorator lines are missing from this excerpt;
        # returns the guids of every registered RM (a list in Python 2).
        return self._resources.keys()
    def register_resource(self, rtype, guid = None):
        """ Instantiate an RM of factory type rtype and register it.

        :param rtype: Type name of the RM, as registered in ResourceFactory
        :param guid: Optional guid to reserve for the new RM
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # the factory resolves rtype to the concrete RM class
        rm = ResourceFactory.create(rtype, self, guid)

        self._resources[guid] = rm
        # NOTE(review): the method tail (presumably `return guid`) is
        # missing from this excerpt
124 def get_attributes(self, guid):
125 rm = self.get_resource(guid)
126 return rm.get_attributes()
128 def get_filters(self, guid):
129 rm = self.get_resource(guid)
130 return rm.get_filters()
    def register_connection(self, guid1, guid2):
        """ Register a connection between the RMs guid1 and guid2.

        :param guid1: Guid of the first RM
        :param guid2: Guid of the second RM
        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)
        # NOTE(review): the mutual connect calls on rm1/rm2 are missing
        # from this excerpt
    def register_condition(self, group1, action, group2, state,
        # NOTE(review): the signature continuation (a `time` parameter
        # with its default and the closing parenthesis) is missing from
        # this excerpt.
        """ Registers an action START or STOP for all RM on group1 to occur
        time 'time' after all elements in group2 reached state 'state'.

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs we waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # normalize scalar guids into groups
        # NOTE(review): the bodies of both isinstance guards are missing
        # from this excerpt
        if isinstance(group1, int):
        if isinstance(group2, int):
        # NOTE(review): the loop that binds guid1 while iterating group1
        # is missing from this excerpt
        rm = self.get_resource(guid1)
        rm.register_condition(action, group2, state, time)
    def register_trace(self, guid, name):
        """ Enable collection of trace 'name' on the RM guid.

        :param guid: Guid of the RM
        :param name: Name of the trace
        """
        rm = self.get_resource(guid)
        rm.register_trace(name)
    def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Get information on collected trace

        :param guid: Guid of the RM that collects the trace
        :param name: Name of the trace

        :param attr: Can be one of:
            - TraceAttr.ALL (complete trace content),
            - TraceAttr.STREAM (block in bytes to read starting at offset),
            - TraceAttr.PATH (full path to the trace file),
            - TraceAttr.SIZE (size of trace file).

        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM

        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM

        :return: Trace content or metadata, as produced by the RM
        """
        rm = self.get_resource(guid)
        return rm.trace(name, attr, block, offset)
202 def discover(self, guid, filters):
203 rm = self.get_resource(guid)
204 return rm.discover(filters)
206 def provision(self, guid, filters):
207 rm = self.get_resource(guid)
208 return rm.provision(filters)
    def get(self, guid, name):
        """ Read the value of attribute 'name' on the RM guid. """
        rm = self.get_resource(guid)
        # NOTE(review): the method tail (presumably `return rm.get(name)`)
        # is missing from this excerpt
214 def set(self, guid, name, value):
215 rm = self.get_resource(guid)
216 return rm.set(name, value)
    def state(self, guid):
        """ Return the current state of the RM guid. """
        rm = self.get_resource(guid)
        # NOTE(review): the method tail (presumably `return rm.state`)
        # is missing from this excerpt
    def stop(self, guid):
        """ Stop the RM guid immediately. """
        rm = self.get_resource(guid)
        # NOTE(review): the method tail (presumably `return rm.stop()`)
        # is missing from this excerpt
    def start(self, guid):
        """ Start the RM guid immediately. """
        rm = self.get_resource(guid)
        # NOTE(review): the method tail (presumably `return rm.start()`)
        # is missing from this excerpt
    def set_with_conditions(self, name, value, group1, group2, state,
        # NOTE(review): the signature continuation (a `time` parameter
        # with its default and the closing parenthesis) is missing from
        # this excerpt.
        """ Set value 'value' on attribute with name 'name' on all RMs of
        group1 when 'time' has elapsed since all elements in group2
        have reached state 'state'.

        :param name: Name of attribute to set in RM

        :param value: Value of attribute to set in RM

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs we waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # NOTE(review): the ':param action:' entry above does not match
        # this signature — there is no `action` parameter here.
        # normalize scalar guids into groups
        # NOTE(review): the bodies of both isinstance guards are missing
        # from this excerpt
        if isinstance(group1, int):
        if isinstance(group2, int):
        # NOTE(review): the loop that binds `guid` while iterating group1
        # is missing from this excerpt
        rm = self.get_resource(guid)
        rm.set_with_conditions(name, value, group2, state, time)
267 def stop_with_conditions(self, guid):
268 rm = self.get_resource(guid)
269 return rm.stop_with_conditions()
271 def start_with_conditions(self, guid):
272 rm = self.get_resource(guid)
273 return rm.start_with_condition()
    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource manager in group

        :param group: List of guids of RMs to deploy

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs
        """
        # NOTE(review): many lines of this method are missing from this
        # excerpt (the inner `steps` helper definition, the try/except
        # around it, the per-guid loop header, thread start calls and the
        # `stop` flag handling); comments describe only what is visible.
        # NOTE(review): uses self.logger here but self._logger below — a
        # `logger` property is presumably defined outside this excerpt.
        self.logger.debug(" ------- DEPLOY START ------ ")

        # --- body of the per-RM `steps` helper (its def line is missing) ---
        rm.start_with_conditions()

        # Only if the RM has STOP conditions we
        # schedule a stop. Otherwise the RM will stop immediately
        if rm.conditions.get(ResourceAction.STOP):
            rm.stop_with_conditions()

        # --- error-handling fragment (its `except` line is missing) ---
        err = traceback.format_exc()
        self._logger.error("Error occurred while deploying resources: %s" % err)

        # deploy every registered RM when no explicit group was given
        group = self.resources

        # Before starting deployment we disorder the group list with the
        # purpose of speeding up the whole deployment process.
        # It is likely that the user inserted in the 'group' list closely
        # related resources one after another (e.g. all applications
        # connected to the same node can likely appear one after another).
        # This can originate a slow down in the deployment since the N
        # threads the parallel runner uses to process tasks may all
        # be taken up by the same family of resources waiting for the
        # If we disorder the group list, this problem can be mitigated
        random.shuffle(group)

        # --- per-guid deployment loop (its loop header is missing) ---
        rm = self.get_resource(guid)

        # when wait_all_ready is set, each RM starts only after the whole
        # group has reached READY
        self.register_condition(guid, ResourceAction.START,
            towait, ResourceState.READY)

        # one worker thread per RM runs the `steps` helper
        thread = threading.Thread(target = steps, args = (rm,))
        threads.append(thread)
        thread.setDaemon(True)

        # poll worker threads until all done, the EC finishes, or an
        # error sets `stop`
        while list(threads) and not self.finished and not stop:
            # Time out after 5 seconds to check EC not terminated
            if not thread.is_alive():
                threads.remove(thread)

        self._stop_scheduler()

        if self._thread.is_alive():
            # Python 2 raise syntax
            raise RuntimeError, "Error occurred, interrupting deployment "
    def release(self, group = None):
        """ Release all RMs in group (every registered RM when None).

        :param group: List of guids of RMs to release
        """
        # NOTE(review): several lines are missing from this excerpt (the
        # guard selecting the default group, the per-guid loop header,
        # the thread start calls and the timed join); comments describe
        # only what is visible.
        group = self.resources

        # --- per-guid release loop (its loop header is missing) ---
        rm = self.get_resource(guid)
        # one worker thread per RM performs the release
        thread = threading.Thread(target=rm.release)
        threads.append(thread)
        thread.setDaemon(True)

        # poll worker threads until all released or the EC finishes
        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
            if not thread.is_alive():
                threads.remove(thread)

        self._stop_scheduler()

        # NOTE(review): the body of this guard is missing from this excerpt
        if self._thread.is_alive():
    def schedule(self, date, callback, track = False):
        """ Schedule a callback to be executed at time date.

        :param date: string containing execution time for the task.
            It can be expressed as an absolute time, using
            timestamp format, or as a relative time matching
            ^\d+.\d+(h|m|s|ms|us)$

        :param callback: code to be executed for the task. Must be a
            Python function, and receives args and kwargs

        :param track: if set to True, the task will be retrievable with
            the get_task() method
        """
        # NOTE(review): the `if track:` guard around the task registry
        # write, the returned task id and the condition notify block are
        # missing from this excerpt.
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)
        task = self._scheduler.schedule(task)

        # remember the task so get_task() can find it later
        self._tasks[task.id] = task

        # Notify condition to wake up the processing thread
        # NOTE(review): body fragment of the event-processing method —
        # its `def` line (presumably `_process`, the target of the thread
        # created in __init__) is missing from this excerpt, as are the
        # runner start, the `now = strfnow()` reads, the condition
        # acquire/release pairs and the try/except structure.
        # worker pool that executes tasks concurrently
        runner = ParallelRun(maxthreads = 50)

        # loop until the EC fails or terminates
        while not self.finished:
            task = self._scheduler.next()

            # If there are no tasks in the tasks queue we need to
            # wait until a call to schedule wakes us up

            # If the task timestamp is in the future the thread needs to wait
            # until time elapse or until another task is scheduled
            if now < task.timestamp:
                # Calculate time difference in seconds
                timeout = strfdiff(task.timestamp, now)
                # Re-schedule task with the same timestamp
                self._scheduler.schedule(task)
                # Sleep until timeout or until a new task awakes the condition
                self._cond.wait(timeout)

            # Process tasks in parallel
            runner.put(self._execute, task)

        # --- error-handling fragment (its `except` line is missing) ---
        err = traceback.format_exc()
        self._logger.error("Error while processing tasks in the EC: %s" % err)

        self._state = ECState.FAILED

        # Mark EC state as terminated
        if self.ecstate == ECState.RUNNING:
            self._state = ECState.TERMINATED
    def _execute(self, task):
        """ Run a single task's callback and record the outcome on the
        task object. Invoked by the parallel runner.

        :param task: Task whose callback is to be executed
        """
        # NOTE(review): the try/except wrapper around the callback call
        # is missing from this excerpt; the lines below the call are the
        # visible error path.
        task.status = TaskStatus.DONE

        task.result = task.callback()

        # --- error path (its `except` line is missing) ---
        err = traceback.format_exc()

        task.status = TaskStatus.ERROR

        self._logger.error("Error occurred while executing task: %s" % err)

        self._stop_scheduler()

        # Propagate error to the ParallelRunner
    def _stop_scheduler(self):
        """ Mark the EC as FAILED so the processing loop exits. """
        # Mark the EC as failed
        self._state = ECState.FAILED

        # Wake up the EC in case it was sleeping
        # NOTE(review): the condition acquire/notify/release block runs
        # past the end of the visible source.