2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow reconnecting to a running experiment instance (reconnect mode vs. deploy mode)
# Namespace of experiment-wide failure levels. This excerpt references
# FailureLevel.OK, FailureLevel.RM_FAILURE and FailureLevel.EC_FAILURE.
# NOTE(review): the constant definitions are not visible in this excerpt —
# confirm their values (and any ordering callers rely on) in the full source.
39 class FailureLevel(object):
40 """ Describes the system failure state """
# Tracks the failure level of one ExperimentController.
# - The level starts at FailureLevel.OK.
# - eval_failure(guid) raises it to RM_FAILURE only while it is still OK, and
#   only when the RM with that guid is FAILED and has its "critical"
#   attribute set.
# - set_ec_failure() unconditionally sets it to EC_FAILURE.
# No visible code lowers the level again.
# The EC is held through weakref.ref because the EC itself owns this object
# (ExperimentController.__init__ does `self._fm = FailureManager(self)`),
# presumably to avoid a strong reference cycle.
# NOTE(review): the `ec` and `abort` properties are only partially visible in
# this excerpt — confirm how `abort` maps failure levels to a boolean.
45 class FailureManager(object):
46 """ The FailureManager is responsible for handling errors
47 and deciding whether an experiment should be aborted or not
51 def __init__(self, ec):
52 self._ec = weakref.ref(ec)
53 self._failure_level = FailureLevel.OK
58 """ Returns the ExperimentController associated with this FailureManager
68 def eval_failure(self, guid):
69 if self._failure_level == FailureLevel.OK:
70 rm = self.ec.get_resource(guid)
72 critical = rm.get("critical")
74 if state == ResourceState.FAILED and critical:  # NOTE(review): `state` is assigned on a line not visible here (presumably the RM's state) — confirm
75 self._failure_level = FailureLevel.RM_FAILURE
77 self.ec.logger.debug("RM critical failure occurred on guid %d." \
78 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
80 def set_ec_failure(self):
81 self._failure_level = FailureLevel.EC_FAILURE
# Namespace of ExperimentController lifecycle states. This excerpt references
# ECState.RUNNING (initial state), ECState.FAILED (set when the processing
# loop hits an error) and ECState.TERMINATED (set during shutdown).
# NOTE(review): the constant definitions are not visible in this excerpt.
83 class ECState(object):
84 """ Possible states for an ExperimentController
# User-facing API to describe, deploy, query and release the
# ResourceManagers of one experiment run. Deployment and release are carried
# out by scheduling tasks that a background thread (_process) hands to a
# thread pool.
91 class ExperimentController(object):
93 .. class:: Class Args :
95 :param exp_id: Human readable identifier for the experiment scenario.
100 An experiment, or scenario, is defined by a concrete set of resources,
101 behavior, configuration and interconnection of those resources.
102 The Experiment Description (ED) is a detailed representation of a
103 single experiment. It contains all the necessary information to
104 allow repeating the experiment. NEPI allows users to describe
105 experiments by registering components (resources), configuring them
106 and interconnecting them.
108 The same experiment (scenario) can be executed many times, generating
109 different results. We call an experiment execution (instance) a 'run'.
111 The ExperimentController (EC) is the entity responsible for
112 managing an experiment run. The same scenario can be
113 recreated (and re-run) by instantiating an EC and recreating
114 the same experiment description.
116 In NEPI, an experiment is represented as a graph of interconnected
117 resources. A resource is a generic concept in the sense that any
118 component taking part in an experiment, whether physical or
119 virtual, is considered a resource. A resource could be a host,
120 a virtual machine, an application, a simulator, an IP address.
122 A ResourceManager (RM) is the entity responsible for managing a
123 single resource. ResourceManagers are specific to a resource
124 type (e.g. an RM to control a Linux application will not be
125 the same as the RM used to control a ns-3 simulation).
126 To support a new type of resource in NEPI, a new RM must be
127 implemented. NEPI already provides a variety of
128 RMs to control basic resources, and new ones can be extended from
131 Through the EC interface the user can create ResourceManagers (RMs),
132 configure them and interconnect them, to describe an experiment.
133 Describing an experiment through the EC does not run the experiment.
134 Only when the 'deploy()' method is invoked on the EC will the EC take
135 actions to transform the 'described' experiment into a 'running' experiment.
137 While the experiment is running, it is possible to continue to
138 create/configure/connect RMs, and to deploy them to involve new
139 resources in the experiment (this is known as 'interactive' deployment).
141 An experiment in NEPI is identified by a string id,
142 which is either given by the user, or automatically generated by NEPI.
143 The purpose of this identifier is to separate files and results that
144 belong to different experiment scenarios.
145 However, since the same 'experiment' can be run many times, the experiment
146 id is not enough to identify an experiment instance (run).
147 For this reason, the ExperimentController has two identifiers: the
148 exp_id, which can be re-used in different ExperimentControllers,
149 and the run_id, which is unique to one ExperimentController instance, and
150 is automatically generated by NEPI.
# Creates an EC in ECState.RUNNING. It builds:
# - the run_id (from tsformat(), presumably a timestamp string) and the exp_id
#   (user-supplied, or "exp-" plus 8 random bytes in hex);
# - guid generators, the resource registry and deployment groups;
# - the HeapScheduler task queue and the FailureManager;
# - a ParallelRun pool sized by $NEPI_NTHREADS (default "20"; int() raises
#   ValueError for non-integer values);
# - a Condition plus a daemon thread targeting self._process.
# NOTE(review): several assignments are on lines not visible in this excerpt
# (e.g. self._tasks, used by get_task/schedule, and self._stop, used by
# _process). Whether the thread is started here is also not visible.
154 def __init__(self, exp_id = None):
155 super(ExperimentController, self).__init__()
158 self._logger = logging.getLogger("ExperimentController")
160 # Run identifier. It identifies a concrete execution instance (run)
162 # Since the same experiment (same configuration) can be executed many
163 # times, this run_id makes it possible to separate result files generated on
164 # different experiment executions
165 self._run_id = tsformat()
167 # Experiment identifier. Usually assigned by the user
168 # Identifies the experiment scenario (i.e. configuration,
169 # resources used, etc)
170 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')  # NOTE(review): str.encode('hex') is Python 2 only; on Python 3 use os.urandom(8).hex()
172 # generator of globally unique ids
173 self._guid_generator = guid.GuidGenerator()
176 self._resources = dict()
178 # Scheduler. It is a queue that holds tasks scheduled for
179 # execution, and yields the next task to be executed
180 # ordered by execution and arrival time
181 self._scheduler = HeapScheduler()
186 # RM groups (for deployment)
187 self._groups = dict()
189 # generator of globally unique ids for groups
190 self._group_id_generator = guid.GuidGenerator()
192 # Flag to stop processing thread
195 # Entity in charge of managing system failures
196 self._fm = FailureManager(self)
199 self._state = ECState.RUNNING
201 # The runner is a pool of threads used to parallelize
203 nthreads = int(os.environ.get("NEPI_NTHREADS", "20"))
204 self._runner = ParallelRun(maxthreads = nthreads)
206 # Event processing thread
207 self._cond = threading.Condition()
208 self._thread = threading.Thread(target = self._process)
209 self._thread.setDaemon(True)  # NOTE(review): setDaemon() is deprecated since Python 3.10; `self._thread.daemon = True` is equivalent
# Accessor properties. Their decorator/def lines are mostly not visible in
# this excerpt.
# - failure_level reads the FailureManager's private _failure_level directly.
# - abort delegates to FailureManager.abort, whose body is not visible here.
214 """ Returns the logger instance of the Experiment Controller
220 def failure_level(self):
221 """ Returns the level of FAILURE of the experiment
225 return self._fm._failure_level
229 """ Returns the state of the Experiment Controller
236 """ Returns the experiment id assigned by the user
243 """ Returns the experiment instance (run) identifier (automatically
251 """ Returns True if the experiment has failed and should be interrupted,
255 return self._fm.abort
# Hands the failed RM's guid to FailureManager.eval_failure and returns its
# result. No return statement is visible in eval_failure, so this presumably
# returns None.
257 def inform_failure(self, guid):
258 """ Reports a failure in an RM to the EC for evaluation
260 :param guid: Resource id
265 return self._fm.eval_failure(guid)
# Convenience wrappers around wait(). Each passes a target ResourceState and
# a `quit` callback that ends the wait early. The callback bodies are mostly
# not visible in this excerpt:
# - wait_released's callback returns True once the EC state is ECState.FAILED;
# - per their docstrings, the other callbacks stop when the experiment aborts.
267 def wait_finished(self, guids):
268 """ Blocking method that waits until all RMs in the 'guids' list
269 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
270 RELEASED), or until a failure in the experiment occurs
273 :param guids: List of guids
281 return self.wait(guids, state = ResourceState.STOPPED,
284 def wait_started(self, guids):
285 """ Blocking method that waits until all RMs in the 'guids' list
286 have reached a state >= STARTED, or until a failure in the
287 experiment occurs (i.e. abort == True)
289 :param guids: List of guids
297 return self.wait(guids, state = ResourceState.STARTED,
300 def wait_released(self, guids):
301 """ Blocking method that waits until all RMs in the 'guids' list
302 have reached a state == RELEASED, or until the EC fails
304 :param guids: List of guids
310 return self._state == ECState.FAILED
312 return self.wait(guids, state = ResourceState.RELEASED,
315 def wait_deployed(self, guids):
316 """ Blocking method that waits until all RMs in the 'guids' list
317 have reached a state >= READY, or until a failure in the
318 experiment occurs (i.e. abort == True)
320 :param guids: List of guids
328 return self.wait(guids, state = ResourceState.READY,
# Blocks until every RM in `guids` has reached a state >= `state`, or until
# quit() returns True. Details:
# - a single int guid is accepted (presumably wrapped into a list);
# - the list is copied, so the caller's list is not modified;
# - RMs that reach the target state are dropped from the working list, and
#   each check is logged at debug level.
# NOTE(review): the loop header, the computation of `rstate`, the removal
# itself and any sleep between iterations are on lines not visible here.
# The `quit` parameter also shadows the builtin of the same name.
331 def wait(self, guids, state, quit):
332 """ Blocking method that waits until all RMs in the 'guids' list
333 have reached a state >= 'state', or until the 'quit' callback
336 :param guids: List of guids
340 if isinstance(guids, int):
343 # Make a copy to avoid modifying the original guids list
347 # If there are no more guids to wait for
348 # or the quit function returns True, exit the loop
349 if len(guids) == 0 or quit():
352 # If a guid reached one of the target states, remove it from list
354 rm = self.get_resource(guid)
358 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
359 rm.get_rtype(), guid, rstate, state))
362 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
363 guid, rstate, state))
# Registry helpers:
# - get_task: dict.get lookup, so it returns None for ids that were never
#   tracked (schedule() only stores tasks in self._tasks when track=True).
# - get_resource: dict.get lookup by guid (the return line is not visible;
#   presumably None for unknown guids).
# - remove_resource: `del` raises KeyError for an unregistered guid.
# - resources: built from self._resources.keys() (the return line is not
#   visible).
# - register_resource: asks the guid generator for the next guid (passing the
#   requested one through), creates the RM with ResourceFactory.create and
#   stores it in the registry.
369 def get_task(self, tid):
370 """ Returns a task by its id
372 :param tid: Id of the task
378 return self._tasks.get(tid)
380 def get_resource(self, guid):
381 """ Returns a registered ResourceManager by its guid
383 :param guid: Guid of the RM
386 :rtype: ResourceManager
389 rm = self._resources.get(guid)
392 def remove_resource(self, guid):
393 del self._resources[guid]
397 """ Returns the set() of guids of all the ResourceManagers
399 :return: Set of all RM guids
403 keys = self._resources.keys()
407 def register_resource(self, rtype, guid = None):
408 """ Registers a new ResourceManager of type 'rtype' in the experiment
410 This method will assign a new 'guid' for the RM, if no guid
413 :param rtype: Type of the RM
416 :return: Guid of the RM
420 # Get next available guid
421 guid = self._guid_generator.next(guid)
424 rm = ResourceFactory.create(rtype, self, guid)
427 self._resources[guid] = rm
# Attribute and connection helpers. Each looks up the RM by guid and forwards
# the call to it. If get_resource returns None for an unknown guid, these
# would raise AttributeError. register_connection records the link on both
# RMs, which makes the relationship symmetric.
431 def get_attributes(self, guid):
432 """ Returns all the attributes of the RM with guid 'guid'
434 :param guid: Guid of the RM
437 :return: List of attributes
441 rm = self.get_resource(guid)
442 return rm.get_attributes()
444 def get_attribute(self, guid, name):
445 """ Returns the attribute 'name' of the RM with guid 'guid'
447 :param guid: Guid of the RM
450 :param name: Name of the attribute
453 :return: The attribute with name 'name'
457 rm = self.get_resource(guid)
458 return rm.get_attribute(name)
460 def register_connection(self, guid1, guid2):
461 """ Registers a connection between an RM with guid 'guid1'
462 and another RM with guid 'guid2'.
464 The order in which the two guids are provided is not
465 important, since the connection relationship is symmetric.
467 :param guid1: First guid to connect
468 :type guid1: int
470 :param guid2: Second guid to connect
471 :type guid2: int
474 rm1 = self.get_resource(guid1)
475 rm2 = self.get_resource(guid2)
477 rm1.register_connection(guid2)
478 rm2.register_connection(guid1)
# Registers `action` on each RM in guids1, conditioned on the RMs in guids2
# reaching `state`. guids1 and guids2 may each be a single int (presumably
# wrapped into a list). The per-RM loop header and the rest of the signature
# (the `time` parameter) are on lines not visible here.
480 def register_condition(self, guids1, action, guids2, state,
482 """ Registers an action START, STOP or DEPLOY for all RMs in list
483 guids1 to occur at time 'time' after all elements in list guids2
484 have reached state 'state'.
486 :param guids1: List of guids of RMs subjected to action
489 :param action: Action to perform (either START, STOP or DEPLOY)
490 :type action: ResourceAction
492 :param guids2: List of guids of RMs to be waited for
495 :param state: State to wait for on RMs of list guids2 (STARTED,
497 :type state: ResourceState
499 :param time: Time to wait after guids2 has reached status
503 if isinstance(guids1, int):
505 if isinstance(guids2, int):
509 rm = self.get_resource(guid1)
510 rm.register_condition(action, guids2, state, time)
# Trace helpers. Each looks up the RM by `guid` and forwards to the RM method
# of the same name. trace() defaults to attr=TraceAttr.ALL, block=512 and
# offset=0. Per the docstring, `block` is a byte count and `offset` counts
# blocks; both only matter for TraceAttr.STREAM.
512 def enable_trace(self, guid, name):
513 """ Enables a trace to be collected during the experiment run
515 :param name: Name of the trace
519 rm = self.get_resource(guid)
520 rm.enable_trace(name)
522 def trace_enabled(self, guid, name):
523 """ Returns True if the trace of name 'name' is enabled
525 :param name: Name of the trace
529 rm = self.get_resource(guid)
530 return rm.trace_enabled(name)
532 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
533 """ Returns information on a collected trace, the trace stream or
534 blocks (chunks) of the trace stream
536 :param name: Name of the trace
539 :param attr: Can be one of:
540 - TraceAttr.ALL (complete trace content),
541 - TraceAttr.STREAM (block in bytes to read starting
543 - TraceAttr.PATH (full path to the trace file),
544 - TraceAttr.SIZE (size of trace file).
547 :param block: Number of bytes to retrieve from trace, when attr is
551 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
557 rm = self.get_resource(guid)
558 return rm.trace(name, attr, block, offset)
560 def get_traces(self, guid):
561 """ Returns the list of the trace names of the RM with guid 'guid'
563 :param guid: Guid of the RM
566 :return: List of trace names
570 rm = self.get_resource(guid)
571 return rm.get_traces()
# Forward discovery and provisioning requests to the RM with `guid`.
# provision() returns whatever rm.provision() returns. The corresponding
# call in discover() is on a line not visible here.
574 def discover(self, guid):
575 """ Discovers an available resource matching the criteria defined
576 by the RM with guid 'guid', and associates that resource with the RM
578 Not all RM types require (or are capable of) performing resource
579 discovery. For the RM types which are not capable of doing so,
580 invoking this method does not have any consequences.
582 :param guid: Guid of the RM
586 rm = self.get_resource(guid)
589 def provision(self, guid):
590 """ Provisions the resource associated with the RM with guid 'guid'.
592 Provisioning means making a resource 'accessible' to the user.
593 Not all RM types require (or are capable of) performing resource
594 provisioning. For the RM types which are not capable of doing so,
595 invoking this method does not have any consequences.
597 :param guid: Guid of the RM
601 rm = self.get_resource(guid)
602 return rm.provision()
# Attribute value and state accessors:
# - get / set forward to the RM with `guid`; the forwarding lines are not
#   visible here.
# - get_global / set_global operate on the RM class returned by
#   ResourceFactory.get_resource_type(rtype), not on a single RM instance.
# - state returns the RM's state. With hr=True it maps the state through
#   ResourceState2str.get, which yields None for unmapped values. The lines
#   reading rm's state and testing `hr` are not visible here.
604 def get(self, guid, name):
605 """ Returns the value of the attribute with name 'name' on the
608 :param guid: Guid of the RM
611 :param name: Name of the attribute
614 :return: The value of the attribute with name 'name'
617 rm = self.get_resource(guid)
620 def set(self, guid, name, value):
621 """ Modifies the value of the attribute with name 'name' on the
624 :param guid: Guid of the RM
627 :param name: Name of the attribute
630 :param value: Value of the attribute
633 rm = self.get_resource(guid)
636 def get_global(self, rtype, name):
637 """ Returns the value of the global attribute with name 'name' on the
638 RMs of rtype 'rtype'.
640 :param rtype: Type of the RM
643 :param name: Name of the attribute
646 :return: The value of the attribute with name 'name'
649 rclass = ResourceFactory.get_resource_type(rtype)
650 return rclass.get_global(name)
652 def set_global(self, rtype, name, value):
653 """ Modifies the value of the global attribute with name 'name' on the
654 RMs with rtype 'rtype'.
656 :param rtype: Type of the RM
659 :param name: Name of the attribute
662 :param value: Value of the attribute
665 rclass = ResourceFactory.get_resource_type(rtype)
666 return rclass.set_global(name, value)
668 def state(self, guid, hr = False):
669 """ Returns the state of a resource
671 :param guid: Resource guid
674 :param hr: Human readable. Forces return of a
675 status string instead of a number
679 rm = self.get_resource(guid)
683 return ResourceState2str.get(state)
# Lifecycle commands and timestamp getters for a single RM, looked up by
# `guid`. The actual rm.stop()/rm.start() calls and some return lines are not
# visible here. Each getter returns the corresponding `*_time` attribute of
# the RM.
687 def stop(self, guid):
688 """ Stops the RM with guid 'guid'
690 Stopping an RM means that the resource it controls will
691 no longer take part in the experiment.
693 :param guid: Guid of the RM
697 rm = self.get_resource(guid)
700 def start(self, guid):
701 """ Starts the RM with guid 'guid'
703 Starting an RM means that the resource it controls will
704 begin taking part in the experiment.
706 :param guid: Guid of the RM
710 rm = self.get_resource(guid)
713 def get_start_time(self, guid):
714 """ Returns the start time of the RM as a timestamp """
715 rm = self.get_resource(guid)
718 def get_stop_time(self, guid):
719 """ Returns the stop time of the RM as a timestamp """
720 rm = self.get_resource(guid)
723 def get_discover_time(self, guid):
724 """ Returns the discover time of the RM as a timestamp """
725 rm = self.get_resource(guid)
726 return rm.discover_time
728 def get_provision_time(self, guid):
729 """ Returns the provision time of the RM as a timestamp """
730 rm = self.get_resource(guid)
731 return rm.provision_time
733 def get_ready_time(self, guid):
734 """ Returns the deployment time of the RM as a timestamp """
735 rm = self.get_resource(guid)
738 def get_release_time(self, guid):
739 """ Returns the release time of the RM as a timestamp """
740 rm = self.get_resource(guid)
741 return rm.release_time
743 def get_failed_time(self, guid):
744 """ Returns the time at which the RM failed, as a timestamp """
745 rm = self.get_resource(guid)
746 return rm.failed_time
# Asks each RM in guids1 to set attribute `name` to `value` once the RMs in
# guids2 reach `state`; the delay is given by `time`. guids1 and guids2 may
# each be a single int. The loop header over guids1 and the rest of the
# signature are on lines not visible here.
748 def set_with_conditions(self, name, value, guids1, guids2, state,
750 """ Modifies the value of attribute with name 'name' on all RMs
751 on the guids1 list when time 'time' has elapsed since all
752 elements in guids2 list have reached state 'state'.
754 :param name: Name of attribute to set in RM
757 :param value: Value of attribute to set in RM
760 :param guids1: List of guids of RMs whose attribute will be set
766 :param guids2: List of guids of RMs to be waited for
769 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
770 :type state: ResourceState
772 :param time: Time to wait after guids2 has reached status
776 if isinstance(guids1, int):
778 if isinstance(guids2, int):
782 rm = self.get_resource(guid)
783 rm.set_with_conditions(name, value, guids2, state, time)
# Deploys RMs by scheduling tasks. The visible code only calls schedule() and
# never blocks waiting for RMs.
# - guids defaults to every registered RM in state NEW; a single int is
#   accepted.
# - The guids are appended to deployment group `group`. When no group is
#   given, a fresh id comes from _group_id_generator.
# - Every RM gets rm.deployment_group = group and a "0s"
#   deploy_with_conditions task.
# - wait_all_ready=True, new group: one wait_all_and_start task polls,
#   re-scheduling itself every "1s" while any group RM is below READY. It
#   then schedules start_with_conditions for each RM, plus
#   stop_with_conditions for RMs that have STOP conditions.
# - wait_all_ready=False: start (and the conditional stop) are scheduled
#   right away next to the deploy task.
# NOTE(review): with wait_all_ready=True and an already existing group, no
# visible code schedules a start for the newly added guids once that group's
# wait_all_and_start has completed — confirm this is intended.
# NOTE(review): `new_group` and the loop headers are on lines not visible here.
785 def deploy(self, guids = None, wait_all_ready = True, group = None):
786 """ Deploys all ResourceManagers in the guids list.
788 If the argument 'guids' is not given, all RMs with state NEW
791 :param guids: List of guids of RMs to deploy
794 :param wait_all_ready: Wait until all RMs are ready in
795 order to start the RMs
798 :param group: Id of deployment group in which to deploy RMs
802 self.logger.debug(" ------- DEPLOY START ------ ")
805 # If no guids list was passed, all 'NEW' RMs will be deployed
807 for guid, rm in self._resources.iteritems():  # NOTE(review): dict.iteritems() is Python 2 only; Python 3 uses items()
808 if rm.state == ResourceState.NEW:
811 if isinstance(guids, int):
814 # Create deployment group
815 # New guids can be added to the same deployment group later on
819 group = self._group_id_generator.next()
821 if group not in self._groups:
822 self._groups[group] = []
824 self._groups[group].extend(guids)
826 def wait_all_and_start(group):
827 # Function that checks if all resources are READY
828 # before scheduling a start_with_conditions for each RM
831 # Get all guids in group
832 guids = self._groups[group]
835 if self.state(guid) < ResourceState.READY:
840 callback = functools.partial(wait_all_and_start, group)
841 self.schedule("1s", callback)
843 # If all resources are ready, we schedule the start
845 rm = self.get_resource(guid)
846 self.schedule("0s", rm.start_with_conditions)
848 if rm.conditions.get(ResourceAction.STOP):
849 # Only if the RM has STOP conditions we
850 # schedule a stop. Otherwise the RM will stop immediately
851 self.schedule("0s", rm.stop_with_conditions)
853 if wait_all_ready and new_group:
854 # Schedule a function to check that all resources are
855 # READY, and only then schedule the start.
856 # This aims at reducing the number of tasks looping in the
858 # Instead of having many start tasks, we will have only one for
860 callback = functools.partial(wait_all_and_start, group)
861 self.schedule("0s", callback)
864 rm = self.get_resource(guid)
865 rm.deployment_group = group
866 self.schedule("0s", rm.deploy_with_conditions)
868 if not wait_all_ready:
869 self.schedule("0s", rm.start_with_conditions)
871 if rm.conditions.get(ResourceAction.STOP):
872 # Only if the RM has STOP conditions we
873 # schedule a stop. Otherwise the RM will stop immediately
874 self.schedule("0s", rm.stop_with_conditions)
# Releases RMs in three steps:
# 1. Schedules rm.release for each guid. guids defaults to all registered
#    RMs (self.resources), and a single int is accepted.
# 2. Blocks in wait_released() until those RMs are RELEASED or the EC fails.
# 3. Unregisters the RMs whose "hardRelease" attribute is set.
# The loop headers are on lines not visible here.
876 def release(self, guids = None):
877 """ Releases all ResourceManagers in the guids list.
879 If the argument 'guids' is not given, all RMs registered
880 in the experiment are released.
882 :param guids: List of RM guids
886 if isinstance(guids, int):
890 guids = self.resources
893 rm = self.get_resource(guid)
894 self.schedule("0s", rm.release)
896 self.wait_released(guids)
899 if self.get(guid, "hardRelease"):
900 self.remove_resource(guid)
# Tear-down of the EC; its `def` line is not visible in this excerpt.
# - Raises RuntimeError when the EC is in ECState.FAILED.
# - Otherwise drops all pending scheduler tasks and marks the EC TERMINATED.
# - Per the comments below, it then stops and wakes the processing thread.
# NOTE(review): the release call, the worker-queue cleanup, the stop flag,
# the notify and the join are on lines not visible here.
903 """ Releases all resources and stops the ExperimentController
906 # If there was a major failure we can't exit gracefully
907 if self._state == ECState.FAILED:
908 raise RuntimeError("EC failure. Can not exit gracefully")
910 # Remove all pending tasks from the scheduler queue
911 for tid in list(self._scheduler.pending):  # iterate over a copy, since remove() mutates the pending collection
912 self._scheduler.remove(tid)
914 # Remove pending tasks from the workers queue
919 # Mark the EC state as TERMINATED
920 self._state = ECState.TERMINATED
922 # Stop processing thread
925 # Notify condition to wake up the processing thread
928 if self._thread.is_alive():
# Creates a task and queues it:
# - converts `date` with stabsformat, wraps the callback in a Task and pushes
#   it into the HeapScheduler, keeping the Task object the scheduler returns;
# - when `track` is True, also stores the task in self._tasks so get_task()
#   can find it.
# The `if track:` test, the notify call and the return line are not visible
# here. _execute later invokes task.callback() with no arguments, so callers
# bind arguments beforehand (deploy uses functools.partial).
931 def schedule(self, date, callback, track = False):
932 """ Schedules a callback to be executed at time 'date'.
934 :param date: string containing execution time for the task.
935 It can be expressed as an absolute time, using
936 timestamp format, or as a relative time such as
937 '0s' or '1s' (units h, m, s, ms, us; parsed by stabsformat)
939 :param callback: code to be executed for the task. Must be a
940 callable taking no arguments (bind them with functools.partial)
943 :param track: if set to True, the task will be retrievable with
944 the get_task() method
946 :return: The id of the task
950 timestamp = stabsformat(date)
951 task = Task(timestamp, callback)
952 task = self._scheduler.schedule(task)
955 self._tasks[task.id] = task
957 # Notify condition to wake up the processing thread
# Main loop of the processing thread; its `def` line is not visible here.
# - Starts the ParallelRun pool.
# - Until self._stop is set, and while holding self._cond, pops the next task.
# - A task that is not yet due is pushed back with the same timestamp, and the
#   thread waits on the condition for the remaining seconds; _notify wakes it
#   early when a new task is scheduled.
# - Due tasks are handed to the pool, which runs them through _execute.
# - In the visible error handler, an error in the loop is logged, the EC is
#   set to FAILED and the FailureManager to EC_FAILURE.
# - self._runner.destroy() runs after the loop exits.
# NOTE(review): the try/except/finally lines, the `now` assignment
# (presumably tnow()) and the `task is None` branch are not visible here.
963 """ Process scheduled tasks.
967 Tasks are scheduled by invoking the schedule method with a target
968 callback and an execution time.
969 The schedule method creates a new Task object with that callback
970 and execution time, and pushes it into the '_scheduler' queue.
971 The execution time and the order of arrival of tasks are used
972 to order the tasks in the queue.
974 The _process method is executed in an independent thread held by
975 the ExperimentController for as long as the experiment is running.
976 This method takes tasks from the '_scheduler' queue in a loop
977 and processes them in parallel using multithreading.
978 The environment variable NEPI_NTHREADS can be used to control
979 the number of threads used to process tasks. The default value is
982 To execute tasks in parallel, a ParallelRun (PR) object is used.
983 This object keeps a pool of threads (workers), and a queue of tasks
984 scheduled for 'immediate' execution.
986 On each iteration, the '_process' loop will take the next task that
987 is scheduled for 'future' execution from the '_scheduler' queue,
988 and if the execution time of that task is <= the current time,
989 it will push that task into the PR for 'immediate execution'.
990 As soon as a worker is free, the PR will assign the next task to
993 Upon receiving a task to execute, each PR worker (thread) will
994 invoke the _execute method of the EC, passing the task as
996 The _execute method will then invoke task.callback inside a
997 try/except block. If an exception is raised by the task.callback,
998 it will be trapped by the try block, logged through the EC
999 logger, and the task status will be set to TaskStatus.ERROR.
1003 self._runner.start()
1005 while not self._stop:
1007 self._cond.acquire()
1009 task = self._scheduler.next()
1012 # No task to execute. Wait for a new task to be scheduled.
1015 # The task timestamp is in the future. Wait for timeout
1016 # or until another task is scheduled.
1018 if now < task.timestamp:
1019 # Calculate timeout in seconds
1020 timeout = tdiffsec(task.timestamp, now)
1022 # Re-schedule task with the same timestamp
1023 self._scheduler.schedule(task)
1027 # Wait timeout or until a new task awakes the condition
1028 self._cond.wait(timeout)
1030 self._cond.release()
1033 # Process tasks in parallel
1034 self._runner.put(self._execute, task)
1037 err = traceback.format_exc()
1038 self.logger.error("Error while processing tasks in the EC: %s" % err)
1040 # Set the EC to FAILED state
1041 self._state = ECState.FAILED
1043 # Set the FailureManager failure level to EC failure
1044 self._fm.set_ec_failure()
1046 self.logger.debug("Exiting the task processing loop ... ")
1049 self._runner.destroy()
# Worker-side wrapper; _process hands it to the ParallelRun pool.
# - Calls task.callback() with no arguments, stores the return value in
#   task.result and marks the task TaskStatus.DONE.
# - If the callback raises, the formatted traceback is logged and the task
#   is marked TaskStatus.ERROR.
# No re-raise is visible. The try/except lines, and presumably a line storing
# `err` on the task, are not visible here.
1051 def _execute(self, task):
1052 """ Executes a single task.
1054 :param task: Object containing the callback to execute
1060 task.result = task.callback()
1061 task.status = TaskStatus.DONE
1064 err = traceback.format_exc()
1066 task.status = TaskStatus.ERROR
1068 self.logger.error("Error occurred while executing task: %s" % err)
# Wakes the processing thread (_process) out of self._cond.wait(). Per the
# comment in schedule(), it is used after a task is scheduled. The `def` line
# and the notify call between acquire() and release() are not visible here.
# NOTE(review): `with self._cond:` would guarantee the release even if
# notify raised.
1071 """ Wakes up the processing thread if it is blocked waiting
1072 for new tasks to arrive
1075 self._cond.acquire()
1077 self._cond.release()