2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
# Standard library imports. These modules are used throughout this file
# (functools: deploy callbacks; logging: EC logger; os: urandom/environ;
# threading: processing thread; time: wait polling; traceback: error
# reporting; weakref: FailureManager back-reference).
import functools
import logging
import os
import threading
import time
import traceback
import weakref

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
40 class FailureLevel(object):
41 """ Describes the system failure state
47 class FailureManager(object):
48 """ The FailureManager is responsible for handling errors,
49 and deciding whether an experiment should be aborted
52 def __init__(self, ec):
53 self._ec = weakref.ref(ec)
54 self._failure_level = FailureLevel.OK
58 """ Returns the Experiment Controller """
63 if self._failure_level == FailureLevel.OK:
64 for guid in self.ec.resources:
65 state = self.ec.state(guid)
66 critical = self.ec.get(guid, "critical")
67 if state == ResourceState.FAILED and critical:
68 self._failure_level = FailureLevel.RM_FAILURE
69 self.ec.logger.debug("RM critical failure occurred on guid %d." \
70 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
73 return self._failure_level != FailureLevel.OK
75 def set_ec_failure(self):
76 self._failure_level = FailureLevel.EC_FAILURE
79 class ECState(object):
80 """ State of the Experiment Controller
87 class ExperimentController(object):
89 .. class:: Class Args :
91 :param exp_id: Human readable identifier for the experiment scenario.
96 An experiment, or scenario, is defined by a concrete set of resources,
97 behavior, configuration and interconnection of those resources.
98 The Experiment Description (ED) is a detailed representation of a
99 single experiment. It contains all the necessary information to
100 allow repeating the experiment. NEPI allows to describe
101 experiments by registering components (resources), configuring them
102 and interconnecting them.
104 A same experiment (scenario) can be executed many times, generating
105 different results. We call an experiment execution (instance) a 'run'.
107 The ExperimentController (EC), is the entity responsible of
108 managing an experiment run. The same scenario can be
109 recreated (and re-run) by instantiating an EC and recreating
110 the same experiment description.
112 In NEPI, an experiment is represented as a graph of interconnected
113 resources. A resource is a generic concept in the sense that any
114 component taking part of an experiment, whether physical or
115 virtual, is considered a resource. A resource could be a host,
116 a virtual machine, an application, a simulator, an IP address.
118 A ResourceManager (RM), is the entity responsible for managing a
119 single resource. ResourceManagers are specific to a resource
120 type (i.e. An RM to control a Linux application will not be
121 the same as the RM used to control a ns-3 simulation).
122 To support a new type of resource in NEPI, a new RM must be
123 implemented. NEPI already provides a variety of
124 RMs to control basic resources, and new can be extended from
127 Through the EC interface the user can create ResourceManagers (RMs),
128 configure them and interconnect them, to describe an experiment.
129 Describing an experiment through the EC does not run the experiment.
130 Only when the 'deploy()' method is invoked on the EC, the EC will take
131 actions to transform the 'described' experiment into a 'running' experiment.
133 While the experiment is running, it is possible to continue to
134 create/configure/connect RMs, and to deploy them to involve new
135 resources in the experiment (this is known as 'interactive' deployment).
137 An experiment in NEPI is identified by a string id,
138 which is either given by the user, or automatically generated by NEPI.
139 The purpose of this identifier is to separate files and results that
140 belong to different experiment scenarios.
141 However, since a same 'experiment' can be run many times, the experiment
142 id is not enough to identify an experiment instance (run).
143 For this reason, the ExperimentController has two identifiers, the
144 exp_id, which can be re-used in different ExperimentController instances,
145 and the run_id, which is unique to one ExperimentController instance, and
146 is automatically generated by NEPI.
150 def __init__(self, exp_id = None):
151 super(ExperimentController, self).__init__()
153 self._logger = logging.getLogger("ExperimentController")
155 # Run identifier. It identifies a concrete execution instance (run)
157 # Since a same experiment (same configuration) can be executed many
158 # times, this run_id permits to separate result files generated on
159 # different experiment executions
160 self._run_id = tsformat()
162 # Experiment identifier. Usually assigned by the user
163 # Identifies the experiment scenario (i.e. configuration,
164 # resources used, etc)
165 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
167 # generator of globally unique ids
168 self._guid_generator = guid.GuidGenerator()
171 self._resources = dict()
173 # Scheduler. It a queue that holds tasks scheduled for
174 # execution, and yields the next task to be executed
175 # ordered by execution and arrival time
176 self._scheduler = HeapScheduler()
181 # RM groups (for deployment)
182 self._groups = dict()
184 # generator of globally unique id for groups
185 self._group_id_generator = guid.GuidGenerator()
187 # Flag to stop processing thread
190 # Entity in charge of managing system failures
191 self._fm = FailureManager(self)
194 self._state = ECState.RUNNING
196 # The runner is a pool of threads used to parallelize
198 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
199 self._runner = ParallelRun(maxthreads = nthreads)
201 # Event processing thread
202 self._cond = threading.Condition()
203 self._thread = threading.Thread(target = self._process)
204 self._thread.setDaemon(True)
209 """ Return the logger of the Experiment Controller
216 """ Return the state of the Experiment Controller
223 """ Return the experiment id assigned by the user
230 """ Return the experiment instance (run) identifier
237 return self._fm.abort
239 def wait_finished(self, guids):
240 """ Blocking method that wait until all RMs in the 'guid' list
241 reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or
242 RELEASED ) or until a System Failure occurs (e.g. Task Failure)
244 :param guids: List of guids
252 return self.wait(guids, state = ResourceState.STOPPED,
255 def wait_started(self, guids):
256 """ Blocking method that wait until all RMs in the 'guid' list
257 reach a state >= STARTED or until a System Failure occurs
260 :param guids: List of guids
267 return self.wait(guids, state = ResourceState.STARTED,
270 def wait_released(self, guids):
271 """ Blocking method that wait until all RMs in the 'guid' list
272 reach a state = RELEASED or until the EC fails
274 :param guids: List of guids
279 return self._state == ECState.FAILED
281 return self.wait(guids, state = ResourceState.RELEASED,
284 def wait_deployed(self, guids):
285 """ Blocking method that wait until all RMs in the 'guid' list
286 reach a state >= READY or until a System Failure occurs
289 :param guids: List of guids
296 return self.wait(guids, state = ResourceState.READY,
299 def wait(self, guids, state, quit):
300 """ Blocking method that wait until all RMs in the 'guid' list
301 reach a state >= 'state' or until quit yileds True
303 :param guids: List of guids
306 if isinstance(guids, int):
309 # Make a copy to avoid modifying the original guids list
313 # If there are no more guids to wait for
314 # or the quit function returns True, exit the loop
315 if len(guids) == 0 or quit():
318 # If a guid reached one of the target states, remove it from list
320 rstate = self.state(guid)
322 hrrstate = ResourceState2str.get(rstate)
323 hrstate = ResourceState2str.get(state)
327 rm = self.get_resource(guid)
328 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
329 rm.get_rtype(), guid, hrrstate, hrstate))
332 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
333 guid, hrrstate, hrstate))
336 def get_task(self, tid):
337 """ Get a specific task
339 :param tid: Id of the task
343 return self._tasks.get(tid)
345 def get_resource(self, guid):
346 """ Get a specific Resource Manager
348 :param guid: Id of the task
350 :rtype: ResourceManager
352 return self._resources.get(guid)
356 """ Returns the list of all the Resource Manager Id
361 return self._resources.keys()
363 def register_resource(self, rtype, guid = None):
364 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
365 for the RM of type 'rtype' and add it to the list of Resources.
367 :param rtype: Type of the RM
369 :return: Id of the RM
372 # Get next available guid
373 guid = self._guid_generator.next(guid)
376 rm = ResourceFactory.create(rtype, self, guid)
379 self._resources[guid] = rm
383 def get_attributes(self, guid):
384 """ Return all the attibutes of a specific RM
386 :param guid: Guid of the RM
388 :return: List of attributes
391 rm = self.get_resource(guid)
392 return rm.get_attributes()
394 def register_connection(self, guid1, guid2):
395 """ Registers a guid1 with a guid2.
396 The declaration order is not important
398 :param guid1: First guid to connect
399 :type guid1: ResourceManager
401 :param guid2: Second guid to connect
402 :type guid: ResourceManager
404 rm1 = self.get_resource(guid1)
405 rm2 = self.get_resource(guid2)
407 rm1.register_connection(guid2)
408 rm2.register_connection(guid1)
410 def register_condition(self, guids1, action, guids2, state,
412 """ Registers an action START or STOP for all RM on guids1 to occur
413 time 'time' after all elements in guids2 reached state 'state'.
415 :param guids1: List of guids of RMs subjected to action
418 :param action: Action to register (either START or STOP)
419 :type action: ResourceAction
421 :param guids2: List of guids of RMs to we waited for
424 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
425 :type state: ResourceState
427 :param time: Time to wait after guids2 has reached status
431 if isinstance(guids1, int):
433 if isinstance(guids2, int):
437 rm = self.get_resource(guid1)
438 rm.register_condition(action, guids2, state, time)
440 def enable_trace(self, guid, name):
443 :param name: Name of the trace
446 rm = self.get_resource(guid)
447 rm.enable_trace(name)
449 def trace_enabled(self, guid, name):
450 """ Returns True if trace is enabled
452 :param name: Name of the trace
455 rm = self.get_resource(guid)
456 return rm.trace_enabled(name)
458 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
459 """ Get information on collected trace
461 :param name: Name of the trace
464 :param attr: Can be one of:
465 - TraceAttr.ALL (complete trace content),
466 - TraceAttr.STREAM (block in bytes to read starting at offset),
467 - TraceAttr.PATH (full path to the trace file),
468 - TraceAttr.SIZE (size of trace file).
471 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
474 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
479 rm = self.get_resource(guid)
480 return rm.trace(name, attr, block, offset)
482 def discover(self, guid):
483 """ Discover a specific RM defined by its 'guid'
485 :param guid: Guid of the RM
489 rm = self.get_resource(guid)
492 def provision(self, guid):
493 """ Provision a specific RM defined by its 'guid'
495 :param guid: Guid of the RM
499 rm = self.get_resource(guid)
500 return rm.provision()
502 def get(self, guid, name):
503 """ Get a specific attribute 'name' from the RM 'guid'
505 :param guid: Guid of the RM
508 :param name: attribute's name
512 rm = self.get_resource(guid)
515 def set(self, guid, name, value):
516 """ Set a specific attribute 'name' from the RM 'guid'
517 with the value 'value'
519 :param guid: Guid of the RM
522 :param name: attribute's name
525 :param value: attribute's value
528 rm = self.get_resource(guid)
529 return rm.set(name, value)
531 def state(self, guid, hr = False):
532 """ Returns the state of a resource
534 :param guid: Resource guid
537 :param hr: Human readable. Forces return of a
538 status string instead of a number
542 rm = self.get_resource(guid)
546 return ResourceState2str.get(state)
550 def stop(self, guid):
551 """ Stop a specific RM defined by its 'guid'
553 :param guid: Guid of the RM
557 rm = self.get_resource(guid)
560 def start(self, guid):
561 """ Start a specific RM defined by its 'guid'
563 :param guid: Guid of the RM
567 rm = self.get_resource(guid)
570 def set_with_conditions(self, name, value, guids1, guids2, state,
572 """ Set value 'value' on attribute with name 'name' on all RMs of
573 guids1 when 'time' has elapsed since all elements in guids2
574 have reached state 'state'.
576 :param name: Name of attribute to set in RM
579 :param value: Value of attribute to set in RM
582 :param guids1: List of guids of RMs subjected to action
585 :param action: Action to register (either START or STOP)
586 :type action: ResourceAction
588 :param guids2: List of guids of RMs to we waited for
591 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
592 :type state: ResourceState
594 :param time: Time to wait after guids2 has reached status
598 if isinstance(guids1, int):
600 if isinstance(guids2, int):
604 rm = self.get_resource(guid)
605 rm.set_with_conditions(name, value, guids2, state, time)
607 def deploy(self, guids = None, wait_all_ready = True, group = None):
608 """ Deploy all resource manager in guids list
610 :param guids: List of guids of RMs to deploy
613 :param wait_all_ready: Wait until all RMs are ready in
614 order to start the RMs
617 :param group: Id of deployment group in which to deploy RMs
621 self.logger.debug(" ------- DEPLOY START ------ ")
624 # If no guids list was passed, all 'NEW' RMs will be deployed
626 for guid in self.resources:
627 if self.state(guid) == ResourceState.NEW:
630 if isinstance(guids, int):
633 # Create deployment group
634 # New guids can be added to a same deployment group later on
638 group = self._group_id_generator.next()
640 if group not in self._groups:
641 self._groups[group] = []
643 self._groups[group].extend(guids)
645 def wait_all_and_start(group):
646 # Function that checks if all resources are READY
647 # before scheduling a start_with_conditions for each RM
650 # Get all guids in group
651 guids = self._groups[group]
654 if self.state(guid) < ResourceState.READY:
660 callback = functools.partial(wait_all_and_start, group)
661 self.schedule("1s", callback)
663 # If all resources are ready, we schedule the start
665 rm = self.get_resource(guid)
666 self.schedule("0s", rm.start_with_conditions)
668 if wait_all_ready and new_group:
669 # Schedule a function to check that all resources are
670 # READY, and only then schedule the start.
671 # This aims at reducing the number of tasks looping in the
673 # Instead of having many start tasks, we will have only one for
675 callback = functools.partial(wait_all_and_start, group)
676 self.schedule("0s", callback)
679 rm = self.get_resource(guid)
680 rm.deployment_group = group
681 self.schedule("0s", rm.deploy_with_conditions)
683 if not wait_all_ready:
684 self.schedule("0s", rm.start_with_conditions)
686 if rm.conditions.get(ResourceAction.STOP):
687 # Only if the RM has STOP conditions we
688 # schedule a stop. Otherwise the RM will stop immediately
689 self.schedule("0s", rm.stop_with_conditions)
691 def release(self, guids = None):
692 """ Release al RMs on the guids list or
693 all the resources if no list is specified
695 :param guids: List of RM guids
700 guids = self.resources
702 # Remove all pending tasks from the scheduler queue
703 for tid in list(self._scheduler.pending):
704 self._scheduler.remove(tid)
709 rm = self.get_resource(guid)
710 self.schedule("0s", rm.release)
712 self.wait_released(guids)
715 """ Shutdown the Experiment Controller.
716 Releases all the resources and stops task processing thread
719 # If there was a major failure we can't exit gracefully
720 if self._state == ECState.FAILED:
721 raise RuntimeError("EC failure. Can not exit gracefully")
725 # Mark the EC state as TERMINATED
726 self._state = ECState.TERMINATED
728 # Stop processing thread
731 # Notify condition to wake up the processing thread
734 if self._thread.is_alive():
737 def schedule(self, date, callback, track = False):
738 """ Schedule a callback to be executed at time date.
740 :param date: string containing execution time for the task.
741 It can be expressed as an absolute time, using
742 timestamp format, or as a relative time matching
743 ^\d+.\d+(h|m|s|ms|us)$
745 :param callback: code to be executed for the task. Must be a
746 Python function, and receives args and kwargs
749 :param track: if set to True, the task will be retrivable with
750 the get_task() method
752 :return : The Id of the task
754 timestamp = stabsformat(date)
755 task = Task(timestamp, callback)
756 task = self._scheduler.schedule(task)
759 self._tasks[task.id] = task
761 # Notify condition to wake up the processing thread
767 """ Process scheduled tasks.
771 The _process method is executed in an independent thread held by the
772 ExperimentController for as long as the experiment is running.
774 Tasks are scheduled by invoking the schedule method with a target callback.
775 The schedule method is given a execution time which controls the
776 order in which tasks are processed.
778 Tasks are processed in parallel using multithreading.
779 The environmental variable NEPI_NTHREADS can be used to control
780 the number of threads used to process tasks. The default value is 50.
784 To execute tasks in parallel, an ParallelRunner (PR) object, holding
785 a pool of threads (workers), is used.
786 For each available thread in the PR, the next task popped from
787 the scheduler queue is 'put' in the PR.
788 Upon receiving a task to execute, each PR worker (thread) invokes the
789 _execute method of the EC, passing the task as argument.
790 This method, calls task.callback inside a try/except block. If an
791 exception is raised by the tasks.callback, it will be trapped by the
792 try block, logged to standard error (usually the console), and the EC
793 state will be set to ECState.FAILED.
794 The invocation of _notify immediately after, forces the processing
795 loop in the _process method, to wake up if it was blocked waiting for new
796 tasks to arrived, and to check the EC state.
797 As the EC is in FAILED state, the processing loop exits and the
798 'finally' block is invoked. In the 'finally' block, the 'sync' method
799 of the PR is invoked, which forces the PR to raise any unchecked errors
800 that might have been raised by the workers.
806 while not self._stop:
810 task = self._scheduler.next()
813 # No task to execute. Wait for a new task to be scheduled.
816 # The task timestamp is in the future. Wait for timeout
817 # or until another task is scheduled.
819 if now < task.timestamp:
820 # Calculate timeout in seconds
821 timeout = tdiffsec(task.timestamp, now)
823 # Re-schedule task with the same timestamp
824 self._scheduler.schedule(task)
828 # Wait timeout or until a new task awakes the condition
829 self._cond.wait(timeout)
834 # Process tasks in parallel
835 self._runner.put(self._execute, task)
838 err = traceback.format_exc()
839 self.logger.error("Error while processing tasks in the EC: %s" % err)
841 # Set the EC to FAILED state
842 self._state = ECState.FAILED
844 # Set the FailureManager failure level to EC failure
845 self._fm.set_ec_failure()
847 self.logger.debug("Exiting the task processing loop ... ")
850 self._runner.destroy()
852 def _execute(self, task):
853 """ Executes a single task.
855 :param task: Object containing the callback to execute
860 If the invokation of the task callback raises an
861 exception, the processing thread of the ExperimentController
862 will be stopped and the experiment will be aborted.
866 task.status = TaskStatus.DONE
869 task.result = task.callback()
872 err = traceback.format_exc()
874 task.status = TaskStatus.ERROR
876 self.logger.error("Error occurred while executing task: %s" % err)
879 """ Awakes the processing thread in case it is blocked waiting
880 for a new task to be scheduled.