2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class ECState(object):
    """ Possible global states of the ExperimentController.

    The EC starts in RUNNING, moves to FAILED when an internal error
    stops the scheduler, and to TERMINATED on a clean shutdown.

    NOTE(review): the constant definitions were elided from this view;
    values reconstructed — every visible use only compares them for
    equality/membership, so distinctness is what matters.
    """
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
44 class ExperimentController(object):
def __init__(self, exp_id = None, root_dir = "/tmp"):
    """ Create a new ExperimentController.

    :param exp_id: Experiment identifier given by the user; a random
        "nepi-exp-…" id is generated when None
    :param root_dir: Root directory to store experiment files
    """
    super(ExperimentController, self).__init__()
    # root directory to store files
    self._root_dir = root_dir

    # experiment identifier given by the user
    # NOTE(review): bytes.encode('hex') is Python 2 only — confirm
    # target interpreter before porting.
    self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')

    # generator of globally unique ids
    self._guid_generator = guid.GuidGenerator()

    # resource managers index: guid -> ResourceManager
    self._resources = dict()

    # tracked tasks index: task id -> Task.
    # Fix: this mapping was never initialized in the visible code,
    # yet get_task() and schedule() read/write self._tasks.
    self._tasks = dict()

    # scheduler of tasks ordered by timestamp
    self._scheduler = HeapScheduler()

    # Event processing thread
    self._cond = threading.Condition()
    self._thread = threading.Thread(target = self._process)
    self._thread.setDaemon(True)
    # Fix: the thread was created but never started in the visible
    # code; without this, _process never runs and no task executes.
    self._thread.start()

    # EC state
    self._state = ECState.RUNNING

    self._logger = logging.getLogger("ExperimentController")
# NOTE(review): the next two lines are a fragment of the exp_id setter
# property (its def/decorator lines are elided from this view); they
# normalize user-supplied experiment ids to carry a "nepi-" prefix.
88 if not exp_id.startswith("nepi-"):
89 exp_id = "nepi-" + exp_id
# NOTE(review): fragment of the 'finished' property — True once the EC
# reached a terminal state (FAILED or TERMINATED).
94 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
96 def wait_finished(self, guids):
# Block until every RM in 'guids' reaches a terminal resource state
# (FINISHED, STOPPED or FAILED), or until the EC itself finishes.
# NOTE(review): the sleep call inside the loop body is elided from
# this view; without it this loop would busy-wait on RM state.
97 while not all([self.state(guid) in [ResourceState.FINISHED,
98 ResourceState.STOPPED,
99 ResourceState.FAILED] \
100 for guid in guids]) and not self.finished:
101 # We keep the sleep as large as possible to
102 # decrease the number of RM state requests
105 def get_task(self, tid):
106 return self._tasks.get(tid)
108 def get_resource(self, guid):
109 return self._resources.get(guid)
# NOTE(review): fragment of the 'resources' property (its def line is
# elided from this view); yields the guids of all registered RMs.
113 return self._resources.keys()
def register_resource(self, rtype, guid = None):
    """ Instantiate and register a new ResourceManager.

    :param rtype: Factory type of the RM to create
    :param guid: Optional guid to assign; when None the next free
        guid is generated
    :return: guid of the newly registered RM
    """
    # Get next available guid
    guid = self._guid_generator.next(guid)

    # Instantiate the RM through the factory
    rm = ResourceFactory.create(rtype, self, guid)

    # Index the RM by guid
    self._resources[guid] = rm

    # Fix: the visible code never returned the guid, but callers need
    # it to reference the RM in register_connection / register_condition
    # / deploy — without it a generated guid is unknowable.
    return guid
127 def get_attributes(self, guid):
128 rm = self.get_resource(guid)
129 return rm.get_attributes()
131 def register_connection(self, guid1, guid2):
132 rm1 = self.get_resource(guid1)
133 rm2 = self.get_resource(guid2)
138 def register_condition(self, group1, action, group2, state,
140 """ Registers an action START or STOP for all RM on group1 to occur
141 time 'time' after all elements in group2 reached state 'state'.
143 :param group1: List of guids of RMs subjected to action
146 :param action: Action to register (either START or STOP)
147 :type action: ResourceAction
149 :param group2: List of guids of RMs to we waited for
152 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
153 :type state: ResourceState
155 :param time: Time to wait after group2 has reached status
159 if isinstance(group1, int):
161 if isinstance(group2, int):
165 rm = self.get_resource(guid1)
166 rm.register_condition(action, group2, state, time)
168 def register_trace(self, guid, name):
171 :param name: Name of the trace
174 rm = self.get_resource(guid)
175 rm.register_trace(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """ Get information on a collected trace.

    :param guid: guid of the RM whose trace is queried
    :param name: Name of the trace

    :param attr: Can be one of:
        - TraceAttr.ALL (complete trace content),
        - TraceAttr.STREAM (block in bytes to read starting at offset),
        - TraceAttr.PATH (full path to the trace file),
        - TraceAttr.SIZE (size of trace file).

    :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
    :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM

    :return: trace information as requested through 'attr'
    """
    resource = self.get_resource(guid)
    return resource.trace(name, attr, block, offset)
201 def discover(self, guid):
202 rm = self.get_resource(guid)
205 def provision(self, guid):
206 rm = self.get_resource(guid)
207 return rm.provision()
209 def get(self, guid, name):
210 rm = self.get_resource(guid)
213 def set(self, guid, name, value):
214 rm = self.get_resource(guid)
215 return rm.set(name, value)
217 def state(self, guid, hr = False):
218 """ Returns the state of a resource
220 :param guid: Resource guid
223 :param hr: Human readable. Forces return of a
224 status string instead of a number
228 rm = self.get_resource(guid)
230 return ResourceState2str.get(rm.state)
234 def stop(self, guid):
235 rm = self.get_resource(guid)
238 def start(self, guid):
239 rm = self.get_resource(guid)
242 def set_with_conditions(self, name, value, group1, group2, state,
244 """ Set value 'value' on attribute with name 'name' on all RMs of
245 group1 when 'time' has elapsed since all elements in group2
246 have reached state 'state'.
248 :param name: Name of attribute to set in RM
251 :param value: Value of attribute to set in RM
254 :param group1: List of guids of RMs subjected to action
257 :param action: Action to register (either START or STOP)
258 :type action: ResourceAction
260 :param group2: List of guids of RMs to we waited for
263 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
264 :type state: ResourceState
266 :param time: Time to wait after group2 has reached status
270 if isinstance(group1, int):
272 if isinstance(group2, int):
276 rm = self.get_resource(guid)
277 rm.set_with_conditions(name, value, group2, state, time)
279 def stop_with_conditions(self, guid):
280 rm = self.get_resource(guid)
281 return rm.stop_with_conditions()
283 def start_with_conditions(self, guid):
284 rm = self.get_resource(guid)
285 return rm.start_with_condition()
287 def deploy(self, group = None, wait_all_ready = True):
288 """ Deploy all resource manager in group
290 :param group: List of guids of RMs to deploy
293 :param wait_all_ready: Wait until all RMs are ready in
294 order to start the RMs
# NOTE(review): this method is elided in this view — the docstring
# terminator, the 'if not group:' guard, both 'for guid in group:'
# loop headers and parts of wait_all_and_start are missing. The
# comments below flag each gap; this text is not runnable as-is.
298 self.logger.debug(" ------- DEPLOY START ------ ")
# When no explicit group is given, deploy every registered RM
# (the conditional guard line is elided).
301 group = self.resources
303 # Before starting deployment we disorder the group list with the
304 # purpose of speeding up the whole deployment process.
305 # It is likely that the user inserted in the 'group' list closely
306 # resources one after another (e.g. all applications
307 # connected to the same node can likely appear one after another).
308 # This can originate a slow down in the deployment since the N
309 # threads the parallel runner uses to process tasks may all
310 # be taken up by the same family of resources waiting for the
311 # same conditions (e.g. LinuxApplications running on a same
312 # node share a single lock, so they will tend to be serialized).
313 # If we disorder the group list, this problem can be mitigated.
314 random.shuffle(group)
# Local helper polled via the scheduler: once every RM in 'group'
# reached READY it schedules the starts; otherwise it re-schedules
# itself one second later.
316 def wait_all_and_start(group):
# (elided: loop over guids checking each RM's state)
319 rm = self.get_resource(guid)
320 if rm.state < ResourceState.READY:
# Some RM is not READY yet: check again in one second.
325 callback = functools.partial(wait_all_and_start, group)
326 self.schedule("1s", callback)
328 # If all resources are ready, we schedule the start
330 rm = self.get_resource(guid)
331 self.schedule("0.01s", rm.start_with_conditions)
334 # Schedule the function that will check all resources are
335 # READY, and only then it will schedule the start.
336 # This is aimed to reduce the number of tasks looping in the scheduler.
337 # Instead of having N start tasks, we will have only one
338 callback = functools.partial(wait_all_and_start, group)
339 self.schedule("1s", callback)
# (elided: 'for guid in group:' — schedule each RM's deploy almost
# immediately)
342 rm = self.get_resource(guid)
343 self.schedule("0.001s", rm.deploy)
# When not waiting for the whole group, each RM starts on its own
# as soon as its START conditions hold.
345 if not wait_all_ready:
346 self.schedule("1s", rm.start_with_conditions)
348 if rm.conditions.get(ResourceAction.STOP):
349 # Only if the RM has STOP conditions we
350 # schedule a stop. Otherwise the RM will stop immediately
351 self.schedule("2s", rm.stop_with_conditions)
354 def release(self, group = None):
# Release every RM in 'group' (default: all registered resources),
# each in its own daemon thread, then poll until all release threads
# finished or the EC reached a terminal state.
# NOTE(review): elided from this view — the 'if not group:' guard,
# the 'threads = []' initialization, the 'for guid in group:' loop
# header, thread.start(), and the thread.join(timeout) call inside
# the waiting loop.
356 group = self.resources
360 rm = self.get_resource(guid)
361 thread = threading.Thread(target=rm.release)
362 threads.append(thread)
363 thread.setDaemon(True)
# Poll the release threads, dropping finished ones; bail out early
# if the EC finished (FAILED/TERMINATED).
366 while list(threads) and not self.finished:
368 # Time out after 5 seconds to check EC not terminated
370 if not thread.is_alive():
371 threads.remove(thread)
# NOTE(review): the following two lines appear to belong to a
# shutdown path whose enclosing 'def' is elided: stop the scheduler,
# then join the processing thread if still running.
376 self._stop_scheduler()
378 if self._thread.is_alive():
381 def schedule(self, date, callback, track = False):
382 """ Schedule a callback to be executed at time date.
384 date string containing execution time for the task.
385 It can be expressed as an absolute time, using
386 timestamp format, or as a relative time matching
387 ^\d+.\d+(h|m|s|ms|us)$
389 callback code to be executed for the task. Must be a
390 Python function, and receives args and kwargs
393 track if set to True, the task will be retrievable with
394 the get_task() method
# NOTE(review): elided from this view — the docstring terminator,
# the 'if track:' guard before the task is indexed, the condition
# acquire/notify/release block, and the final 'return task.id'.
396 timestamp = strfvalid(date)
398 task = Task(timestamp, callback)
399 task = self._scheduler.schedule(task)
# Only tracked tasks are kept in _tasks for later get_task() lookup
# (the 'if track:' line is elided).
402 self._tasks[task.id] = task
404 # Notify condition to wake up the processing thread
# NOTE(review): the 'def _process(self):' line and its docstring are
# elided from this view; this is the body of the event-processing
# thread created in __init__. Also elided: the try: wrapper around
# the loop, the blocking wait when the queue is empty, the read of
# 'now' (current timestamp), the else: branch, and the final runner
# synchronization before termination.
412 runner = ParallelRun(maxthreads = 50)
# Main loop: pull the next task and either wait for its timestamp
# or hand it to the parallel runner for execution.
416 while not self.finished:
418 task = self._scheduler.next()
422 # If there are no tasks in the tasks queue we need to
423 # wait until a call to schedule wakes us up
428 # If the task timestamp is in the future the thread needs to wait
429 # until time elapse or until another task is scheduled
431 if now < task.timestamp:
432 # Calculate time difference in seconds
433 timeout = strfdiff(task.timestamp, now)
434 # Re-schedule task with the same timestamp
435 self._scheduler.schedule(task)
436 # Sleep until timeout or until a new task awakes the condition
438 self._cond.wait(timeout)
441 # Process tasks in parallel
442 runner.put(self._execute, task)
# Any unexpected exception marks the whole EC as FAILED.
445 err = traceback.format_exc()
446 self._logger.error("Error while processing tasks in the EC: %s" % err)
448 self._state = ECState.FAILED
450 # Mark EC state as terminated
451 if self.ecstate == ECState.RUNNING:
452 # Synchronize to get errors if occurred
454 self._state = ECState.TERMINATED
456 def _execute(self, task):
# Run a scheduled task's callback and record the outcome on the task.
# NOTE(review): the try/except wrapping and the assignment of the
# error text to task.result are elided from this view.
# Optimistically mark DONE; overwritten with ERROR if the callback
# raises.
458 task.status = TaskStatus.DONE
461 task.result = task.callback()
# Failure path: capture the traceback, flag the task, log, stop the
# scheduler and re-raise so the ParallelRunner observes the error.
464 err = traceback.format_exc()
466 task.status = TaskStatus.ERROR
468 self._logger.error("Error occurred while executing task: %s" % err)
470 self._stop_scheduler()
472 # Propagate error to the ParallelRunner
475 def _stop_scheduler(self):
# Transition the EC to FAILED and wake the processing thread so the
# _process loop can observe the terminal state and exit.
476 # Mark the EC as failed
477 self._state = ECState.FAILED
479 # Wake up the EC in case it was sleeping
# NOTE(review): the condition acquire/notify/release lines are
# elided from this view.