2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39 class FailureLevel(object):
40 """ Describes the system failure state """
45 class FailureManager(object):
46 """ The FailureManager is responsible for handling errors
47 and deciding whether an experiment should be aborted or not
def __init__(self, ec):
    """ Creates a FailureManager bound to the ExperimentController 'ec'.
    The failure level starts at FailureLevel.OK.

    :param ec: ExperimentController whose failures are tracked
    :type ec: ExperimentController
    """
    self._failure_level = FailureLevel.OK
    # The EC owns this manager (it keeps it in its '_fm' attribute),
    # so only a weak reference back to the EC is kept. This avoids a
    # strong reference cycle between the two objects.
    self._ec = weakref.ref(ec)
57 """ Returns the ExperimentController associated to this FailureManager
65 if self._failure_level == FailureLevel.OK:
66 for guid in self.ec.resources:
68 state = self.ec.state(guid)
69 critical = self.ec.get(guid, "critical")
70 if state == ResourceState.FAILED and critical:
71 self._failure_level = FailureLevel.RM_FAILURE
72 self.ec.logger.debug("RM critical failure occurred on guid %d." \
73 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
# An error might occur because an RM was deleted abruptly.
77 # In this case the error should be ignored.
78 if guid in self.ec._resources:
81 return self._failure_level != FailureLevel.OK
def set_ec_failure(self):
    """ Records an ExperimentController-level failure by setting the
    failure level to FailureLevel.EC_FAILURE.

    The ExperimentController calls this when its task processing loop
    hits an unexpected error.
    """
    self._failure_level = FailureLevel.EC_FAILURE
86 class ECState(object):
87 """ Possible states for an ExperimentController
94 class ExperimentController(object):
96 .. class:: Class Args :
98 :param exp_id: Human readable identifier for the experiment scenario.
103 An experiment, or scenario, is defined by a concrete set of resources,
104 behavior, configuration and interconnection of those resources.
105 The Experiment Description (ED) is a detailed representation of a
106 single experiment. It contains all the necessary information to
107 allow repeating the experiment. NEPI allows to describe
108 experiments by registering components (resources), configuring them
109 and interconnecting them.
111 A same experiment (scenario) can be executed many times, generating
112 different results. We call an experiment execution (instance) a 'run'.
The ExperimentController (EC) is the entity responsible for
115 managing an experiment run. The same scenario can be
116 recreated (and re-run) by instantiating an EC and recreating
117 the same experiment description.
119 In NEPI, an experiment is represented as a graph of interconnected
120 resources. A resource is a generic concept in the sense that any
component taking part of an experiment, whether physical or
virtual, is considered a resource. A resource could be a host,
a virtual machine, an application, a simulator, an IP address.
125 A ResourceManager (RM), is the entity responsible for managing a
126 single resource. ResourceManagers are specific to a resource
127 type (i.e. An RM to control a Linux application will not be
128 the same as the RM used to control a ns-3 simulation).
129 To support a new type of resource in NEPI, a new RM must be
130 implemented. NEPI already provides a variety of
131 RMs to control basic resources, and new can be extended from
134 Through the EC interface the user can create ResourceManagers (RMs),
135 configure them and interconnect them, to describe an experiment.
136 Describing an experiment through the EC does not run the experiment.
137 Only when the 'deploy()' method is invoked on the EC, the EC will take
138 actions to transform the 'described' experiment into a 'running' experiment.
140 While the experiment is running, it is possible to continue to
141 create/configure/connect RMs, and to deploy them to involve new
142 resources in the experiment (this is known as 'interactive' deployment).
An experiment in NEPI is identified by a string id,
145 which is either given by the user, or automatically generated by NEPI.
146 The purpose of this identifier is to separate files and results that
147 belong to different experiment scenarios.
148 However, since a same 'experiment' can be run many times, the experiment
149 id is not enough to identify an experiment instance (run).
For this reason, the ExperimentController has two identifiers: the
exp_id, which can be re-used across different ExperimentControllers,
152 and the run_id, which is unique to one ExperimentController instance, and
153 is automatically generated by NEPI.
157 def __init__(self, exp_id = None):
158 super(ExperimentController, self).__init__()
161 self._logger = logging.getLogger("ExperimentController")
163 # Run identifier. It identifies a concrete execution instance (run)
165 # Since a same experiment (same configuration) can be executed many
166 # times, this run_id permits to separate result files generated on
167 # different experiment executions
168 self._run_id = tsformat()
170 # Experiment identifier. Usually assigned by the user
171 # Identifies the experiment scenario (i.e. configuration,
172 # resources used, etc)
173 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
175 # generator of globally unique ids
176 self._guid_generator = guid.GuidGenerator()
179 self._resources = dict()
181 # Scheduler. It a queue that holds tasks scheduled for
182 # execution, and yields the next task to be executed
183 # ordered by execution and arrival time
184 self._scheduler = HeapScheduler()
189 # RM groups (for deployment)
190 self._groups = dict()
192 # generator of globally unique id for groups
193 self._group_id_generator = guid.GuidGenerator()
195 # Flag to stop processing thread
198 # Entity in charge of managing system failures
199 self._fm = FailureManager(self)
202 self._state = ECState.RUNNING
204 # Blacklist file for PL nodes
205 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
206 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
207 if not os.path.exists(plblacklist_file):
208 if os.path.isdir(nepi_home):
209 open(plblacklist_file, 'w').close()
211 os.makedirs(nepi_home)
212 open(plblacklist_file, 'w').close()
214 # The runner is a pool of threads used to parallelize
216 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
217 self._runner = ParallelRun(maxthreads = nthreads)
219 # Event processing thread
220 self._cond = threading.Condition()
221 self._thread = threading.Thread(target = self._process)
222 self._thread.setDaemon(True)
227 """ Returns the logger instance of the Experiment Controller
234 """ Returns the state of the Experiment Controller
241 """ Returns the experiment id assigned by the user
248 """ Returns the experiment instance (run) identifier (automatically
256 """ Returns True if the experiment has failed and should be interrupted,
260 return self._fm.abort
262 def wait_finished(self, guids):
263 """ Blocking method that waits until all RMs in the 'guids' list
264 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
265 RELEASED ), or until a failure in the experiment occurs
268 :param guids: List of guids
276 return self.wait(guids, state = ResourceState.STOPPED,
279 def wait_started(self, guids):
280 """ Blocking method that waits until all RMs in the 'guids' list
281 have reached a state >= STARTED, or until a failure in the
282 experiment occurs (i.e. abort == True)
284 :param guids: List of guids
292 return self.wait(guids, state = ResourceState.STARTED,
295 def wait_released(self, guids):
296 """ Blocking method that waits until all RMs in the 'guids' list
297 have reached a state == RELEASED, or until the EC fails
299 :param guids: List of guids
305 return self._state == ECState.FAILED
307 return self.wait(guids, state = ResourceState.RELEASED,
310 def wait_deployed(self, guids):
311 """ Blocking method that waits until all RMs in the 'guids' list
312 have reached a state >= READY, or until a failure in the
313 experiment occurs (i.e. abort == True)
315 :param guids: List of guids
323 return self.wait(guids, state = ResourceState.READY,
326 def wait(self, guids, state, quit):
327 """ Blocking method that waits until all RMs in the 'guids' list
328 have reached a state >= 'state', or until the 'quit' callback
331 :param guids: List of guids
336 if isinstance(guids, int):
339 # Make a copy to avoid modifying the original guids list
343 # If there are no more guids to wait for
344 # or the quit function returns True, exit the loop
345 if len(guids) == 0 or quit():
348 # If a guid reached one of the target states, remove it from list
350 rstate = self.state(guid)
352 hrrstate = ResourceState2str.get(rstate)
353 hrstate = ResourceState2str.get(state)
357 rm = self.get_resource(guid)
358 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
359 rm.get_rtype(), guid, hrrstate, hrstate))
362 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
363 guid, hrrstate, hrstate))
366 def get_task(self, tid):
367 """ Returns a task by its id
369 :param tid: Id of the task
375 return self._tasks.get(tid)
377 def get_resource(self, guid):
378 """ Returns a registered ResourceManager by its guid
380 :param guid: Id of the task
383 :rtype: ResourceManager
386 rm = self._resources.get(guid)
def remove_resource(self, guid):
    """ Unregisters the RM with guid 'guid' from the experiment.

    :param guid: Guid of the RM to remove
    :type guid: int

    :raises KeyError: if no RM is registered under 'guid'
    """
    # dict.pop without a default raises KeyError for unknown keys,
    # exactly like 'del'.
    self._resources.pop(guid)
394 """ Returns the set() of guids of all the ResourceManager
396 :return: Set of all RM guids
400 keys = self._resources.keys()
404 def register_resource(self, rtype, guid = None):
405 """ Registers a new ResourceManager of type 'rtype' in the experiment
407 This method will assign a new 'guid' for the RM, if no guid
410 :param rtype: Type of the RM
413 :return: Guid of the RM
417 # Get next available guid
418 guid = self._guid_generator.next(guid)
421 rm = ResourceFactory.create(rtype, self, guid)
424 self._resources[guid] = rm
428 def get_attributes(self, guid):
429 """ Returns all the attributes of the RM with guid 'guid'
431 :param guid: Guid of the RM
434 :return: List of attributes
438 rm = self.get_resource(guid)
439 return rm.get_attributes()
441 def get_attribute(self, guid, name):
442 """ Returns the attribute 'name' of the RM with guid 'guid'
444 :param guid: Guid of the RM
447 :param name: Name of the attribute
450 :return: The attribute with name 'name'
454 rm = self.get_resource(guid)
455 return rm.get_attribute(name)
457 def register_connection(self, guid1, guid2):
458 """ Registers a connection between a RM with guid 'guid1'
459 and another RM with guid 'guid2'.
461 The order of the in which the two guids are provided is not
462 important, since the connection relationship is symmetric.
464 :param guid1: First guid to connect
465 :type guid1: ResourceManager
467 :param guid2: Second guid to connect
468 :type guid: ResourceManager
471 rm1 = self.get_resource(guid1)
472 rm2 = self.get_resource(guid2)
474 rm1.register_connection(guid2)
475 rm2.register_connection(guid1)
477 def register_condition(self, guids1, action, guids2, state,
479 """ Registers an action START, STOP or DEPLOY for all RM on list
480 guids1 to occur at time 'time' after all elements in list guids2
481 have reached state 'state'.
483 :param guids1: List of guids of RMs subjected to action
486 :param action: Action to perform (either START, STOP or DEPLOY)
487 :type action: ResourceAction
489 :param guids2: List of guids of RMs to we waited for
492 :param state: State to wait for on RMs of list guids2 (STARTED,
494 :type state: ResourceState
496 :param time: Time to wait after guids2 has reached status
500 if isinstance(guids1, int):
502 if isinstance(guids2, int):
506 rm = self.get_resource(guid1)
507 rm.register_condition(action, guids2, state, time)
509 def enable_trace(self, guid, name):
510 """ Enables a trace to be collected during the experiment run
512 :param name: Name of the trace
516 rm = self.get_resource(guid)
517 rm.enable_trace(name)
519 def trace_enabled(self, guid, name):
520 """ Returns True if the trace of name 'name' is enabled
522 :param name: Name of the trace
526 rm = self.get_resource(guid)
527 return rm.trace_enabled(name)
529 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
530 """ Returns information on a collected trace, the trace stream or
531 blocks (chunks) of the trace stream
533 :param name: Name of the trace
536 :param attr: Can be one of:
537 - TraceAttr.ALL (complete trace content),
538 - TraceAttr.STREAM (block in bytes to read starting
540 - TraceAttr.PATH (full path to the trace file),
541 - TraceAttr.SIZE (size of trace file).
544 :param block: Number of bytes to retrieve from trace, when attr is
548 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
554 rm = self.get_resource(guid)
555 return rm.trace(name, attr, block, offset)
557 def get_traces(self, guid):
558 """ Returns the list of the trace names of the RM with guid 'guid'
560 :param guid: Guid of the RM
563 :return: List of trace names
567 rm = self.get_resource(guid)
568 return rm.get_traces()
571 def discover(self, guid):
572 """ Discovers an available resource matching the criteria defined
573 by the RM with guid 'guid', and associates that resource to the RM
575 Not all RM types require (or are capable of) performing resource
576 discovery. For the RM types which are not capable of doing so,
577 invoking this method does not have any consequences.
579 :param guid: Guid of the RM
583 rm = self.get_resource(guid)
586 def provision(self, guid):
587 """ Provisions the resource associated to the RM with guid 'guid'.
589 Provisioning means making a resource 'accessible' to the user.
590 Not all RM types require (or are capable of) performing resource
591 provisioning. For the RM types which are not capable of doing so,
592 invoking this method does not have any consequences.
594 :param guid: Guid of the RM
598 rm = self.get_resource(guid)
599 return rm.provision()
601 def get(self, guid, name):
602 """ Returns the value of the attribute with name 'name' on the
605 :param guid: Guid of the RM
608 :param name: Name of the attribute
611 :return: The value of the attribute with name 'name'
614 rm = self.get_resource(guid)
617 def set(self, guid, name, value):
618 """ Modifies the value of the attribute with name 'name' on the
621 :param guid: Guid of the RM
624 :param name: Name of the attribute
627 :param value: Value of the attribute
630 rm = self.get_resource(guid)
633 def get_global(self, rtype, name):
634 """ Returns the value of the global attribute with name 'name' on the
635 RMs of rtype 'rtype'.
637 :param guid: Guid of the RM
640 :param name: Name of the attribute
643 :return: The value of the attribute with name 'name'
646 rclass = ResourceFactory.get_resource_type(rtype)
647 return rclass.get_global(name)
649 def set_global(self, rtype, name, value):
650 """ Modifies the value of the global attribute with name 'name' on the
651 RMs of with rtype 'rtype'.
653 :param guid: Guid of the RM
656 :param name: Name of the attribute
659 :param value: Value of the attribute
662 rclass = ResourceFactory.get_resource_type(rtype)
663 return rclass.set_global(name, value)
665 def state(self, guid, hr = False):
666 """ Returns the state of a resource
668 :param guid: Resource guid
671 :param hr: Human readable. Forces return of a
672 status string instead of a number
676 rm = self.get_resource(guid)
680 return ResourceState2str.get(state)
684 def stop(self, guid):
685 """ Stops the RM with guid 'guid'
687 Stopping a RM means that the resource it controls will
688 no longer take part of the experiment.
690 :param guid: Guid of the RM
694 rm = self.get_resource(guid)
697 def start(self, guid):
698 """ Starts the RM with guid 'guid'
700 Starting a RM means that the resource it controls will
701 begin taking part of the experiment.
703 :param guid: Guid of the RM
707 rm = self.get_resource(guid)
710 def get_start_time(self, guid):
711 """ Returns the start time of the RM as a timestamp """
712 rm = self.get_resource(guid)
715 def get_stop_time(self, guid):
716 """ Returns the stop time of the RM as a timestamp """
717 rm = self.get_resource(guid)
def get_discover_time(self, guid):
    """ Returns the time at which the RM with guid 'guid' was
    discovered, as a timestamp.

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).discover_time
def get_provision_time(self, guid):
    """ Returns the time at which the RM with guid 'guid' was
    provisioned, as a timestamp.

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).provision_time
730 def get_ready_time(self, guid):
731 """ Returns the deployment time of the RM as a timestamp """
732 rm = self.get_resource(guid)
def get_release_time(self, guid):
    """ Returns the time at which the RM with guid 'guid' was
    released, as a timestamp.

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).release_time
def get_failed_time(self, guid):
    """ Returns the time at which the RM with guid 'guid' failed,
    as a timestamp.

    :param guid: Guid of the RM
    :type guid: int
    """
    return self.get_resource(guid).failed_time
745 def set_with_conditions(self, name, value, guids1, guids2, state,
747 """ Modifies the value of attribute with name 'name' on all RMs
748 on the guids1 list when time 'time' has elapsed since all
749 elements in guids2 list have reached state 'state'.
751 :param name: Name of attribute to set in RM
754 :param value: Value of attribute to set in RM
757 :param guids1: List of guids of RMs subjected to action
760 :param action: Action to register (either START or STOP)
761 :type action: ResourceAction
763 :param guids2: List of guids of RMs to we waited for
766 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
767 :type state: ResourceState
769 :param time: Time to wait after guids2 has reached status
773 if isinstance(guids1, int):
775 if isinstance(guids2, int):
779 rm = self.get_resource(guid)
780 rm.set_with_conditions(name, value, guids2, state, time)
782 def deploy(self, guids = None, wait_all_ready = True, group = None):
783 """ Deploys all ResourceManagers in the guids list.
785 If the argument 'guids' is not given, all RMs with state NEW
788 :param guids: List of guids of RMs to deploy
791 :param wait_all_ready: Wait until all RMs are ready in
792 order to start the RMs
795 :param group: Id of deployment group in which to deploy RMs
799 self.logger.debug(" ------- DEPLOY START ------ ")
802 # If no guids list was passed, all 'NEW' RMs will be deployed
804 for guid in self.resources:
805 if self.state(guid) == ResourceState.NEW:
808 if isinstance(guids, int):
811 # Create deployment group
812 # New guids can be added to a same deployment group later on
816 group = self._group_id_generator.next()
818 if group not in self._groups:
819 self._groups[group] = []
821 self._groups[group].extend(guids)
823 def wait_all_and_start(group):
824 # Function that checks if all resources are READY
825 # before scheduling a start_with_conditions for each RM
828 # Get all guids in group
829 guids = self._groups[group]
832 if self.state(guid) < ResourceState.READY:
837 callback = functools.partial(wait_all_and_start, group)
838 self.schedule("1s", callback)
840 # If all resources are ready, we schedule the start
842 rm = self.get_resource(guid)
843 self.schedule("0s", rm.start_with_conditions)
845 if rm.conditions.get(ResourceAction.STOP):
846 # Only if the RM has STOP conditions we
847 # schedule a stop. Otherwise the RM will stop immediately
848 self.schedule("0s", rm.stop_with_conditions)
850 if wait_all_ready and new_group:
851 # Schedule a function to check that all resources are
852 # READY, and only then schedule the start.
853 # This aims at reducing the number of tasks looping in the
855 # Instead of having many start tasks, we will have only one for
857 callback = functools.partial(wait_all_and_start, group)
858 self.schedule("0s", callback)
861 rm = self.get_resource(guid)
862 rm.deployment_group = group
863 self.schedule("0s", rm.deploy_with_conditions)
865 if not wait_all_ready:
866 self.schedule("0s", rm.start_with_conditions)
868 if rm.conditions.get(ResourceAction.STOP):
869 # Only if the RM has STOP conditions we
870 # schedule a stop. Otherwise the RM will stop immediately
871 self.schedule("0s", rm.stop_with_conditions)
873 def release(self, guids = None):
874 """ Releases all ResourceManagers in the guids list.
876 If the argument 'guids' is not given, all RMs registered
877 in the experiment are released.
879 :param guids: List of RM guids
883 if isinstance(guids, int):
887 guids = self.resources
890 rm = self.get_resource(guid)
891 self.schedule("0s", rm.release)
893 self.wait_released(guids)
896 if self.get(guid, "hardRelease"):
897 self.remove_resource(guid)
900 """ Releases all resources and stops the ExperimentController
903 # If there was a major failure we can't exit gracefully
904 if self._state == ECState.FAILED:
905 raise RuntimeError("EC failure. Can not exit gracefully")
907 # Remove all pending tasks from the scheduler queue
908 for tid in list(self._scheduler.pending):
909 self._scheduler.remove(tid)
911 # Remove pending tasks from the workers queue
916 # Mark the EC state as TERMINATED
917 self._state = ECState.TERMINATED
919 # Stop processing thread
922 # Notify condition to wake up the processing thread
925 if self._thread.is_alive():
928 def schedule(self, date, callback, track = False):
929 """ Schedules a callback to be executed at time 'date'.
931 :param date: string containing execution time for the task.
932 It can be expressed as an absolute time, using
933 timestamp format, or as a relative time matching
934 ^\d+.\d+(h|m|s|ms|us)$
936 :param callback: code to be executed for the task. Must be a
937 Python function, and receives args and kwargs
940 :param track: if set to True, the task will be retrievable with
941 the get_task() method
943 :return : The Id of the task
947 timestamp = stabsformat(date)
948 task = Task(timestamp, callback)
949 task = self._scheduler.schedule(task)
952 self._tasks[task.id] = task
954 # Notify condition to wake up the processing thread
960 """ Process scheduled tasks.
964 Tasks are scheduled by invoking the schedule method with a target
965 callback and an execution time.
966 The schedule method creates a new Task object with that callback
967 and execution time, and pushes it into the '_scheduler' queue.
968 The execution time and the order of arrival of tasks are used
969 to order the tasks in the queue.
971 The _process method is executed in an independent thread held by
972 the ExperimentController for as long as the experiment is running.
973 This method takes tasks from the '_scheduler' queue in a loop
974 and processes them in parallel using multithreading.
975 The environmental variable NEPI_NTHREADS can be used to control
976 the number of threads used to process tasks. The default value is
979 To execute tasks in parallel, a ParallelRunner (PR) object is used.
980 This object keeps a pool of threads (workers), and a queue of tasks
981 scheduled for 'immediate' execution.
983 On each iteration, the '_process' loop will take the next task that
984 is scheduled for 'future' execution from the '_scheduler' queue,
985 and if the execution time of that task is >= to the current time,
986 it will push that task into the PR for 'immediate execution'.
987 As soon as a worker is free, the PR will assign the next task to
990 Upon receiving a task to execute, each PR worker (thread) will
991 invoke the _execute method of the EC, passing the task as
993 The _execute method will then invoke task.callback inside a
994 try/except block. If an exception is raised by the tasks.callback,
995 it will be trapped by the try block, logged to standard error
996 (usually the console), and the task will be marked as failed.
1000 self._runner.start()
1002 while not self._stop:
1004 self._cond.acquire()
1006 task = self._scheduler.next()
1009 # No task to execute. Wait for a new task to be scheduled.
1012 # The task timestamp is in the future. Wait for timeout
1013 # or until another task is scheduled.
1015 if now < task.timestamp:
1016 # Calculate timeout in seconds
1017 timeout = tdiffsec(task.timestamp, now)
1019 # Re-schedule task with the same timestamp
1020 self._scheduler.schedule(task)
1024 # Wait timeout or until a new task awakes the condition
1025 self._cond.wait(timeout)
1027 self._cond.release()
1030 # Process tasks in parallel
1031 self._runner.put(self._execute, task)
1034 err = traceback.format_exc()
1035 self.logger.error("Error while processing tasks in the EC: %s" % err)
1037 # Set the EC to FAILED state
1038 self._state = ECState.FAILED
1040 # Set the FailureManager failure level to EC failure
1041 self._fm.set_ec_failure()
1043 self.logger.debug("Exiting the task processing loop ... ")
1046 self._runner.destroy()
1048 def _execute(self, task):
1049 """ Executes a single task.
1051 :param task: Object containing the callback to execute
1057 task.result = task.callback()
1058 task.status = TaskStatus.DONE
1061 err = traceback.format_exc()
1063 task.status = TaskStatus.ERROR
1065 self.logger.error("Error occurred while executing task: %s" % err)
1068 """ Awakes the processing thread if it is blocked waiting
1069 for new tasks to arrive
1072 self._cond.acquire()
1074 self._cond.release()