2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import random
import threading
import time
import traceback

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class ECState(object):
    """ State of the Experiment Controller

    """
    # NOTE(review): constant values reconstructed — RUNNING/FAILED/TERMINATED
    # are referenced by ExperimentController; confirm numeric values upstream.
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
class ExperimentController(object):
    """
    .. class:: Class Args :

        :param exp_id: Human readable identifier for the experiment.
                       It will be used in the name of the directory
                       where experiment related information is stored.
        :type exp_id: str

        :param root_dir: Root directory where an experiment-specific folder
                         will be created to store experiment information.
        :type root_dir: str

    The ExperimentController (EC) is the entity responsible for
    managing a single experiment.
    Through the EC interface the user can create ResourceManagers (RMs),
    configure them and interconnect them, in order to describe the experiment.

    Only when the 'deploy()' method is invoked will the EC take actions
    to transform the 'described' experiment into a 'running' experiment.

    While the experiment is running, it is possible to continue to
    create/configure/connect RMs, and to deploy them to involve new
    resources in the experiment.
    """
75 def __init__(self, exp_id = None, root_dir = "/tmp"):
76 super(ExperimentController, self).__init__()
77 # root directory to store files
78 self._root_dir = root_dir
80 # experiment identifier given by the user
81 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
83 # generator of globally unique ids
84 self._guid_generator = guid.GuidGenerator()
87 self._resources = dict()
90 self._scheduler = HeapScheduler()
95 # Event processing thread
96 self._cond = threading.Condition()
97 self._thread = threading.Thread(target = self._process)
98 self._thread.setDaemon(True)
102 self._state = ECState.RUNNING
105 self._logger = logging.getLogger("ExperimentController")
109 """ Return the logger of the Experiment Controller
116 """ Return the state of the Experiment Controller
123 """ Return the experiment ID
126 exp_id = self._exp_id
127 if not exp_id.startswith("nepi-"):
128 exp_id = "nepi-" + exp_id
133 """ Put the state of the Experiment Controller into a final state :
134 Either TERMINATED or FAILED
137 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
139 def wait_finished(self, guids):
140 """ Blocking method that wait until all the RM from the 'guid' list
141 reach the state FINISHED
143 :param guids: List of guids
146 if isinstance(guids, int):
149 while not all([self.state(guid) in [ResourceState.FINISHED,
150 ResourceState.STOPPED,
151 ResourceState.FAILED] \
152 for guid in guids]) and not self.finished:
153 # We keep the sleep as large as possible to
154 # decrese the number of RM state requests
157 def get_task(self, tid):
158 """ Get a specific task
160 :param tid: Id of the task
164 return self._tasks.get(tid)
166 def get_resource(self, guid):
167 """ Get a specific Resource Manager
169 :param guid: Id of the task
171 :rtype: ResourceManager
173 return self._resources.get(guid)
177 """ Returns the list of all the Resource Manager Id
182 return self._resources.keys()
184 def register_resource(self, rtype, guid = None):
185 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
186 for the RM of type 'rtype' and add it to the list of Resources.
188 :param rtype: Type of the RM
190 :return: Id of the RM
193 # Get next available guid
194 guid = self._guid_generator.next(guid)
197 rm = ResourceFactory.create(rtype, self, guid)
200 self._resources[guid] = rm
204 def get_attributes(self, guid):
205 """ Return all the attibutes of a specific RM
207 :param guid: Guid of the RM
209 :return: List of attributes
212 rm = self.get_resource(guid)
213 return rm.get_attributes()
215 def register_connection(self, guid1, guid2):
216 """ Registers a guid1 with a guid2.
217 The declaration order is not important
219 :param guid1: First guid to connect
220 :type guid1: ResourceManager
222 :param guid2: Second guid to connect
223 :type guid: ResourceManager
225 rm1 = self.get_resource(guid1)
226 rm2 = self.get_resource(guid2)
231 def register_condition(self, group1, action, group2, state,
233 """ Registers an action START or STOP for all RM on group1 to occur
234 time 'time' after all elements in group2 reached state 'state'.
236 :param group1: List of guids of RMs subjected to action
239 :param action: Action to register (either START or STOP)
240 :type action: ResourceAction
242 :param group2: List of guids of RMs to we waited for
245 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
246 :type state: ResourceState
248 :param time: Time to wait after group2 has reached status
252 if isinstance(group1, int):
254 if isinstance(group2, int):
258 rm = self.get_resource(guid1)
259 rm.register_condition(action, group2, state, time)
261 def register_trace(self, guid, name):
264 :param name: Name of the trace
267 rm = self.get_resource(guid)
268 rm.register_trace(name)
270 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
271 """ Get information on collected trace
273 :param name: Name of the trace
276 :param attr: Can be one of:
277 - TraceAttr.ALL (complete trace content),
278 - TraceAttr.STREAM (block in bytes to read starting at offset),
279 - TraceAttr.PATH (full path to the trace file),
280 - TraceAttr.SIZE (size of trace file).
283 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
286 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
291 rm = self.get_resource(guid)
292 return rm.trace(name, attr, block, offset)
294 def discover(self, guid):
295 """ Discover a specific RM defined by its 'guid'
297 :param guid: Guid of the RM
301 rm = self.get_resource(guid)
304 def provision(self, guid):
305 """ Provision a specific RM defined by its 'guid'
307 :param guid: Guid of the RM
311 rm = self.get_resource(guid)
312 return rm.provision()
314 def get(self, guid, name):
315 """ Get a specific attribute 'name' from the RM 'guid'
317 :param guid: Guid of the RM
320 :param name: attribute's name
324 rm = self.get_resource(guid)
327 def set(self, guid, name, value):
328 """ Set a specific attribute 'name' from the RM 'guid'
329 with the value 'value'
331 :param guid: Guid of the RM
334 :param name: attribute's name
337 :param value: attribute's value
340 rm = self.get_resource(guid)
341 return rm.set(name, value)
343 def state(self, guid, hr = False):
344 """ Returns the state of a resource
346 :param guid: Resource guid
349 :param hr: Human readable. Forces return of a
350 status string instead of a number
354 rm = self.get_resource(guid)
356 return ResourceState2str.get(rm.state)
360 def stop(self, guid):
361 """ Stop a specific RM defined by its 'guid'
363 :param guid: Guid of the RM
367 rm = self.get_resource(guid)
370 def start(self, guid):
371 """ Start a specific RM defined by its 'guid'
373 :param guid: Guid of the RM
377 rm = self.get_resource(guid)
380 def set_with_conditions(self, name, value, group1, group2, state,
382 """ Set value 'value' on attribute with name 'name' on all RMs of
383 group1 when 'time' has elapsed since all elements in group2
384 have reached state 'state'.
386 :param name: Name of attribute to set in RM
389 :param value: Value of attribute to set in RM
392 :param group1: List of guids of RMs subjected to action
395 :param action: Action to register (either START or STOP)
396 :type action: ResourceAction
398 :param group2: List of guids of RMs to we waited for
401 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
402 :type state: ResourceState
404 :param time: Time to wait after group2 has reached status
408 if isinstance(group1, int):
410 if isinstance(group2, int):
414 rm = self.get_resource(guid)
415 rm.set_with_conditions(name, value, group2, state, time)
417 def stop_with_conditions(self, guid):
418 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
420 :param guid: Guid of the RM
424 rm = self.get_resource(guid)
425 return rm.stop_with_conditions()
427 def start_with_conditions(self, guid):
428 """ Start a specific RM defined by its 'guid' only if all the conditions are true
430 :param guid: Guid of the RM
434 rm = self.get_resource(guid)
435 return rm.start_with_condition()
437 def deploy(self, group = None, wait_all_ready = True):
438 """ Deploy all resource manager in group
440 :param group: List of guids of RMs to deploy
443 :param wait_all_ready: Wait until all RMs are ready in
444 order to start the RMs
448 self.logger.debug(" ------- DEPLOY START ------ ")
451 group = self.resources
453 if isinstance(group, int):
456 # Before starting deployment we disorder the group list with the
457 # purpose of speeding up the whole deployment process.
458 # It is likely that the user inserted in the 'group' list closely
459 # resources one after another (e.g. all applications
460 # connected to the same node can likely appear one after another).
461 # This can originate a slow down in the deployment since the N
462 # threads the parallel runner uses to processes tasks may all
463 # be taken up by the same family of resources waiting for the
464 # same conditions (e.g. LinuxApplications running on a same
465 # node share a single lock, so they will tend to be serialized).
466 # If we disorder the group list, this problem can be mitigated.
467 #random.shuffle(group)
469 def wait_all_and_start(group):
472 rm = self.get_resource(guid)
473 if rm.state < ResourceState.READY:
478 callback = functools.partial(wait_all_and_start, group)
479 self.schedule("1s", callback)
481 # If all resources are read, we schedule the start
483 rm = self.get_resource(guid)
484 self.schedule("0s", rm.start_with_conditions)
487 # Schedule the function that will check all resources are
488 # READY, and only then it will schedule the start.
489 # This is aimed to reduce the number of tasks looping in the scheduler.
490 # Intead of having N start tasks, we will have only one
491 callback = functools.partial(wait_all_and_start, group)
492 self.schedule("1s", callback)
495 rm = self.get_resource(guid)
496 self.schedule("0s", rm.deploy)
498 if not wait_all_ready:
499 self.schedule("1s", rm.start_with_conditions)
501 if rm.conditions.get(ResourceAction.STOP):
502 # Only if the RM has STOP conditions we
503 # schedule a stop. Otherwise the RM will stop immediately
504 self.schedule("2s", rm.stop_with_conditions)
507 def release(self, group = None):
508 """ Release the elements of the list 'group' or
509 all the resources if any group is specified
511 :param group: List of RM
516 group = self.resources
520 rm = self.get_resource(guid)
521 thread = threading.Thread(target=rm.release)
522 threads.append(thread)
523 thread.setDaemon(True)
526 while list(threads) and not self.finished:
528 # Time out after 5 seconds to check EC not terminated
530 if not thread.is_alive():
531 threads.remove(thread)
534 """ Shutdown the Experiment Controller.
535 Releases all the resources and stops task processing thread
540 # Mark the EC state as TERMINATED
541 self._state = ECState.TERMINATED
543 # Notify condition to wake up the processing thread
546 if self._thread.is_alive():
549 def schedule(self, date, callback, track = False):
550 """ Schedule a callback to be executed at time date.
552 :param date: string containing execution time for the task.
553 It can be expressed as an absolute time, using
554 timestamp format, or as a relative time matching
555 ^\d+.\d+(h|m|s|ms|us)$
557 :param callback: code to be executed for the task. Must be a
558 Python function, and receives args and kwargs
561 :param track: if set to True, the task will be retrivable with
562 the get_task() method
564 :return : The Id of the task
566 timestamp = strfvalid(date)
568 task = Task(timestamp, callback)
569 task = self._scheduler.schedule(task)
572 self._tasks[task.id] = task
574 # Notify condition to wake up the processing thread
580 """ Process scheduled tasks.
582 The _process method is executed in an independent thread held by the
583 ExperimentController for as long as the experiment is running.
585 Tasks are scheduled by invoking the schedule method with a target callback.
586 The schedule method is given a execution time which controls the
587 order in which tasks are processed.
589 Tasks are processed in parallel using multithreading.
590 The environmental variable NEPI_NTHREADS can be used to control
591 the number of threads used to process tasks. The default value is 50.
595 To execute tasks in parallel, an ParallelRunner (PR) object, holding
596 a pool of threads (workers), is used.
597 For each available thread in the PR, the next task popped from
598 the scheduler queue is 'put' in the PR.
599 Upon receiving a task to execute, each PR worker (thread) invokes the
600 _execute method of the EC, passing the task as argument.
601 This method, calls task.callback inside a try/except block. If an
602 exception is raised by the tasks.callback, it will be trapped by the
603 try block, logged to standard error (usually the console), and the EC
604 state will be set to ECState.FAILED.
605 The invocation of _notify immediately after, forces the processing
606 loop in the _process method, to wake up if it was blocked waiting for new
607 tasks to arrived, and to check the EC state.
608 As the EC is in FAILED state, the processing loop exits and the
609 'finally' block is invoked. In the 'finally' block, the 'sync' method
610 of the PR is invoked, which forces the PR to raise any unchecked errors
611 that might have been raised by the workers.
614 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
616 runner = ParallelRun(maxthreads = nthreads)
620 while not self.finished:
622 task = self._scheduler.next()
626 # It there are not tasks in the tasks queue we need to
627 # wait until a call to schedule wakes us up
632 # If the task timestamp is in the future the thread needs to wait
633 # until time elapse or until another task is scheduled
635 if now < task.timestamp:
636 # Calculate time difference in seconds
637 timeout = strfdiff(task.timestamp, now)
638 # Re-schedule task with the same timestamp
639 self._scheduler.schedule(task)
640 # Sleep until timeout or until a new task awakes the condition
642 self._cond.wait(timeout)
645 # Process tasks in parallel
646 runner.put(self._execute, task)
649 err = traceback.format_exc()
650 self._logger.error("Error while processing tasks in the EC: %s" % err)
652 self._state = ECState.FAILED
656 def _execute(self, task):
657 """ Executes a single task.
659 If the invokation of the task callback raises an
660 exception, the processing thread of the ExperimentController
661 will be stopped and the experiment will be aborted.
663 :param task: Object containing the callback to execute
668 task.status = TaskStatus.DONE
671 task.result = task.callback()
674 err = traceback.format_exc()
676 task.status = TaskStatus.ERROR
678 self._logger.error("Error occurred while executing task: %s" % err)
680 # Set the EC to FAILED state (this will force to exit the task
682 self._state = ECState.FAILED
684 # Notify condition to wake up the processing thread
687 # Propage error to the ParallelRunner
691 """ Awakes the processing thread in case it is blocked waiting
692 for a new task to be scheduled.