2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
# Life-cycle states of the ExperimentController.
# NOTE(review): the state constants (RUNNING / FAILED / TERMINATED, judging
# from their usage below) are defined on lines not visible in this chunk.
class ECState(object):
class ExperimentController(object):
    # Orchestrates the life cycle (register/deploy/start/stop/release) of a
    # set of ResourceManagers, driven by a single event-processing thread
    # that consumes tasks from a heap scheduler.

    def __init__(self, exp_id = None, root_dir = "/tmp"):
        """ Create a controller for one experiment.

        :param exp_id: Experiment identifier given by the user; a random
            "nepi-exp-..." id is generated when None
        :param root_dir: Root directory where experiment files are stored

        NOTE(review): some initialization statements (e.g. the `_tasks`
        mapping used by get_task()/schedule(), and the thread start) are not
        visible in this chunk.
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir

        # experiment identifier given by the user
        # NOTE(review): str.encode('hex') is a Python 2 idiom (removed in
        # Python 3) -- this file targets Python 2.
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')

        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()

        # guid -> ResourceManager mapping of all registered resources
        self._resources = dict()

        # min-heap scheduler ordering tasks by timestamp
        self._scheduler = HeapScheduler()

        # Event processing thread
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.setDaemon(True)

        # the EC starts out in the RUNNING state
        self._state = ECState.RUNNING

        self._logger = logging.getLogger("ExperimentController")
        # NOTE(review): the two fragments below belong to definitions whose
        # headers are not visible in this chunk (presumably an exp_id
        # setter/normalizer and the `finished` property).

        # Ensure a user-supplied exp_id always carries the "nepi-" prefix.
        if not exp_id.startswith("nepi-"):
            exp_id = "nepi-" + exp_id

        # True once the EC has failed or terminated ("finished" property).
        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
    def wait_finished(self, guids):
        """ Block until every RM in `guids` reaches FINISHED, STOPPED or
        FAILED, or until the EC itself finishes.

        :param guids: List of guids of the RMs to wait for
        """
        # Take into account if only one guids is given in parameter
        while not all([self.state(guid) in [ResourceState.FINISHED,
                ResourceState.STOPPED,
                ResourceState.FAILED] \
                for guid in guids]) and not self.finished:
            # We keep the sleep as large as possible to
            # decrease the number of RM state requests
            # NOTE(review): the sleep statement itself is not visible in
            # this chunk.
106 def get_task(self, tid):
107 return self._tasks.get(tid)
109 def get_resource(self, guid):
110 return self._resources.get(guid)
        # NOTE(review): body of the `resources` property (its def/header
        # line is not visible in this chunk): guids of all registered RMs.
        return self._resources.keys()
    def register_resource(self, rtype, guid = None):
        """ Instantiate a ResourceManager of factory type `rtype` and
        register it with this controller.

        :param rtype: Resource type string known to the ResourceFactory
        :param guid: Optional preferred guid; the generator assigns one
            otherwise

        NOTE(review): presumably returns the new guid -- the return
        statement is not visible in this chunk.
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)

        # Instantiate the resource manager for this guid
        rm = ResourceFactory.create(rtype, self, guid)

        # Index the new RM so other methods can look it up by guid
        self._resources[guid] = rm
128 def get_attributes(self, guid):
129 rm = self.get_resource(guid)
130 return rm.get_attributes()
    def register_connection(self, guid1, guid2):
        """ Connect the two RMs identified by `guid1` and `guid2`.

        NOTE(review): the reciprocal rm1/rm2 register_connection calls are
        not visible in this chunk.
        """
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)
    def register_condition(self, group1, action, group2, state,
            # NOTE(review): the rest of the signature (a `time` keyword,
            # judging from the body) is not visible in this chunk.
        """ Registers an action START or STOP for all RM on group1 to occur
        time 'time' after all elements in group2 reached state 'state'.

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to we waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # Normalize a single guid into a one-element list.
        # NOTE(review): the list-wrapping assignments under each isinstance
        # check are not visible in this chunk.
        if isinstance(group1, int):
        if isinstance(group2, int):

        # NOTE(review): presumably inside `for guid1 in group1:` -- the
        # loop header is not visible in this chunk.
        rm = self.get_resource(guid1)
        rm.register_condition(action, group2, state, time)
169 def register_trace(self, guid, name):
172 :param name: Name of the trace
175 rm = self.get_resource(guid)
176 rm.register_trace(name)
178 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
179 """ Get information on collected trace
181 :param name: Name of the trace
184 :param attr: Can be one of:
185 - TraceAttr.ALL (complete trace content),
186 - TraceAttr.STREAM (block in bytes to read starting at offset),
187 - TraceAttr.PATH (full path to the trace file),
188 - TraceAttr.SIZE (size of trace file).
191 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
194 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
199 rm = self.get_resource(guid)
200 return rm.trace(name, attr, block, offset)
    def discover(self, guid):
        """ Trigger resource discovery on the RM `guid`.

        NOTE(review): the `return rm.discover()` line is not visible in
        this chunk.
        """
        rm = self.get_resource(guid)
206 def provision(self, guid):
207 rm = self.get_resource(guid)
208 return rm.provision()
    def get(self, guid, name):
        """ Return the value of attribute `name` on the RM `guid`.

        NOTE(review): the `return rm.get(name)` line is not visible in
        this chunk.
        """
        rm = self.get_resource(guid)
214 def set(self, guid, name, value):
215 rm = self.get_resource(guid)
216 return rm.set(name, value)
    def state(self, guid, hr = False):
        """ Returns the state of a resource

        :param guid: Resource guid

        :param hr: Human readable. Forces return of a
            status string instead of a number
        """
        rm = self.get_resource(guid)
        # NOTE(review): the `if hr:` guard and the numeric-state return
        # branch are not visible in this chunk.
        return ResourceState2str.get(rm.state)
    def stop(self, guid):
        """ Stop the RM identified by `guid`.

        NOTE(review): the `return rm.stop()` line is not visible in this
        chunk.
        """
        rm = self.get_resource(guid)
    def start(self, guid):
        """ Start the RM identified by `guid`.

        NOTE(review): the `return rm.start()` line is not visible in this
        chunk.
        """
        rm = self.get_resource(guid)
    def set_with_conditions(self, name, value, group1, group2, state,
            # NOTE(review): the rest of the signature (a `time` keyword,
            # judging from the body) is not visible in this chunk.
        """ Set value 'value' on attribute with name 'name' on all RMs of
        group1 when 'time' has elapsed since all elements in group2
        have reached state 'state'.

        :param name: Name of attribute to set in RM

        :param value: Value of attribute to set in RM

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to we waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # Normalize a single guid into a one-element list.
        # NOTE(review): the list-wrapping assignments and the enclosing
        # `for guid in group1:` loop header are not visible in this chunk.
        if isinstance(group1, int):
        if isinstance(group2, int):

        rm = self.get_resource(guid)
        rm.set_with_conditions(name, value, group2, state, time)
280 def stop_with_conditions(self, guid):
281 rm = self.get_resource(guid)
282 return rm.stop_with_conditions()
284 def start_with_conditions(self, guid):
285 rm = self.get_resource(guid)
286 return rm.start_with_condition()
    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource manager in group

        :param group: List of guids of RMs to deploy

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs

        NOTE(review): several lines of this method (the `if not group:`
        guard, loop headers, and reschedule bookkeeping) are not visible
        in this chunk.
        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        # NOTE(review): presumably guarded by `if not group:` (line not
        # visible in this chunk).
        group = self.resources

        # Before starting deployment we disorder the group list with the
        # purpose of speeding up the whole deployment process.
        # It is likely that the user inserted in the 'group' list closely
        # resources one after another (e.g. all applications
        # connected to the same node can likely appear one after another).
        # This can originate a slow down in the deployment since the N
        # threads the parallel runner uses to processes tasks may all
        # be taken up by the same family of resources waiting for the
        # same conditions (e.g. LinuxApplications running on a same
        # node share a single lock, so they will tend to be serialized).
        # If we disorder the group list, this problem can be mitigated.
        random.shuffle(group)

        def wait_all_and_start(group):
            # Re-schedules itself until every RM in the group is READY,
            # then schedules the start of each RM.
            # NOTE(review): the loop header and the reschedule flag logic
            # are not visible in this chunk.
                rm = self.get_resource(guid)
                if rm.state < ResourceState.READY:

                callback = functools.partial(wait_all_and_start, group)
                self.schedule("1s", callback)

            # If all resources are ready, we schedule the start
                rm = self.get_resource(guid)
                self.schedule("0.01s", rm.start_with_conditions)

        # Schedule the function that will check all resources are
        # READY, and only then it will schedule the start.
        # This is aimed to reduce the number of tasks looping in the scheduler.
        # Instead of having N start tasks, we will have only one
        callback = functools.partial(wait_all_and_start, group)
        self.schedule("1s", callback)

        # NOTE(review): presumably inside `for guid in group:` (header not
        # visible in this chunk).
        rm = self.get_resource(guid)
        self.schedule("0.001s", rm.deploy)

        if not wait_all_ready:
            self.schedule("1s", rm.start_with_conditions)

        if rm.conditions.get(ResourceAction.STOP):
            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            self.schedule("2s", rm.stop_with_conditions)
    def release(self, group = None):
        """ Release every RM in `group` (default: all registered RMs),
        each in its own daemon thread, then shut the EC down.

        NOTE(review): the `if not group:` guard, the `threads` list
        initialization, thread.start(), the join/timeout line and the
        final thread-join are not visible in this chunk.
        """
            group = self.resources

            rm = self.get_resource(guid)
            thread = threading.Thread(target=rm.release)
            threads.append(thread)
            thread.setDaemon(True)

        # Poll the release threads until they all finish, unless the EC
        # itself finished first.
        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
            if not thread.is_alive():
                threads.remove(thread)

        self._stop_scheduler()

        if self._thread.is_alive():
    def schedule(self, date, callback, track = False):
        """ Schedule a callback to be executed at time date.

        date    string containing execution time for the task.
                It can be expressed as an absolute time, using
                timestamp format, or as a relative time matching
                ^\d+.\d+(h|m|s|ms|us)$

        callback    code to be executed for the task. Must be a
                    Python function, and receives args and kwargs

        track   if set to True, the task will be retrievable with
                the get_task() method
        """
        # Validate/normalize the date into a timestamp
        timestamp = strfvalid(date)

        task = Task(timestamp, callback)
        task = self._scheduler.schedule(task)

        # NOTE(review): presumably guarded by `if track:` -- the guard line
        # is not visible in this chunk.
        self._tasks[task.id] = task

        # Notify condition to wake up the processing thread
405 # Notify condition to wake up the processing thread
413 runner = ParallelRun(maxthreads = 50)
417 while not self.finished:
419 task = self._scheduler.next()
423 # It there are not tasks in the tasks queue we need to
424 # wait until a call to schedule wakes us up
429 # If the task timestamp is in the future the thread needs to wait
430 # until time elapse or until another task is scheduled
432 if now < task.timestamp:
433 # Calculate time difference in seconds
434 timeout = strfdiff(task.timestamp, now)
435 # Re-schedule task with the same timestamp
436 self._scheduler.schedule(task)
437 # Sleep until timeout or until a new task awakes the condition
439 self._cond.wait(timeout)
442 # Process tasks in parallel
443 runner.put(self._execute, task)
446 err = traceback.format_exc()
447 self._logger.error("Error while processing tasks in the EC: %s" % err)
449 self._state = ECState.FAILED
451 # Mark EC state as terminated
452 if self.ecstate == ECState.RUNNING:
453 # Synchronize to get errors if occurred
455 self._state = ECState.TERMINATED
    def _execute(self, task):
        """ Run one scheduled task's callback and record its outcome on the
        task object; on error, log it and shut down the scheduler.

        NOTE(review): the try/except scaffolding around these statements is
        not visible in this chunk -- the DONE/result lines presumably sit in
        the try body and the error-handling lines in the except handler.
        """
        task.status = TaskStatus.DONE

        task.result = task.callback()

        err = traceback.format_exc()

        task.status = TaskStatus.ERROR

        self._logger.error("Error occurred while executing task: %s" % err)

        self._stop_scheduler()

        # Propagate error to the ParallelRunner
    def _stop_scheduler(self):
        """ Mark the EC as FAILED and wake the processing thread so it can
        observe the state change and exit.

        NOTE(review): the condition notify call is not visible in this
        chunk (it runs past the end of the visible region).
        """
        # Mark the EC as failed
        self._state = ECState.FAILED

        # Wake up the EC in case it was sleeping