2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
39 class ECState(object):
class ExperimentController(object):
    """ Orchestrates a network experiment: registers ResourceManagers (RMs),
    schedules tasks, and processes them in a background thread.

    NOTE(review): several lines of this class are elided in this chunk; the
    comments below describe only what the visible code shows.
    """

    def __init__(self, exp_id = None, root_dir = "/tmp"):
        # :param exp_id: user-supplied experiment identifier; auto-generated
        #     when None
        # :param root_dir: root directory to store experiment files
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # experiment identifier given by the user
        # NOTE(review): str.encode('hex') is Python 2 only (binascii.hexlify
        # on Python 3).
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # guid -> ResourceManager registry
        self._resources = dict()

        # scheduler ordering tasks by execution timestamp
        self._scheduler = HeapScheduler()

        # Event processing thread
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.setDaemon(True)

        # EC lifecycle state (see ECState)
        self._state = ECState.RUNNING

        self._logger = logging.getLogger("ExperimentController")

        # NOTE(review): the lines below look like fragments of other members
        # (an exp_id normalizer and the 'finished' property) whose 'def'/
        # decorator lines were elided from this chunk; as shown they do not
        # belong to __init__.
        if not exp_id.startswith("nepi-"):
            exp_id = "nepi-" + exp_id

        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
    def wait_finished(self, guids):
        """ Block until every RM in 'guids' reaches ResourceState.FINISHED,
        or until the EC itself finishes (FAILED/TERMINATED).

        :param guids: List of guids of RMs to wait for

        NOTE(review): the sleep between polls appears elided in this chunk.
        """
        while not all([self.state(guid) == ResourceState.FINISHED \
                for guid in guids]) and not self.finished:
            # We keep the sleep as large as possible to
            # decrease the number of RM state requests
103 def get_task(self, tid):
104 return self._tasks.get(tid)
106 def get_resource(self, guid):
107 return self._resources.get(guid)
111 return self._resources.keys()
    def register_resource(self, rtype, guid = None):
        """ Instantiate an RM of factory type 'rtype' and register it.

        :param rtype: Resource factory type of the RM to create
        :param guid: Optional guid to assign; when None, the next available
            guid is generated

        NOTE(review): some lines of this method (e.g. returning the new
        guid) appear elided in this chunk.
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        rm = ResourceFactory.create(rtype, self, guid)

        self._resources[guid] = rm
125 def get_attributes(self, guid):
126 rm = self.get_resource(guid)
127 return rm.get_attributes()
    def register_connection(self, guid1, guid2):
        """ Connect the two RMs identified by guid1 and guid2.

        NOTE(review): the lines performing the actual connection on rm1/rm2
        are elided in this chunk; only the lookups are visible.
        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)
    def register_condition(self, group1, action, group2, state,
        # NOTE(review): the signature continuation line (presumably
        # 'time = None):') is elided in this chunk, as are the bodies of the
        # two 'if isinstance' guards and the loop over group1.
        """ Registers an action START or STOP for all RM on group1 to occur
        time 'time' after all elements in group2 reached state 'state'.

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to be waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        if isinstance(group1, int):
        if isinstance(group2, int):

        rm = self.get_resource(guid1)
        rm.register_condition(action, group2, state, time)
    def register_trace(self, guid, name):
        """ Register trace 'name' on the RM identified by 'guid'.

        :param name: Name of the trace
        """
        rm = self.get_resource(guid)
        rm.register_trace(name)
    def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
        """ Get information on collected trace

        :param name: Name of the trace

        :param attr: Can be one of:
            - TraceAttr.ALL (complete trace content),
            - TraceAttr.STREAM (block in bytes to read starting at offset),
            - TraceAttr.PATH (full path to the trace file),
            - TraceAttr.SIZE (size of trace file).

        :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM

        :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
        """
        rm = self.get_resource(guid)
        return rm.trace(name, attr, block, offset)
    def discover(self, guid):
        """ Delegate resource discovery to the RM registered under 'guid'.

        NOTE(review): the 'return rm.discover()' line appears elided in
        this chunk.
        """
        rm = self.get_resource(guid)
203 def provision(self, guid):
204 rm = self.get_resource(guid)
205 return rm.provision()
    def get(self, guid, name):
        """ Read attribute 'name' from the RM registered under 'guid'.

        NOTE(review): the 'return rm.get(name)' line appears elided in
        this chunk.
        """
        rm = self.get_resource(guid)
211 def set(self, guid, name, value):
212 rm = self.get_resource(guid)
213 return rm.set(name, value)
    def state(self, guid, hr = False):
        """ Returns the state of a resource

        :param guid: Resource guid

        :param hr: Human readable. Forces return of a
            status string instead of a number
        """
        rm = self.get_resource(guid)
        # NOTE(review): the 'if hr:' guard and the numeric-state return
        # appear elided in this chunk; only the human-readable return is
        # visible.
        return ResourceState2str.get(rm.state)
    def stop(self, guid):
        """ Delegate stopping to the RM registered under 'guid'.

        NOTE(review): the 'return rm.stop()' line appears elided in this
        chunk.
        """
        rm = self.get_resource(guid)
    def start(self, guid):
        """ Delegate starting to the RM registered under 'guid'.

        NOTE(review): the 'return rm.start()' line appears elided in this
        chunk.
        """
        rm = self.get_resource(guid)
    def set_with_conditions(self, name, value, group1, group2, state,
        # NOTE(review): the signature continuation line (presumably
        # 'time = None):') is elided in this chunk, as are the bodies of the
        # two 'if isinstance' guards and the loop over group1.
        """ Set value 'value' on attribute with name 'name' on all RMs of
        group1 when 'time' has elapsed since all elements in group2
        have reached state 'state'.

        :param name: Name of attribute to set in RM

        :param value: Value of attribute to set in RM

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to be waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        if isinstance(group1, int):
        if isinstance(group2, int):

        rm = self.get_resource(guid)
        rm.set_with_conditions(name, value, group2, state, time)
277 def stop_with_conditions(self, guid):
278 rm = self.get_resource(guid)
279 return rm.stop_with_conditions()
281 def start_with_conditions(self, guid):
282 rm = self.get_resource(guid)
283 return rm.start_with_condition()
    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource manager in group

        :param group: List of guids of RMs to deploy

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs

        NOTE(review): several control-flow lines of this method (loops and
        guards) are elided in this chunk; statements below are kept as
        shown, but some lack their enclosing context (e.g. 'guid' has no
        visible binding).
        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        # NOTE(review): presumably guarded by 'if group is None:' (elided).
        group = self.resources

        # Before starting deployment we disorder the group list with the
        # purpose of speeding up the whole deployment process.
        # It is likely that the user inserted in the 'group' list closely
        # resources one after another (e.g. all applications
        # connected to the same node can likely appear one after another).
        # This can originate a slow down in the deployment since the N
        # threads the parallel runner uses to processes tasks may all
        # be taken up by the same family of resources waiting for the
        # same conditions (e.g. LinuxApplications running on a same
        # node share a single lock, so they will tend to be serialized).
        # If we disorder the group list, this problem can be mitigated.
        random.shuffle(group)

        # Helper: re-schedules itself until every RM in 'group' is READY,
        # then schedules the start of each RM.
        def wait_all_and_start(group):
            # NOTE(review): the enclosing loop over 'group' is elided.
            rm = self.get_resource(guid)
            if rm.state < ResourceState.READY:
                # Not all RMs are ready yet: check again in one second.
                callback = functools.partial(wait_all_and_start, group)
                self.schedule("1s", callback)

            # If all resources are ready, we schedule the start
            rm = self.get_resource(guid)
            self.schedule("0.01s", rm.start_with_conditions)

        # Schedule the function that will check all resources are
        # READY, and only then it will schedule the start.
        # This is aimed to reduce the number of tasks looping in the scheduler.
        # Instead of having N start tasks, we will have only one
        callback = functools.partial(wait_all_and_start, group)
        self.schedule("1s", callback)

        # NOTE(review): presumably inside 'for guid in group:' (elided).
        rm = self.get_resource(guid)
        self.schedule("0.001s", rm.deploy)

        if not wait_all_ready:
            self.schedule("1s", rm.start_with_conditions)

        if rm.conditions.get(ResourceAction.STOP):
            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            self.schedule("2s", rm.stop_with_conditions)
    def release(self, group = None):
        """ Release the RMs in 'group' (all registered RMs when None), each
        in its own daemon thread, then stop the scheduler.

        NOTE(review): several lines are elided in this chunk (e.g.
        thread.start(), the join/timeout call implied by the comment below,
        and the body of the final 'if').
        """
        # NOTE(review): presumably guarded by 'if group is None:' (elided).
        group = self.resources

        # NOTE(review): presumably inside 'for guid in group:' (elided);
        # 'threads' has no visible initialization in this chunk.
        rm = self.get_resource(guid)
        thread = threading.Thread(target=rm.release)
        threads.append(thread)
        thread.setDaemon(True)

        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
            if not thread.is_alive():
                threads.remove(thread)

        self._stop_scheduler()

        # NOTE(review): the body of this guard (presumably joining the
        # processing thread) is elided in this chunk.
        if self._thread.is_alive():
    def schedule(self, date, callback, track = False):
        """ Schedule a callback to be executed at time date.

        :param date: string containing execution time for the task.
            It can be expressed as an absolute time, using
            timestamp format, or as a relative time matching
            ^\d+.\d+(h|m|s|ms|us)$

        :param callback: code to be executed for the task. Must be a
            Python function, and receives args and kwargs

        :param track: if set to True, the task will be retrievable with
            the get_task() method
        """
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)
        task = self._scheduler.schedule(task)

        # NOTE(review): presumably guarded by 'if track:' (elided in this
        # chunk).
        self._tasks[task.id] = task

        # Notify condition to wake up the processing thread
        # NOTE(review): fragment of the event-processing loop — presumably
        # the body of _process (the target of self._thread in __init__);
        # its 'def' line, the condition acquire/release calls and the
        # try/except structure are elided in this chunk.
        runner = ParallelRun(maxthreads = 50)

        while not self.finished:
            task = self._scheduler.next()

            # If there are no tasks in the tasks queue we need to
            # wait until a call to schedule wakes us up

            # If the task timestamp is in the future the thread needs to wait
            # until time elapse or until another task is scheduled
            if now < task.timestamp:
                # Calculate time difference in seconds
                timeout = strfdiff(task.timestamp, now)
                # Re-schedule task with the same timestamp
                self._scheduler.schedule(task)
                # Sleep until timeout or until a new task awakes the condition
                self._cond.wait(timeout)

            # Process tasks in parallel
            runner.put(self._execute, task)

        # NOTE(review): the 'except' line introducing this error handling is
        # elided in this chunk.
        err = traceback.format_exc()
        self._logger.error("Error while processing tasks in the EC: %s" % err)
        self._state = ECState.FAILED

        # Mark EC state as terminated
        if self.ecstate == ECState.RUNNING:
            # Synchronize to get errors if occurred
            self._state = ECState.TERMINATED
    def _execute(self, task):
        """ Execute a single scheduled task, recording its result and
        status, and stop the scheduler on error.

        NOTE(review): the try/except structure is partially elided in this
        chunk; the statement order below reflects only the visible lines
        (e.g. DONE appears before the callback runs, which suggests elided
        'try:' context).
        """
        task.status = TaskStatus.DONE

        task.result = task.callback()

        # NOTE(review): the 'except' line is elided in this chunk.
        err = traceback.format_exc()

        task.status = TaskStatus.ERROR

        self._logger.error("Error occurred while executing task: %s" % err)

        self._stop_scheduler()

        # Propagate error to the ParallelRunner
    def _stop_scheduler(self):
        """ Mark the EC as FAILED and wake the processing thread.

        NOTE(review): the condition-notify lines are elided (the method may
        continue past the end of this chunk).
        """
        # Mark the EC as failed
        self._state = ECState.FAILED

        # Wake up the EC in case it was sleeping