2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
class FailureLevel(object):
    """ Describes the system failure state """

    # NOTE(review): the constant definitions are missing from this listing.
    # The names are the ones referenced by the FailureManager; the numeric
    # values are assumed (only equality comparisons are visible) -- confirm.
    OK = 1
    RM_FAILURE = 2
    EC_FAILURE = 3
class FailureManager(object):
    """ The FailureManager is responsible for handling errors
    and deciding whether an experiment should be aborted or not

    """

    def __init__(self, ec):
        """
        :param ec: ExperimentController monitored by this FailureManager
        :type ec: ExperimentController

        """
        # Only a weak reference to the EC is kept
        self._ec = weakref.ref(ec)
        self._failure_level = FailureLevel.OK

    @property
    def ec(self):
        """ Returns the ExperimentController associated to this FailureManager

        """
        return self._ec()

    @property
    def abort(self):
        """ Returns True if a failure was registered, False otherwise

        While no failure is registered, every resource of the EC is
        inspected, and a FAILED resource whose 'critical' attribute is
        set raises the failure level to RM_FAILURE.

        :rtype: bool

        """
        if self._failure_level == FailureLevel.OK:
            for guid in self.ec.resources:
                state = self.ec.state(guid)
                critical = self.ec.get(guid, "critical")
                if state == ResourceState.FAILED and critical:
                    self._failure_level = FailureLevel.RM_FAILURE
                    self.ec.logger.debug("RM critical failure occurred on guid %d." \
                            " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
                    # NOTE(review): loop exit restored; not visible in the
                    # listing. One critical failure is enough to abort.
                    break

        return self._failure_level != FailureLevel.OK

    def set_ec_failure(self):
        """ Registers a failure of the ExperimentController itself """
        self._failure_level = FailureLevel.EC_FAILURE
class ECState(object):
    """ Possible states for an ExperimentController

    """

    # NOTE(review): the constant definitions are missing from this listing.
    # The names are the ones referenced by the ExperimentController; the
    # numeric values are assumed -- confirm.
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
90 class ExperimentController(object):
92 .. class:: Class Args :
94 :param exp_id: Human readable identifier for the experiment scenario.
99 An experiment, or scenario, is defined by a concrete set of resources,
100 behavior, configuration and interconnection of those resources.
101 The Experiment Description (ED) is a detailed representation of a
102 single experiment. It contains all the necessary information to
103 allow repeating the experiment. NEPI allows to describe
104 experiments by registering components (resources), configuring them
105 and interconnecting them.
107 A same experiment (scenario) can be executed many times, generating
108 different results. We call an experiment execution (instance) a 'run'.
110 The ExperimentController (EC) is the entity responsible for
111 managing an experiment run. The same scenario can be
112 recreated (and re-run) by instantiating an EC and recreating
113 the same experiment description.
115 In NEPI, an experiment is represented as a graph of interconnected
116 resources. A resource is a generic concept in the sense that any
117 component taking part of an experiment, whether physical or
118 virtual, is considered a resource. A resource could be a host,
119 a virtual machine, an application, a simulator, an IP address.
121 A ResourceManager (RM), is the entity responsible for managing a
122 single resource. ResourceManagers are specific to a resource
123 type (i.e. An RM to control a Linux application will not be
124 the same as the RM used to control a ns-3 simulation).
125 To support a new type of resource in NEPI, a new RM must be
126 implemented. NEPI already provides a variety of
127 RMs to control basic resources, and new ones can be extended from
130 Through the EC interface the user can create ResourceManagers (RMs),
131 configure them and interconnect them, to describe an experiment.
132 Describing an experiment through the EC does not run the experiment.
133 Only when the 'deploy()' method is invoked on the EC, the EC will take
134 actions to transform the 'described' experiment into a 'running' experiment.
136 While the experiment is running, it is possible to continue to
137 create/configure/connect RMs, and to deploy them to involve new
138 resources in the experiment (this is known as 'interactive' deployment).
140 An experiment in NEPI is identified by a string id,
141 which is either given by the user, or automatically generated by NEPI.
142 The purpose of this identifier is to separate files and results that
143 belong to different experiment scenarios.
144 However, since a same 'experiment' can be run many times, the experiment
145 id is not enough to identify an experiment instance (run).
146 For this reason, the ExperimentController has two identifiers, the
147 exp_id, which can be re-used in different ExperimentController,
148 and the run_id, which is unique to one ExperimentController instance, and
149 is automatically generated by NEPI.
def __init__(self, exp_id = None):
    """
    :param exp_id: Human readable identifier for the experiment scenario.
        When omitted (or empty) a random identifier is generated.

    """
    super(ExperimentController, self).__init__()

    # Logging
    self._logger = logging.getLogger("ExperimentController")

    # Run identifier. It identifies a concrete execution instance (run)
    # of an experiment.
    # Since a same experiment (same configuration) can be executed many
    # times, this run_id permits to separate result files generated on
    # different experiment executions
    self._run_id = tsformat()

    # Experiment identifier. Usually assigned by the user
    # Identifies the experiment scenario (i.e. configuration,
    # resources used, etc)
    if not exp_id:
        # Hex-encode 8 random bytes. bytearray yields ints on both
        # Python 2 and 3, unlike the Python 2 only str.encode('hex')
        exp_id = "exp-%s" % "".join(
                "%02x" % byte for byte in bytearray(os.urandom(8)))
    self._exp_id = exp_id

    # generator of globally unique ids
    self._guid_generator = guid.GuidGenerator()

    # Resource managers, indexed by guid
    self._resources = dict()

    # Scheduler. It is a queue that holds tasks scheduled for
    # execution, and yields the next task to be executed
    # ordered by execution and arrival time
    self._scheduler = HeapScheduler()

    # Tracked tasks, indexed by task id (see schedule and get_task)
    self._tasks = dict()

    # RM groups (for deployment)
    self._groups = dict()

    # generator of globally unique id for groups
    self._group_id_generator = guid.GuidGenerator()

    # Flag to stop processing thread
    self._stop = False

    # Entity in charge of managing system failures
    self._fm = FailureManager(self)

    # EC state
    self._state = ECState.RUNNING

    # The runner is a pool of threads used to parallelize
    # execution of tasks
    nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
    self._runner = ParallelRun(maxthreads = nthreads)

    # Event processing thread
    self._cond = threading.Condition()
    self._thread = threading.Thread(target = self._process)
    self._thread.daemon = True
    # NOTE(review): the statement starting the thread is missing from this
    # listing; restored here because the processing loop is documented to
    # run for as long as the experiment is running -- confirm.
    self._thread.start()
213 """ Returns the logger instance of the Experiment Controller
220 """ Returns the state of the Experiment Controller
227 """ Returns the experiment id assigned by the user
234 """ Returns the experiment instance (run) identifier (automatically
242 """ Returns True if the experiment has failed and should be interrupted,
246 return self._fm.abort
def wait_finished(self, guids):
    """ Blocking method that waits until all RMs in the 'guids' list
    have reached a state >= STOPPED (i.e. STOPPED, FAILED or
    RELEASED ), or until a failure in the experiment occurs
    (i.e. abort == True)

    :param guids: List of guids (a single guid is also accepted)
    :type guids: list

    """
    def quit():
        # Give up waiting as soon as the experiment must be aborted
        return self.abort

    return self.wait(guids, state = ResourceState.STOPPED,
            quit = quit)
def wait_started(self, guids):
    """ Blocking method that waits until all RMs in the 'guids' list
    have reached a state >= STARTED, or until a failure in the
    experiment occurs (i.e. abort == True)

    :param guids: List of guids (a single guid is also accepted)
    :type guids: list

    """
    def quit():
        # Give up waiting as soon as the experiment must be aborted
        return self.abort

    return self.wait(guids, state = ResourceState.STARTED,
            quit = quit)
def wait_released(self, guids):
    """ Blocking method that waits until all RMs in the 'guids' list
    have reached a state == RELEASED, or until the EC fails

    :param guids: List of guids (a single guid is also accepted)
    :type guids: list

    """
    def quit():
        # Unlike the other wait_* methods, only a failure of the EC
        # itself interrupts the wait
        return self._state == ECState.FAILED

    return self.wait(guids, state = ResourceState.RELEASED,
            quit = quit)
def wait_deployed(self, guids):
    """ Blocking method that waits until all RMs in the 'guids' list
    have reached a state >= READY, or until a failure in the
    experiment occurs (i.e. abort == True)

    :param guids: List of guids (a single guid is also accepted)
    :type guids: list

    """
    def quit():
        # Give up waiting as soon as the experiment must be aborted
        return self.abort

    return self.wait(guids, state = ResourceState.READY,
            quit = quit)
312 def wait(self, guids, state, quit):
313 """ Blocking method that waits until all RMs in the 'guids' list
314 have reached a state >= 'state', or until the 'quit' callback
317 :param guids: List of guids
322 if isinstance(guids, int):
325 # Make a copy to avoid modifying the original guids list
329 # If there are no more guids to wait for
330 # or the quit function returns True, exit the loop
331 if len(guids) == 0 or quit():
334 # If a guid reached one of the target states, remove it from list
336 rstate = self.state(guid)
338 hrrstate = ResourceState2str.get(rstate)
339 hrstate = ResourceState2str.get(state)
343 rm = self.get_resource(guid)
344 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
345 rm.get_rtype(), guid, hrrstate, hrstate))
348 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
349 guid, hrrstate, hrstate))
352 def get_task(self, tid):
353 """ Returns a task by its id
355 :param tid: Id of the task
361 return self._tasks.get(tid)
363 def get_resource(self, guid):
364 """ Returns a registered ResourceManager by its guid
366 :param guid: Id of the task
369 :rtype: ResourceManager
372 return self._resources.get(guid)
376 """ Returns the set() of guids of all the ResourceManager
378 :return: Set of all RM guids
382 return self._resources.keys()
def register_resource(self, rtype, guid = None):
    """ Registers a new ResourceManager of type 'rtype' in the experiment

    This method will assign a new 'guid' for the RM, if no guid
    is specified.

    :param rtype: Type of the RM

    :param guid: Guid requested for the RM (optional)

    :return: Guid of the RM

    """
    # Get next available guid
    guid = self._guid_generator.next(guid)

    # Instantiate RM
    rm = ResourceFactory.create(rtype, self, guid)

    # Store RM
    self._resources[guid] = rm

    return guid
408 def get_attributes(self, guid):
409 """ Returns all the attributes of the RM with guid 'guid'
411 :param guid: Guid of the RM
414 :return: List of attributes
418 rm = self.get_resource(guid)
419 return rm.get_attributes()
421 def get_attribute(self, guid, name):
422 """ Returns the attribute 'name' of the RM with guid 'guid'
424 :param guid: Guid of the RM
427 :param name: Name of the attribute
430 :return: The attribute with name 'name'
434 rm = self.get_resource(guid)
435 return rm.get_attribute(name)
437 def register_connection(self, guid1, guid2):
438 """ Registers a connection between a RM with guid 'guid1'
439 and another RM with guid 'guid2'.
441 The order of the in which the two guids are provided is not
442 important, since the connection relationship is symmetric.
444 :param guid1: First guid to connect
445 :type guid1: ResourceManager
447 :param guid2: Second guid to connect
448 :type guid: ResourceManager
451 rm1 = self.get_resource(guid1)
452 rm2 = self.get_resource(guid2)
454 rm1.register_connection(guid2)
455 rm2.register_connection(guid1)
457 def register_condition(self, guids1, action, guids2, state,
459 """ Registers an action START, STOP or DEPLOY for all RM on list
460 guids1 to occur at time 'time' after all elements in list guids2
461 have reached state 'state'.
463 :param guids1: List of guids of RMs subjected to action
466 :param action: Action to perform (either START, STOP or DEPLOY)
467 :type action: ResourceAction
469 :param guids2: List of guids of RMs to we waited for
472 :param state: State to wait for on RMs of list guids2 (STARTED,
474 :type state: ResourceState
476 :param time: Time to wait after guids2 has reached status
480 if isinstance(guids1, int):
482 if isinstance(guids2, int):
486 rm = self.get_resource(guid1)
487 rm.register_condition(action, guids2, state, time)
489 def enable_trace(self, guid, name):
490 """ Enables a trace to be collected during the experiment run
492 :param name: Name of the trace
496 rm = self.get_resource(guid)
497 rm.enable_trace(name)
499 def trace_enabled(self, guid, name):
500 """ Returns True if the trace of name 'name' is enabled
502 :param name: Name of the trace
506 rm = self.get_resource(guid)
507 return rm.trace_enabled(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """ Returns information on a collected trace, the trace stream or
    blocks (chunks) of the trace stream

    :param guid: Guid of the RM owning the trace

    :param name: Name of the trace

    :param attr: Can be one of:
                     - TraceAttr.ALL (complete trace content),
                     - TraceAttr.STREAM (block in bytes to read starting
                            at 'offset'),
                     - TraceAttr.PATH (full path to the trace file),
                     - TraceAttr.SIZE (size of trace file).

    :param block: Number of bytes to retrieve from trace, when attr is
        TraceAttr.STREAM
    :type block: int

    :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
    :type offset: int

    :return: Whatever the RM reports for the requested trace attribute

    """
    rm = self.get_resource(guid)
    return rm.trace(name, attr, block, offset)
537 def get_traces(self, guid):
538 """ Returns the list of the trace names of the RM with guid 'guid'
540 :param guid: Guid of the RM
543 :return: List of trace names
547 rm = self.get_resource(guid)
548 return rm.get_traces()
551 def discover(self, guid):
552 """ Discovers an available resource matching the criteria defined
553 by the RM with guid 'guid', and associates that resource to the RM
555 Not all RM types require (or are capable of) performing resource
556 discovery. For the RM types which are not capable of doing so,
557 invoking this method does not have any consequences.
559 :param guid: Guid of the RM
563 rm = self.get_resource(guid)
566 def provision(self, guid):
567 """ Provisions the resource associated to the RM with guid 'guid'.
569 Provisioning means making a resource 'accessible' to the user.
570 Not all RM types require (or are capable of) performing resource
571 provisioning. For the RM types which are not capable of doing so,
572 invoking this method does not have any consequences.
574 :param guid: Guid of the RM
578 rm = self.get_resource(guid)
579 return rm.provision()
581 def get(self, guid, name):
582 """ Returns the value of the attribute with name 'name' on the
585 :param guid: Guid of the RM
588 :param name: Name of the attribute
591 :return: The value of the attribute with name 'name'
594 rm = self.get_resource(guid)
597 def set(self, guid, name, value):
598 """ Modifies the value of the attribute with name 'name' on the
601 :param guid: Guid of the RM
604 :param name: Name of the attribute
607 :param value: Value of the attribute
610 rm = self.get_resource(guid)
611 return rm.set(name, value)
613 def state(self, guid, hr = False):
614 """ Returns the state of a resource
616 :param guid: Resource guid
619 :param hr: Human readable. Forces return of a
620 status string instead of a number
624 rm = self.get_resource(guid)
628 return ResourceState2str.get(state)
632 def stop(self, guid):
633 """ Stops the RM with guid 'guid'
635 Stopping a RM means that the resource it controls will
636 no longer take part of the experiment.
638 :param guid: Guid of the RM
642 rm = self.get_resource(guid)
645 def start(self, guid):
646 """ Starts the RM with guid 'guid'
648 Starting a RM means that the resource it controls will
649 begin taking part of the experiment.
651 :param guid: Guid of the RM
655 rm = self.get_resource(guid)
658 def set_with_conditions(self, name, value, guids1, guids2, state,
660 """ Modifies the value of attribute with name 'name' on all RMs
661 on the guids1 list when time 'time' has elapsed since all
662 elements in guids2 list have reached state 'state'.
664 :param name: Name of attribute to set in RM
667 :param value: Value of attribute to set in RM
670 :param guids1: List of guids of RMs subjected to action
673 :param action: Action to register (either START or STOP)
674 :type action: ResourceAction
676 :param guids2: List of guids of RMs to we waited for
679 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
680 :type state: ResourceState
682 :param time: Time to wait after guids2 has reached status
686 if isinstance(guids1, int):
688 if isinstance(guids2, int):
692 rm = self.get_resource(guid)
693 rm.set_with_conditions(name, value, guids2, state, time)
def deploy(self, guids = None, wait_all_ready = True, group = None):
    """ Deploys all ResourceManagers in the guids list.

    If the argument 'guids' is not given, all RMs with state NEW
    are deployed.

    :param guids: List of guids of RMs to deploy
        (a single guid is also accepted)
    :type guids: list

    :param wait_all_ready: Wait until all RMs are ready in
        order to start the RMs
    :type wait_all_ready: bool

    :param group: Id of deployment group in which to deploy RMs.
        A new group is created when no id is given.

    """
    self.logger.debug(" ------- DEPLOY START ------ ")

    if not guids:
        # If no guids list was passed, all 'NEW' RMs will be deployed
        guids = [guid for guid in self.resources
                if self.state(guid) == ResourceState.NEW]

    if isinstance(guids, int):
        guids = [guids]

    # Create deployment group
    # New guids can be added to a same deployment group later on
    new_group = not group
    if new_group:
        group = self._group_id_generator.next()

    if group not in self._groups:
        self._groups[group] = []

    self._groups[group].extend(guids)

    def wait_all_and_start(group):
        # Function that checks if all resources are READY
        # before scheduling a start_with_conditions for each RM

        # Get all guids in group
        guids = self._groups[group]

        if any(self.state(guid) < ResourceState.READY for guid in guids):
            # At least one RM is not READY yet, check again later
            callback = functools.partial(wait_all_and_start, group)
            self.schedule("1s", callback)
        else:
            # If all resources are ready, we schedule the start
            for guid in guids:
                rm = self.get_resource(guid)
                self.schedule("0s", rm.start_with_conditions)

    if wait_all_ready and new_group:
        # Schedule a function to check that all resources are
        # READY, and only then schedule the start.
        # This aims at reducing the number of tasks looping in the
        # scheduler.
        # Instead of having many start tasks, we will have only one for
        # the whole group.
        callback = functools.partial(wait_all_and_start, group)
        self.schedule("0s", callback)

    for guid in guids:
        rm = self.get_resource(guid)
        rm.deployment_group = group
        self.schedule("0s", rm.deploy_with_conditions)

        if not wait_all_ready:
            self.schedule("0s", rm.start_with_conditions)

        if rm.conditions.get(ResourceAction.STOP):
            # Only if the RM has STOP conditions we
            # schedule a stop. Otherwise the RM will stop immediately
            self.schedule("0s", rm.stop_with_conditions)
def release(self, guids = None):
    """ Releases all ResourceManagers in the guids list.

    If the argument 'guids' is not given, all RMs registered
    in the experiment are released.

    This method blocks until the RMs are released or the EC fails.

    :param guids: List of RM guids
    :type guids: list

    """
    if not guids:
        guids = self.resources

    # Remove all pending tasks from the scheduler queue
    for tid in list(self._scheduler.pending):
        self._scheduler.remove(tid)

    # NOTE(review): one statement is missing from this listing at this
    # point; it is assumed to discard the tasks already queued in the
    # runner -- confirm against the ParallelRun API.
    self._runner.empty()

    for guid in guids:
        rm = self.get_resource(guid)
        self.schedule("0s", rm.release)

    self.wait_released(guids)
808 """ Releases all resources and stops the ExperimentController
811 # If there was a major failure we can't exit gracefully
812 if self._state == ECState.FAILED:
813 raise RuntimeError("EC failure. Can not exit gracefully")
817 # Mark the EC state as TERMINATED
818 self._state = ECState.TERMINATED
820 # Stop processing thread
823 # Notify condition to wake up the processing thread
826 if self._thread.is_alive():
def schedule(self, date, callback, track = False):
    r""" Schedules a callback to be executed at time 'date'.

    :param date: string containing execution time for the task.
            It can be expressed as an absolute time, using
            timestamp format, or as a relative time matching
            ^\d+.\d+(h|m|s|ms|us)$
            (callers in this class pass values such as "0s" and "1s")

    :param callback: code to be executed for the task. Must be a
                Python function, and receives args and kwargs
                as arguments.

    :param track: if set to True, the task will be retrievable with
            the get_task() method

    :return : The Id of the task

    """
    timestamp = stabsformat(date)
    task = Task(timestamp, callback)
    task = self._scheduler.schedule(task)

    if track:
        self._tasks[task.id] = task

    # Notify condition to wake up the processing thread
    with self._cond:
        self._cond.notify()

    return task.id
861 """ Process scheduled tasks.
865 Tasks are scheduled by invoking the schedule method with a target
866 callback and an execution time.
867 The schedule method creates a new Task object with that callback
868 and execution time, and pushes it into the '_scheduler' queue.
869 The execution time and the order of arrival of tasks are used
870 to order the tasks in the queue.
872 The _process method is executed in an independent thread held by
873 the ExperimentController for as long as the experiment is running.
874 This method takes tasks from the '_scheduler' queue in a loop
875 and processes them in parallel using multithreading.
876 The environment variable NEPI_NTHREADS can be used to control
877 the number of threads used to process tasks. The default value is
880 To execute tasks in parallel, a ParallelRun (PR) object is used.
881 This object keeps a pool of threads (workers), and a queue of tasks
882 scheduled for 'immediate' execution.
884 On each iteration, the '_process' loop will take the next task that
885 is scheduled for 'future' execution from the '_scheduler' queue,
886 and if the execution time of that task is <= the current time,
887 it will push that task into the PR for 'immediate execution'.
888 As soon as a worker is free, the PR will assign the next task to
891 Upon receiving a task to execute, each PR worker (thread) will
892 invoke the _execute method of the EC, passing the task as
894 The _execute method will then invoke task.callback inside a
895 try/except block. If an exception is raised by task.callback,
896 it will be trapped by the try block, logged to standard error
897 (usually the console), and the task will be marked as failed.
903 while not self._stop:
907 task = self._scheduler.next()
910 # No task to execute. Wait for a new task to be scheduled.
913 # The task timestamp is in the future. Wait for timeout
914 # or until another task is scheduled.
916 if now < task.timestamp:
917 # Calculate timeout in seconds
918 timeout = tdiffsec(task.timestamp, now)
920 # Re-schedule task with the same timestamp
921 self._scheduler.schedule(task)
925 # Wait timeout or until a new task awakes the condition
926 self._cond.wait(timeout)
931 # Process tasks in parallel
932 self._runner.put(self._execute, task)
935 err = traceback.format_exc()
936 self.logger.error("Error while processing tasks in the EC: %s" % err)
938 # Set the EC to FAILED state
939 self._state = ECState.FAILED
941 # Set the FailureManager failure level to EC failure
942 self._fm.set_ec_failure()
944 self.logger.debug("Exiting the task processing loop ... ")
947 self._runner.destroy()
def _execute(self, task):
    """ Executes a single task.

    The callback of the task is invoked and its return value is stored
    in task.result. If the callback raises, the error is logged, the
    traceback is stored in task.result and the task is marked as ERROR.

    :param task: Object containing the callback to execute
    :type task: Task

    """
    try:
        task.result = task.callback()
    except Exception:
        import traceback
        err = traceback.format_exc()
        # Keep the traceback on the task so that it can be inspected later
        task.result = err
        task.status = TaskStatus.ERROR

        self.logger.error("Error occurred while executing task: %s" % err)
    else:
        # Mark the task as DONE only once the callback has returned, so
        # the status never claims completion while the callback is running
        task.status = TaskStatus.DONE
970 """ Awakes the processing thread if it is blocked waiting
971 for new tasks to arrive