2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39 class FailureLevel(object):
40 """ Describes the system failure state """
45 class FailureManager(object):
46 """ The FailureManager is responsible for handling errors
47 and deciding whether an experiment should be aborted or not
51 def __init__(self, ec):
52 self._ec = weakref.ref(ec)
53 self._failure_level = FailureLevel.OK
57 """ Returns the ExperimentController associated to this FailureManager
65 if self._failure_level == FailureLevel.OK:
66 for guid in self.ec.resources:
67 state = self.ec.state(guid)
68 critical = self.ec.get(guid, "critical")
69 if state == ResourceState.FAILED and critical:
70 self._failure_level = FailureLevel.RM_FAILURE
71 self.ec.logger.debug("RM critical failure occurred on guid %d." \
72 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
75 return self._failure_level != FailureLevel.OK
77 def set_ec_failure(self):
78 self._failure_level = FailureLevel.EC_FAILURE
81 class ECState(object):
82 """ Possible states for an ExperimentController
89 class ExperimentController(object):
91 .. class:: Class Args :
93 :param exp_id: Human readable identifier for the experiment scenario.
98 An experiment, or scenario, is defined by a concrete set of resources,
99 behavior, configuration and interconnection of those resources.
100 The Experiment Description (ED) is a detailed representation of a
101 single experiment. It contains all the necessary information to
102 allow repeating the experiment. NEPI allows to describe
103 experiments by registering components (resources), configuring them
104 and interconnecting them.
106 A same experiment (scenario) can be executed many times, generating
107 different results. We call an experiment execution (instance) a 'run'.
109 The ExperimentController (EC) is the entity responsible for
110 managing an experiment run. The same scenario can be
111 recreated (and re-run) by instantiating an EC and recreating
112 the same experiment description.
114 In NEPI, an experiment is represented as a graph of interconnected
115 resources. A resource is a generic concept in the sense that any
116 component taking part in an experiment, whether physical or
117 virtual, is considered a resource. A resource could be a host,
118 a virtual machine, an application, a simulator, an IP address.
120 A ResourceManager (RM), is the entity responsible for managing a
121 single resource. ResourceManagers are specific to a resource
122 type (i.e. An RM to control a Linux application will not be
123 the same as the RM used to control a ns-3 simulation).
124 To support a new type of resource in NEPI, a new RM must be
125 implemented. NEPI already provides a variety of
126 RMs to control basic resources, and new ones can be extended from
129 Through the EC interface the user can create ResourceManagers (RMs),
130 configure them and interconnect them, to describe an experiment.
131 Describing an experiment through the EC does not run the experiment.
132 Only when the 'deploy()' method is invoked on the EC, the EC will take
133 actions to transform the 'described' experiment into a 'running' experiment.
135 While the experiment is running, it is possible to continue to
136 create/configure/connect RMs, and to deploy them to involve new
137 resources in the experiment (this is known as 'interactive' deployment).
139 An experiment in NEPI is identified by a string id,
140 which is either given by the user, or automatically generated by NEPI.
141 The purpose of this identifier is to separate files and results that
142 belong to different experiment scenarios.
143 However, since a same 'experiment' can be run many times, the experiment
144 id is not enough to identify an experiment instance (run).
145 For this reason, the ExperimentController has two identifiers, the
146 exp_id, which can be re-used in different ExperimentController,
147 and the run_id, which is unique to one ExperimentController instance, and
148 is automatically generated by NEPI.
152 def __init__(self, exp_id = None):
153 super(ExperimentController, self).__init__()
156 self._logger = logging.getLogger("ExperimentController")
158 # Run identifier. It identifies a concrete execution instance (run)
160 # Since a same experiment (same configuration) can be executed many
161 # times, this run_id permits to separate result files generated on
162 # different experiment executions
163 self._run_id = tsformat()
165 # Experiment identifier. Usually assigned by the user
166 # Identifies the experiment scenario (i.e. configuration,
167 # resources used, etc)
168 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
170 # generator of globally unique ids
171 self._guid_generator = guid.GuidGenerator()
174 self._resources = dict()
176 # Scheduler. It is a queue that holds tasks scheduled for
177 # execution, and yields the next task to be executed
178 # ordered by execution and arrival time
179 self._scheduler = HeapScheduler()
184 # RM groups (for deployment)
185 self._groups = dict()
187 # generator of globally unique id for groups
188 self._group_id_generator = guid.GuidGenerator()
190 # Flag to stop processing thread
193 # Entity in charge of managing system failures
194 self._fm = FailureManager(self)
197 self._state = ECState.RUNNING
199 # The runner is a pool of threads used to parallelize
201 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
202 self._runner = ParallelRun(maxthreads = nthreads)
204 # Event processing thread
205 self._cond = threading.Condition()
206 self._thread = threading.Thread(target = self._process)
207 self._thread.setDaemon(True)
212 """ Returns the logger instance of the Experiment Controller
219 """ Returns the state of the Experiment Controller
226 """ Returns the experiment id assigned by the user
233 """ Returns the experiment instance (run) identifier (automatically
241 """ Returns True if the experiment has failed and should be interrupted,
245 return self._fm.abort
247 def wait_finished(self, guids):
248 """ Blocking method that waits until all RMs in the 'guids' list
249 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
250 RELEASED ), or until a failure in the experiment occurs
253 :param guids: List of guids
261 return self.wait(guids, state = ResourceState.STOPPED,
264 def wait_started(self, guids):
265 """ Blocking method that waits until all RMs in the 'guids' list
266 have reached a state >= STARTED, or until a failure in the
267 experiment occurs (i.e. abort == True)
269 :param guids: List of guids
277 return self.wait(guids, state = ResourceState.STARTED,
280 def wait_released(self, guids):
281 """ Blocking method that waits until all RMs in the 'guids' list
282 have reached a state == RELEASED, or until the EC fails
284 :param guids: List of guids
290 return self._state == ECState.FAILED
292 return self.wait(guids, state = ResourceState.RELEASED,
295 def wait_deployed(self, guids):
296 """ Blocking method that waits until all RMs in the 'guids' list
297 have reached a state >= READY, or until a failure in the
298 experiment occurs (i.e. abort == True)
300 :param guids: List of guids
308 return self.wait(guids, state = ResourceState.READY,
311 def wait(self, guids, state, quit):
312 """ Blocking method that waits until all RMs in the 'guids' list
313 have reached a state >= 'state', or until the 'quit' callback
316 :param guids: List of guids
321 if isinstance(guids, int):
324 # Make a copy to avoid modifying the original guids list
328 # If there are no more guids to wait for
329 # or the quit function returns True, exit the loop
330 if len(guids) == 0 or quit():
333 # If a guid reached one of the target states, remove it from list
335 rstate = self.state(guid)
337 hrrstate = ResourceState2str.get(rstate)
338 hrstate = ResourceState2str.get(state)
342 rm = self.get_resource(guid)
343 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
344 rm.get_rtype(), guid, hrrstate, hrstate))
347 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
348 guid, hrrstate, hrstate))
def get_task(self, tid):
    """ Looks up a task by its identifier

        :param tid: Id of the task

        :return: The entry stored under 'tid' in the EC task table
            (the result of a '.get(tid)' lookup on that table)

    """
    task_table = self._tasks
    return task_table.get(tid)
def get_resource(self, guid):
    """ Looks up a registered ResourceManager by its guid

        :param guid: Guid of the RM

        :return: The RM registered under 'guid', or None when no RM
            is registered with that guid
        :rtype: ResourceManager

    """
    registry = self._resources
    return registry.get(guid)
375 """ Returns the set() of guids of all the ResourceManager
377 :return: Set of all RM guids
381 return self._resources.keys()
383 def register_resource(self, rtype, guid = None):
384 """ Registers a new ResourceManager of type 'rtype' in the experiment
386 This method will assign a new 'guid' for the RM, if no guid
389 :param rtype: Type of the RM
392 :return: Guid of the RM
396 # Get next available guid
397 guid = self._guid_generator.next(guid)
400 rm = ResourceFactory.create(rtype, self, guid)
403 self._resources[guid] = rm
def get_attributes(self, guid):
    """ Lists all the attributes of the RM identified by 'guid'

        :param guid: Guid of the RM

        :return: List of attributes, as returned by the RM itself

    """
    # Delegate directly to the RM registered under this guid
    return self.get_resource(guid).get_attributes()
def get_attribute(self, guid, name):
    """ Fetches the attribute called 'name' from the RM identified
    by 'guid'

        :param guid: Guid of the RM

        :param name: Name of the attribute

        :return: The attribute with name 'name', as returned by the RM

    """
    # Delegate directly to the RM registered under this guid
    return self.get_resource(guid).get_attribute(name)
def register_connection(self, guid1, guid2):
    """ Registers a connection between the RM with guid 'guid1'
    and the RM with guid 'guid2'.

    The order in which the two guids are given is not important,
    since the connection relationship is symmetric: each RM is told
    about the guid of the other one.

        :param guid1: Guid of the first RM to connect

        :param guid2: Guid of the second RM to connect

    """
    # Resolve both RMs before registering anything on either of them
    first, second = [self.get_resource(g) for g in (guid1, guid2)]

    first.register_connection(guid2)
    second.register_connection(guid1)
456 def register_condition(self, guids1, action, guids2, state,
458 """ Registers an action START, STOP or DEPLOY for all RM on list
459 guids1 to occur at time 'time' after all elements in list guids2
460 have reached state 'state'.
462 :param guids1: List of guids of RMs subjected to action
465 :param action: Action to perform (either START, STOP or DEPLOY)
466 :type action: ResourceAction
468 :param guids2: List of guids of RMs to be waited for
471 :param state: State to wait for on RMs of list guids2 (STARTED,
473 :type state: ResourceState
475 :param time: Time to wait after guids2 has reached status
479 if isinstance(guids1, int):
481 if isinstance(guids2, int):
485 rm = self.get_resource(guid1)
486 rm.register_condition(action, guids2, state, time)
def enable_trace(self, guid, name):
    """ Asks the RM identified by 'guid' to collect the trace
    called 'name' during the experiment run

        :param guid: Guid of the RM

        :param name: Name of the trace

    """
    # Delegate to the RM; nothing is returned to the caller
    self.get_resource(guid).enable_trace(name)
def trace_enabled(self, guid, name):
    """ Tells whether the trace called 'name' is enabled on the RM
    identified by 'guid'

        :param guid: Guid of the RM

        :param name: Name of the trace

        :return: The answer given by the RM (True if the trace
            is enabled)

    """
    # Delegate directly to the RM registered under this guid
    return self.get_resource(guid).trace_enabled(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """ Retrieves information about a collected trace: its whole
    content, blocks (chunks) of the trace stream, its path or its size

        :param guid: Guid of the RM that owns the trace

        :param name: Name of the trace

        :param attr: Can be one of:
                     - TraceAttr.ALL (complete trace content),
                     - TraceAttr.STREAM (a block of the trace stream,
                        see 'block' and 'offset'),
                     - TraceAttr.PATH (full path to the trace file),
                     - TraceAttr.SIZE (size of trace file).

        :param block: Number of bytes to retrieve from trace, when attr
            is TraceAttr.STREAM

        :param offset: Number of 'blocks' to skip, when attr is
            TraceAttr.STREAM

        :return: Whatever the RM returns for the requested attribute

    """
    # All arguments are forwarded unchanged to the RM
    return self.get_resource(guid).trace(name, attr, block, offset)
def get_traces(self, guid):
    """ Lists the names of the traces of the RM identified by 'guid'

        :param guid: Guid of the RM

        :return: List of trace names, as returned by the RM

    """
    # Delegate directly to the RM registered under this guid
    return self.get_resource(guid).get_traces()
550 def discover(self, guid):
551 """ Discovers an available resource matching the criteria defined
552 by the RM with guid 'guid', and associates that resource to the RM
554 Not all RM types require (or are capable of) performing resource
555 discovery. For the RM types which are not capable of doing so,
556 invoking this method does not have any consequences.
558 :param guid: Guid of the RM
562 rm = self.get_resource(guid)
def provision(self, guid):
    """ Provisions the resource associated to the RM identified
    by 'guid'.

    Provisioning means making a resource 'accessible' to the user.
    Not every RM type requires (or is capable of) performing resource
    provisioning. For the RM types which are not capable of doing so,
    invoking this method does not have any consequences.

        :param guid: Guid of the RM

        :return: Whatever the RM's own provision() call returns

    """
    # Delegate directly to the RM registered under this guid
    return self.get_resource(guid).provision()
580 def get(self, guid, name):
581 """ Returns the value of the attribute with name 'name' on the
584 :param guid: Guid of the RM
587 :param name: Name of the attribute
590 :return: The value of the attribute with name 'name'
593 rm = self.get_resource(guid)
def set(self, guid, name, value):
    """ Changes the value of the attribute called 'name' on the RM
    identified by 'guid'

        :param guid: Guid of the RM

        :param name: Name of the attribute

        :param value: Value of the attribute

        :return: Whatever the RM's own set() call returns

    """
    # Delegate directly to the RM registered under this guid
    return self.get_resource(guid).set(name, value)
612 def state(self, guid, hr = False):
613 """ Returns the state of a resource
615 :param guid: Resource guid
618 :param hr: Human readable. Forces return of a
619 status string instead of a number
623 rm = self.get_resource(guid)
627 return ResourceState2str.get(state)
631 def stop(self, guid):
632 """ Stops the RM with guid 'guid'
634 Stopping a RM means that the resource it controls will
635 no longer take part in the experiment.
637 :param guid: Guid of the RM
641 rm = self.get_resource(guid)
644 def start(self, guid):
645 """ Starts the RM with guid 'guid'
647 Starting a RM means that the resource it controls will
648 begin taking part in the experiment.
650 :param guid: Guid of the RM
654 rm = self.get_resource(guid)
657 def set_with_conditions(self, name, value, guids1, guids2, state,
659 """ Modifies the value of attribute with name 'name' on all RMs
660 on the guids1 list when time 'time' has elapsed since all
661 elements in guids2 list have reached state 'state'.
663 :param name: Name of attribute to set in RM
666 :param value: Value of attribute to set in RM
669 :param guids1: List of guids of RMs subjected to action
672 :param action: Action to register (either START or STOP)
673 :type action: ResourceAction
675 :param guids2: List of guids of RMs to be waited for
678 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
679 :type state: ResourceState
681 :param time: Time to wait after guids2 has reached status
685 if isinstance(guids1, int):
687 if isinstance(guids2, int):
691 rm = self.get_resource(guid)
692 rm.set_with_conditions(name, value, guids2, state, time)
694 def deploy(self, guids = None, wait_all_ready = True, group = None):
695 """ Deploys all ResourceManagers in the guids list.
697 If the argument 'guids' is not given, all RMs with state NEW
700 :param guids: List of guids of RMs to deploy
703 :param wait_all_ready: Wait until all RMs are ready in
704 order to start the RMs
707 :param group: Id of deployment group in which to deploy RMs
711 self.logger.debug(" ------- DEPLOY START ------ ")
714 # If no guids list was passed, all 'NEW' RMs will be deployed
716 for guid in self.resources:
717 if self.state(guid) == ResourceState.NEW:
720 if isinstance(guids, int):
723 # Create deployment group
724 # New guids can be added to a same deployment group later on
728 group = self._group_id_generator.next()
730 if group not in self._groups:
731 self._groups[group] = []
733 self._groups[group].extend(guids)
735 def wait_all_and_start(group):
736 # Function that checks if all resources are READY
737 # before scheduling a start_with_conditions for each RM
740 # Get all guids in group
741 guids = self._groups[group]
744 if self.state(guid) < ResourceState.READY:
749 callback = functools.partial(wait_all_and_start, group)
750 self.schedule("1s", callback)
752 # If all resources are ready, we schedule the start
754 rm = self.get_resource(guid)
755 self.schedule("0s", rm.start_with_conditions)
757 if wait_all_ready and new_group:
758 # Schedule a function to check that all resources are
759 # READY, and only then schedule the start.
760 # This aims at reducing the number of tasks looping in the
762 # Instead of having many start tasks, we will have only one for
764 callback = functools.partial(wait_all_and_start, group)
765 self.schedule("0s", callback)
768 rm = self.get_resource(guid)
769 rm.deployment_group = group
770 self.schedule("0s", rm.deploy_with_conditions)
772 if not wait_all_ready:
773 self.schedule("0s", rm.start_with_conditions)
775 if rm.conditions.get(ResourceAction.STOP):
776 # Only if the RM has STOP conditions we
777 # schedule a stop. Otherwise the RM will stop immediately
778 self.schedule("0s", rm.stop_with_conditions)
780 def release(self, guids = None):
781 """ Releases all ResourceManagers in the guids list.
783 If the argument 'guids' is not given, all RMs registered
784 in the experiment are released.
786 :param guids: List of RM guids
791 guids = self.resources
793 # Remove all pending tasks from the scheduler queue
794 for tid in list(self._scheduler.pending):
795 self._scheduler.remove(tid)
800 rm = self.get_resource(guid)
801 self.schedule("0s", rm.release)
803 self.wait_released(guids)
806 """ Releases all resources and stops the ExperimentController
809 # If there was a major failure we can't exit gracefully
810 if self._state == ECState.FAILED:
811 raise RuntimeError("EC failure. Can not exit gracefully")
815 # Mark the EC state as TERMINATED
816 self._state = ECState.TERMINATED
818 # Stop processing thread
821 # Notify condition to wake up the processing thread
824 if self._thread.is_alive():
827 def schedule(self, date, callback, track = False):
828 """ Schedules a callback to be executed at time 'date'.
830 :param date: string containing execution time for the task.
831 It can be expressed as an absolute time, using
832 timestamp format, or as a relative time matching
833 ^\d+.\d+(h|m|s|ms|us)$
835 :param callback: code to be executed for the task. Must be a
836 Python function, and receives args and kwargs
839 :param track: if set to True, the task will be retrievable with
840 the get_task() method
842 :return : The Id of the task
846 timestamp = stabsformat(date)
847 task = Task(timestamp, callback)
848 task = self._scheduler.schedule(task)
851 self._tasks[task.id] = task
853 # Notify condition to wake up the processing thread
859 """ Process scheduled tasks.
863 Tasks are scheduled by invoking the schedule method with a target
864 callback and an execution time.
865 The schedule method creates a new Task object with that callback
866 and execution time, and pushes it into the '_scheduler' queue.
867 The execution time and the order of arrival of tasks are used
868 to order the tasks in the queue.
870 The _process method is executed in an independent thread held by
871 the ExperimentController for as long as the experiment is running.
872 This method takes tasks from the '_scheduler' queue in a loop
873 and processes them in parallel using multithreading.
874 The environmental variable NEPI_NTHREADS can be used to control
875 the number of threads used to process tasks. The default value is
878 To execute tasks in parallel, a ParallelRun (PR) object is used.
879 This object keeps a pool of threads (workers), and a queue of tasks
880 scheduled for 'immediate' execution.
882 On each iteration, the '_process' loop will take the next task that
883 is scheduled for 'future' execution from the '_scheduler' queue,
884 and if the execution time of that task is <= the current time,
885 it will push that task into the PR for 'immediate execution'.
886 As soon as a worker is free, the PR will assign the next task to
889 Upon receiving a task to execute, each PR worker (thread) will
890 invoke the _execute method of the EC, passing the task as
892 The _execute method will then invoke task.callback inside a
893 try/except block. If an exception is raised by the task.callback,
894 it will be trapped by the try block, logged to standard error
895 (usually the console), and the task will be marked as failed.
901 while not self._stop:
905 task = self._scheduler.next()
908 # No task to execute. Wait for a new task to be scheduled.
911 # The task timestamp is in the future. Wait for timeout
912 # or until another task is scheduled.
914 if now < task.timestamp:
915 # Calculate timeout in seconds
916 timeout = tdiffsec(task.timestamp, now)
918 # Re-schedule task with the same timestamp
919 self._scheduler.schedule(task)
923 # Wait timeout or until a new task awakes the condition
924 self._cond.wait(timeout)
929 # Process tasks in parallel
930 self._runner.put(self._execute, task)
933 err = traceback.format_exc()
934 self.logger.error("Error while processing tasks in the EC: %s" % err)
936 # Set the EC to FAILED state
937 self._state = ECState.FAILED
939 # Set the FailureManager failure level to EC failure
940 self._fm.set_ec_failure()
942 self.logger.debug("Exiting the task processing loop ... ")
945 self._runner.destroy()
947 def _execute(self, task):
948 """ Executes a single task.
950 :param task: Object containing the callback to execute
955 task.status = TaskStatus.DONE
958 task.result = task.callback()
961 err = traceback.format_exc()
963 task.status = TaskStatus.ERROR
965 self.logger.error("Error occurred while executing task: %s" % err)
968 """ Awakes the processing thread if it is blocked waiting
969 for new tasks to arrive