# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
# Enumeration-style holder for the EC's failure severity.
# NOTE(review): this dump is missing lines (the embedded original line
# numbers jump from 41 to 47); the class constants — presumably OK,
# RM_FAILURE and EC_FAILURE, which are referenced below — are not visible
# here. Confirm against the original file.
40 class FailureLevel(object):
41 """ Describes the system failure state
# Tracks system-level failures and decides whether the experiment must abort.
# NOTE(review): interior lines are missing from this dump (embedded line
# numbers are non-contiguous); gap notes below mark what is absent.
47 class FailureManager(object):
48 """ The FailureManager is responsible for handling errors,
49 and deciding whether an experiment should be aborted
# Keeps only a weak reference to the EC — presumably to avoid a reference
# cycle between the EC and its FailureManager (TODO confirm).
52 def __init__(self, ec):
53 self._ec = weakref.ref(ec)
54 self._failure_level = FailureLevel.OK
# NOTE(review): the `def`/@property line for the accessor below is missing
# from the dump; this docstring presumably belongs to an `ec` property that
# dereferences self._ec — verify against the original file.
58 """ Returns the Experiment Controller
# NOTE(review): the method header for the block below (presumably an `abort`
# property) is missing. The visible logic: while no failure is recorded,
# scan every RM; if a resource marked "critical" has reached FAILED,
# escalate the level to RM_FAILURE.
63 if self._failure_level == FailureLevel.OK:
64 for guid in self.ec.resources:
65 state = self.ec.state(guid)
66 critical = self.ec.get(guid, "critical")
68 if state == ResourceState.FAILED and critical:
69 self._failure_level = FailureLevel.RM_FAILURE
70 self.ec.logger.debug("RM critical failure occurred on guid %d." \
71 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
# Any level other than OK means the experiment should be aborted.
74 return self._failure_level != FailureLevel.OK
# Escalates the failure level to EC_FAILURE (an EC-internal error, e.g. a
# crash of the task-processing loop). There is no way to de-escalate.
76 def set_ec_failure(self):
77 self._failure_level = FailureLevel.EC_FAILURE
# Enumeration-style holder for the EC lifecycle state.
# NOTE(review): the state constants are missing from this dump; RUNNING,
# FAILED and TERMINATED are referenced elsewhere in this file, so those
# three at least must exist — confirm against the original file.
80 class ECState(object):
81 """ State of the Experiment Controller
# NOTE(review): the opening triple-quote of the class docstring (original
# line 89) is missing from this dump, as are scattered lines inside it.
88 class ExperimentController(object):
90 .. class:: Class Args :
92 :param exp_id: Human readable identifier for the experiment scenario.
97 An experiment, or scenario, is defined by a concrete set of resources,
98 behavior, configuration and interconnection of those resources.
99 The Experiment Description (ED) is a detailed representation of a
100 single experiment. It contains all the necessary information to
101 allow repeating the experiment. NEPI allows to describe
102 experiments by registering components (resources), configuring them
103 and interconnecting them.
105 A same experiment (scenario) can be executed many times, generating
106 different results. We call an experiment execution (instance) a 'run'.
108 The ExperimentController (EC), is the entity responsible of
109 managing an experiment run. The same scenario can be
110 recreated (and re-run) by instantiating an EC and recreating
111 the same experiment description.
113 In NEPI, an experiment is represented as a graph of interconnected
114 resources. A resource is a generic concept in the sense that any
115 component taking part of an experiment, whether physical or
116 virtual, is considered a resource. A resource could be a host,
117 a virtual machine, an application, a simulator, an IP address.
119 A ResourceManager (RM), is the entity responsible for managing a
120 single resource. ResourceManagers are specific to a resource
121 type (i.e. An RM to control a Linux application will not be
122 the same as the RM used to control a ns-3 simulation).
123 To support a new type of resource in NEPI, a new RM must be
124 implemented. NEPI already provides a variety of
125 RMs to control basic resources, and new ones can be extended from
128 Through the EC interface the user can create ResourceManagers (RMs),
129 configure them and interconnect them, to describe an experiment.
130 Describing an experiment through the EC does not run the experiment.
131 Only when the 'deploy()' method is invoked on the EC, the EC will take
132 actions to transform the 'described' experiment into a 'running' experiment.
134 While the experiment is running, it is possible to continue to
135 create/configure/connect RMs, and to deploy them to involve new
136 resources in the experiment (this is known as 'interactive' deployment).
138 An experiment in NEPI is identified by a string id,
139 which is either given by the user, or automatically generated by NEPI.
140 The purpose of this identifier is to separate files and results that
141 belong to different experiment scenarios.
142 However, since a same 'experiment' can be run many times, the experiment
143 id is not enough to identify an experiment instance (run).
144 For this reason, the ExperimentController has two identifiers, the
145 exp_id, which can be re-used in different ExperimentControllers,
146 and the run_id, which is unique to one ExperimentController instance, and
147 is automatically generated by NEPI.
# EC constructor. Sets up identifiers, the RM registry, the task scheduler,
# the failure manager and the processing machinery.
# NOTE(review): interior lines are missing from this dump — in particular
# the initialization of self._tasks (used by get_task/schedule), the
# self._stop flag (used by _process/shutdown), and the self._thread.start()
# call. Confirm against the original file.
151 def __init__(self, exp_id = None):
152 super(ExperimentController, self).__init__()
154 self._logger = logging.getLogger("ExperimentController")
156 # Run identifier. It identifies a concrete execution instance (run)
158 # Since a same experiment (same configuration) can be executed many
159 # times, this run_id permits to separate result files generated on
160 # different experiment executions
161 self._run_id = tsformat()
163 # Experiment identifier. Usually assigned by the user
164 # Identifies the experiment scenario (i.e. configuration,
165 # resources used, etc)
# NOTE(review): str.encode('hex') is Python 2 only; under Python 3 this
# would need binascii.hexlify(os.urandom(8)).decode() or os.urandom(8).hex().
166 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
168 # generator of globally unique ids
169 self._guid_generator = guid.GuidGenerator()
# Registry mapping guid -> ResourceManager instance.
172 self._resources = dict()
174 # Scheduler. It a queue that holds tasks scheduled for
175 # execution, and yields the next task to be executed
176 # ordered by execution and arrival time
177 self._scheduler = HeapScheduler()
182 # RM groups (for deployment)
183 self._groups = dict()
185 # generator of globally unique id for groups
186 self._group_id_generator = guid.GuidGenerator()
188 # Flag to stop processing thread
191 # Entity in charge of managing system failures
192 self._fm = FailureManager(self)
# EC starts in RUNNING state; see ECState above.
195 self._state = ECState.RUNNING
197 # The runner is a pool of threads used to parallelize
# Thread-pool size is configurable via the NEPI_NTHREADS env var (default 50).
199 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
200 self._runner = ParallelRun(maxthreads = nthreads)
202 # Event processing thread
203 self._cond = threading.Condition()
204 self._thread = threading.Thread(target = self._process)
# Daemon thread so a hung processing loop cannot block interpreter exit.
205 self._thread.setDaemon(True)
# NOTE(review): the @property/def headers for the accessors below are
# missing from this dump. From the docstrings these are presumably the
# `logger`, `ecstate`, `exp_id`, `run_id` and `abort` properties — verify
# against the original file.
210 """ Return the logger of the Experiment Controller
217 """ Return the state of the Experiment Controller
224 """ Return the experiment id assigned by the user
231 """ Return the experiment instance (run) identifier
# Delegates the abort decision to the FailureManager.
238 return self._fm.abort
# Convenience wrappers around self.wait() for common target states.
# NOTE(review): each `return self.wait(...)` call below is truncated in
# this dump — the trailing `quit = ...` argument (and closing parenthesis)
# is missing. Confirm the quit predicates against the original file.
240 def wait_finished(self, guids):
241 """ Blocking method that waits until all RMs in the 'guid' list
242 reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or
243 RELEASED ) or until a System Failure occurs (e.g. Task Failure)
245 :param guids: List of guids
253 return self.wait(guids, state = ResourceState.STOPPED,
256 def wait_started(self, guids):
257 """ Blocking method that waits until all RMs in the 'guid' list
258 reach a state >= STARTED or until a System Failure occurs
261 :param guids: List of guids
268 return self.wait(guids, state = ResourceState.STARTED,
271 def wait_released(self, guids):
272 """ Blocking method that waits until all RMs in the 'guid' list
273 reach a state = RELEASED or until the EC fails
275 :param guids: List of guids
# NOTE(review): this line appears to be the body of an inner quit() helper
# (EC failure ends the wait); its `def` line is missing from the dump.
280 return self._state == ECState.FAILED
282 return self.wait(guids, state = ResourceState.RELEASED,
285 def wait_deployed(self, guids):
286 """ Blocking method that waits until all RMs in the 'guid' list
287 reach a state >= READY or until a System Failure occurs
290 :param guids: List of guids
297 return self.wait(guids, state = ResourceState.READY,
# Generic blocking wait: polls RM states until every guid reaches `state`
# or the `quit` predicate fires.
# NOTE(review): `quit` shadows the builtin of the same name; also several
# interior lines (loop header, sleep/backoff, list mutation) are missing
# from this dump.
300 def wait(self, guids, state, quit):
301 """ Blocking method that waits until all RMs in the 'guid' list
302 reach a state >= 'state' or until quit yields True
304 :param guids: List of guids
# Accept a single guid as well as a list.
307 if isinstance(guids, int):
311 # If there are no more guids to wait for
312 # or the quit function returns True, exit the loop
313 if len(guids) == 0 or quit():
316 # If a guid reached one of the target states, remove it from list
318 rstate = self.state(guid)
# Human-readable state names, used only for the debug messages below.
320 hrrstate = ResourceState2str.get(rstate)
321 hrstate = ResourceState2str.get(state)
325 self.logger.debug(" guid %d DONE - state is %s, required is >= %s " % (
326 guid, hrrstate, hrstate))
329 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
330 guid, hrrstate, hrstate))
# Lookup helpers over the task and resource registries. Both use dict.get,
# so an unknown id returns None rather than raising.
333 def get_task(self, tid):
334 """ Get a specific task
336 :param tid: Id of the task
340 return self._tasks.get(tid)
342 def get_resource(self, guid):
343 """ Get a specific Resource Manager
345 :param guid: Guid of the RM
347 :rtype: ResourceManager
349 return self._resources.get(guid)
# NOTE(review): the @property/def header for `resources` is missing from
# this dump; only its docstring and return line are visible.
353 """ Returns the list of all the Resource Manager Id
358 return self._resources.keys()
# Creates an RM of the given type via the factory and registers it.
# NOTE(review): the trailing `return guid` (promised by the docstring's
# ":return: Id of the RM") is missing from this dump.
360 def register_resource(self, rtype, guid = None):
361 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
362 for the RM of type 'rtype' and add it to the list of Resources.
364 :param rtype: Type of the RM
366 :return: Id of the RM
369 # Get next available guid
370 guid = self._guid_generator.next(guid)
# Instantiate the RM through the factory; the RM keeps a back-reference
# to this EC and its own guid.
373 rm = ResourceFactory.create(rtype, self, guid)
376 self._resources[guid] = rm
380 def get_attributes(self, guid):
381 """ Return all the attributes of a specific RM
383 :param guid: Guid of the RM
385 :return: List of attributes
388 rm = self.get_resource(guid)
389 return rm.get_attributes()
# Connection registration is symmetric: both RMs are told about each other,
# so declaration order does not matter.
391 def register_connection(self, guid1, guid2):
392 """ Registers a guid1 with a guid2.
393 The declaration order is not important
395 :param guid1: First guid to connect
396 :type guid1: ResourceManager
398 :param guid2: Second guid to connect
399 :type guid2: ResourceManager
401 rm1 = self.get_resource(guid1)
402 rm2 = self.get_resource(guid2)
404 rm1.register_connection(guid2)
405 rm2.register_connection(guid1)
# NOTE(review): the signature continuation line (presumably `time = None):`)
# and the loop header over guids1 are missing from this dump.
407 def register_condition(self, guids1, action, guids2, state,
409 """ Registers an action START or STOP for all RM on guids1 to occur
410 time 'time' after all elements in guids2 reached state 'state'.
412 :param guids1: List of guids of RMs subjected to action
415 :param action: Action to register (either START or STOP)
416 :type action: ResourceAction
418 :param guids2: List of guids of RMs we wait for
421 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
422 :type state: ResourceState
424 :param time: Time to wait after guids2 has reached status
# Normalize single guids to lists (the wrapping lines are missing here).
428 if isinstance(guids1, int):
430 if isinstance(guids2, int):
# The condition itself is stored on each RM in guids1.
434 rm = self.get_resource(guid1)
435 rm.register_condition(action, guids2, state, time)
# Trace management: all three methods delegate to the RM identified by guid.
437 def enable_trace(self, guid, name):
440 :param name: Name of the trace
443 rm = self.get_resource(guid)
444 rm.enable_trace(name)
446 def trace_enabled(self, guid, name):
447 """ Returns True if trace is enabled
449 :param name: Name of the trace
452 rm = self.get_resource(guid)
453 return rm.trace_enabled(name)
455 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
456 """ Get information on collected trace
458 :param name: Name of the trace
461 :param attr: Can be one of:
462 - TraceAttr.ALL (complete trace content),
463 - TraceAttr.STREAM (block in bytes to read starting at offset),
464 - TraceAttr.PATH (full path to the trace file),
465 - TraceAttr.SIZE (size of trace file).
468 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
471 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
476 rm = self.get_resource(guid)
477 return rm.trace(name, attr, block, offset)
# Per-RM delegation helpers. Each looks up the RM by guid and forwards.
# NOTE(review): the `return rm.discover()` line of discover() and the
# `return rm.get(name)` line of get() are missing from this dump.
479 def discover(self, guid):
480 """ Discover a specific RM defined by its 'guid'
482 :param guid: Guid of the RM
486 rm = self.get_resource(guid)
489 def provision(self, guid):
490 """ Provision a specific RM defined by its 'guid'
492 :param guid: Guid of the RM
496 rm = self.get_resource(guid)
497 return rm.provision()
499 def get(self, guid, name):
500 """ Get a specific attribute 'name' from the RM 'guid'
502 :param guid: Guid of the RM
505 :param name: attribute's name
509 rm = self.get_resource(guid)
512 def set(self, guid, name, value):
513 """ Set a specific attribute 'name' from the RM 'guid'
514 with the value 'value'
516 :param guid: Guid of the RM
519 :param name: attribute's name
522 :param value: attribute's value
525 rm = self.get_resource(guid)
526 return rm.set(name, value)
# State query plus start/stop delegation.
# NOTE(review): the lines that read `state = rm.state` and the `hr` branch
# wrapper, as well as `rm.stop()` / `rm.start()` calls, are missing from
# this dump.
528 def state(self, guid, hr = False):
529 """ Returns the state of a resource
531 :param guid: Resource guid
534 :param hr: Human readable. Forces return of a
535 status string instead of a number
539 rm = self.get_resource(guid)
# When hr is True the numeric state is mapped to its string name.
543 return ResourceState2str.get(state)
547 def stop(self, guid):
548 """ Stop a specific RM defined by its 'guid'
550 :param guid: Guid of the RM
554 rm = self.get_resource(guid)
557 def start(self, guid):
558 """ Start a specific RM defined by its 'guid'
560 :param guid: Guid of the RM
564 rm = self.get_resource(guid)
# Conditional attribute set: mirrors register_condition but applies a
# value once the guids2 condition is met.
# NOTE(review): the signature continuation line (presumably `time = None):`)
# and the loop header over guids1 are missing from this dump.
567 def set_with_conditions(self, name, value, guids1, guids2, state,
569 """ Set value 'value' on attribute with name 'name' on all RMs of
570 guids1 when 'time' has elapsed since all elements in guids2
571 have reached state 'state'.
573 :param name: Name of attribute to set in RM
576 :param value: Value of attribute to set in RM
579 :param guids1: List of guids of RMs subjected to action
582 :param action: Action to register (either START or STOP)
583 :type action: ResourceAction
585 :param guids2: List of guids of RMs we wait for
588 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
589 :type state: ResourceState
591 :param time: Time to wait after guids2 has reached status
# Normalize single guids to lists (the wrapping lines are missing here).
595 if isinstance(guids1, int):
597 if isinstance(guids2, int):
601 rm = self.get_resource(guid)
602 rm.set_with_conditions(name, value, guids2, state, time)
# Deploys RMs: groups them, schedules deploy_with_conditions for each, and
# either polls until the whole group is READY before starting (default) or
# starts each RM independently.
# NOTE(review): several interior lines are missing from this dump (the
# `guids = []` / append lines, the `new_group` assignment, loop headers,
# and `return` statements inside wait_all_and_start).
604 def deploy(self, guids = None, wait_all_ready = True, group = None):
605 """ Deploy all resource manager in guids list
607 :param guids: List of guids of RMs to deploy
610 :param wait_all_ready: Wait until all RMs are ready in
611 order to start the RMs
614 :param group: Id of deployment group in which to deploy RMs
618 self.logger.debug(" ------- DEPLOY START ------ ")
621 # If no guids list was passed, all 'NEW' RMs will be deployed
623 for guid in self.resources:
624 if self.state(guid) == ResourceState.NEW:
# Accept a single guid as well as a list.
627 if isinstance(guids, int):
630 # Create deployment group
631 # New guids can be added to a same deployment group later on
635 group = self._group_id_generator.next()
637 if group not in self._groups:
638 self._groups[group] = []
640 self._groups[group].extend(guids)
642 def wait_all_and_start(group):
643 # Function that checks if all resources are READY
644 # before scheduling a start_with_conditions for each RM
647 # Get all guids in group
648 guids = self._groups[group]
# If any RM is not READY yet, re-schedule this same check in 1 second.
651 if self.state(guid) < ResourceState.READY:
656 callback = functools.partial(wait_all_and_start, group)
657 self.schedule("1s", callback)
659 # If all resources are ready, we schedule the start
661 rm = self.get_resource(guid)
662 self.schedule("0s", rm.start_with_conditions)
664 if wait_all_ready and new_group:
665 # Schedule a function to check that all resources are
666 # READY, and only then schedule the start.
667 # This aims at reducing the number of tasks looping in the
669 # Instead of having many start tasks, we will have only one for
671 callback = functools.partial(wait_all_and_start, group)
672 self.schedule("0s", callback)
# Per-RM deployment: tag the RM with its group and schedule its deploy.
675 rm = self.get_resource(guid)
676 rm.deployment_group = group
677 self.schedule("0s", rm.deploy_with_conditions)
# Without the group-wide READY barrier, each RM starts on its own conditions.
679 if not wait_all_ready:
680 self.schedule("0s", rm.start_with_conditions)
682 if rm.conditions.get(ResourceAction.STOP):
683 # Only if the RM has STOP conditions we
684 # schedule a stop. Otherwise the RM will stop immediately
685 self.schedule("0s", rm.stop_with_conditions)
# Releases RMs: cancels pending scheduled tasks, schedules rm.release for
# each guid, then blocks until all reach RELEASED.
# NOTE(review): the `if guids is None:` wrapper and the loop header over
# guids are missing from this dump.
687 def release(self, guids = None):
688 """ Release all RMs on the guids list or
689 all the resources if no list is specified
691 :param guids: List of RM guids
696 guids = self.resources
698 # Remove all pending tasks from the scheduler queue
699 for tid in list(self._scheduler.pending):
700 self._scheduler.remove(tid)
705 rm = self.get_resource(guid)
706 self.schedule("0s", rm.release)
# Block until every targeted RM has reached RELEASED (or the EC fails).
708 self.wait_released(guids)
# NOTE(review): the `def shutdown(self):` header, the release() call, the
# `self._stop = True` line, the _notify() call and the thread join are
# missing from this dump; only the fragments below are visible.
711 """ Shutdown the Experiment Controller.
712 Releases all the resources and stops task processing thread
715 # If there was a major failure we can't exit gracefully
716 if self._state == ECState.FAILED:
717 raise RuntimeError("EC failure. Can not exit gracefully")
721 # Mark the EC state as TERMINATED
722 self._state = ECState.TERMINATED
724 # Stop processing thread
727 # Notify condition to wake up the processing thread
# Presumably followed by a join on the processing thread — confirm.
730 if self._thread.is_alive():
# Parses the date spec, enqueues a Task in the heap scheduler, optionally
# tracks it, and wakes the processing thread.
# NOTE(review): the `if track:` wrapper around the _tasks insertion, the
# _notify() call and the `return task.id` line are missing from this dump.
733 def schedule(self, date, callback, track = False):
734 """ Schedule a callback to be executed at time date.
736 :param date: string containing execution time for the task.
737 It can be expressed as an absolute time, using
738 timestamp format, or as a relative time matching
739 ^\d+.\d+(h|m|s|ms|us)$
741 :param callback: code to be executed for the task. Must be a
742 Python function, and receives args and kwargs
745 :param track: if set to True, the task will be retrievable with
746 the get_task() method
748 :return : The Id of the task
# stabsformat accepts both absolute timestamps and relative specs ("1s").
750 timestamp = stabsformat(date)
751 task = Task(timestamp, callback)
752 task = self._scheduler.schedule(task)
755 self._tasks[task.id] = task
757 # Notify condition to wake up the processing thread
# NOTE(review): the `def _process(self):` header is missing from this dump,
# as are the try/except/finally scaffolding lines, the condition-acquire/
# release lines, the `now = tnow()` assignment and several `continue`s.
763 """ Process scheduled tasks.
767 The _process method is executed in an independent thread held by the
768 ExperimentController for as long as the experiment is running.
770 Tasks are scheduled by invoking the schedule method with a target callback.
771 The schedule method is given an execution time which controls the
772 order in which tasks are processed.
774 Tasks are processed in parallel using multithreading.
775 The environment variable NEPI_NTHREADS can be used to control
776 the number of threads used to process tasks. The default value is 50.
780 To execute tasks in parallel, a ParallelRunner (PR) object, holding
781 a pool of threads (workers), is used.
782 For each available thread in the PR, the next task popped from
783 the scheduler queue is 'put' in the PR.
784 Upon receiving a task to execute, each PR worker (thread) invokes the
785 _execute method of the EC, passing the task as argument.
786 This method, calls task.callback inside a try/except block. If an
787 exception is raised by the tasks.callback, it will be trapped by the
788 try block, logged to standard error (usually the console), and the EC
789 state will be set to ECState.FAILED.
790 The invocation of _notify immediately after, forces the processing
791 loop in the _process method, to wake up if it was blocked waiting for new
792 tasks to arrive, and to check the EC state.
793 As the EC is in FAILED state, the processing loop exits and the
794 'finally' block is invoked. In the 'finally' block, the 'sync' method
795 of the PR is invoked, which forces the PR to raise any unchecked errors
796 that might have been raised by the workers.
# Main loop: runs until the stop flag is raised (by shutdown).
802 while not self._stop:
806 task = self._scheduler.next()
809 # No task to execute. Wait for a new task to be scheduled.
812 # The task timestamp is in the future. Wait for timeout
813 # or until another task is scheduled.
815 if now < task.timestamp:
816 # Calculate timeout in seconds
817 timeout = tdiffsec(task.timestamp, now)
819 # Re-schedule task with the same timestamp
820 self._scheduler.schedule(task)
824 # Wait timeout or until a new task awakes the condition
825 self._cond.wait(timeout)
830 # Process tasks in parallel
831 self._runner.put(self._execute, task)
# Any exception in the loop itself is fatal to the EC: log, flag FAILED,
# and escalate through the FailureManager.
834 err = traceback.format_exc()
835 self.logger.error("Error while processing tasks in the EC: %s" % err)
837 # Set the EC to FAILED state
838 self._state = ECState.FAILED
840 # Set the FailureManager failure level to EC failure
841 self._fm.set_ec_failure()
843 self.logger.debug("Exiting the task processing loop ... ")
# Presumably in the finally block: tear down the worker pool (raises any
# unchecked worker errors per the docstring above).
846 self._runner.destroy()
# Runs one task's callback; records the result and status on the task.
# NOTE(review): the try/except wrapper lines are missing from this dump —
# the status assignment order below (DONE set before the callback runs,
# ERROR on exception) only makes sense with that wrapper; confirm against
# the original file.
848 def _execute(self, task):
849 """ Executes a single task.
851 :param task: Object containing the callback to execute
856 If the invocation of the task callback raises an
857 exception, the processing thread of the ExperimentController
858 will be stopped and the experiment will be aborted.
862 task.status = TaskStatus.DONE
865 task.result = task.callback()
868 err = traceback.format_exc()
870 task.status = TaskStatus.ERROR
872 self.logger.error("Error occurred while executing task: %s" % err)
# NOTE(review): the `def _notify(self):` header and its notify/acquire
# lines are missing; only the docstring fragment below is visible.
875 """ Awakes the processing thread in case it is blocked waiting
876 for a new task to be scheduled.