2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import threading
import time
import traceback

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
class FailurePolicy(object):
    """ Defines how to respond to experiment failures """
    # Abort the experiment when a ResourceManager fails
    ABORT_ON_RM_FAILURE = 2
class FailureLevel(object):
    """ Describe the system failure state """
    # NOTE(review): constant values reconstructed -- only the names
    # below are referenced by FailureManager; confirm against upstream
    OK = 1
    RM_FAILURE = 2
    TASK_FAILURE = 3
    EC_FAILURE = 4
class FailureManager(object):
    """ The FailureManager is responsible for handling errors,
    and deciding whether an experiment should be aborted
    """

    def __init__(self, failure_policy = None):
        # Current failure level of the system
        self._failure_level = FailureLevel.OK
        # Policy that decides which failures abort the experiment
        self._failure_policy = failure_policy or \
                FailurePolicy.ABORT_ON_RM_FAILURE

    @property
    def abort(self):
        """ Returns True when the experiment should be aborted,
        according to the current failure level and failure policy """
        # An EC failure always aborts the experiment
        if self._failure_level == FailureLevel.EC_FAILURE:
            return True

        # Task and RM failures abort only under ABORT_ON_RM_FAILURE
        if self._failure_level in [FailureLevel.TASK_FAILURE,
                FailureLevel.RM_FAILURE] and \
                self._failure_policy == FailurePolicy.ABORT_ON_RM_FAILURE:
            return True

        return False

    def set_rm_failure(self):
        """ Mark that a ResourceManager failed """
        self._failure_level = FailureLevel.RM_FAILURE

    def set_task_failure(self):
        """ Mark that a scheduled task failed """
        self._failure_level = FailureLevel.TASK_FAILURE

    def set_ec_failure(self):
        """ Mark that the ExperimentController itself failed """
        self._failure_level = FailureLevel.EC_FAILURE
class ECState(object):
    """ State of the Experiment Controller """
    # NOTE(review): values reconstructed -- only the names RUNNING,
    # FAILED and TERMINATED are used elsewhere in this file
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
92 class ExperimentController(object):
94 .. class:: Class Args :
96 :param exp_id: Human readable identifier for the experiment scenario.
101 An experiment, or scenario, is defined by a concrete set of resources,
102 behavior, configuration and interconnection of those resources.
103 The Experiment Description (ED) is a detailed representation of a
104 single experiment. It contains all the necessary information to
105 allow repeating the experiment. NEPI allows to describe
106 experiments by registering components (resources), configuring them
107 and interconnecting them.
109 A same experiment (scenario) can be executed many times, generating
110 different results. We call an experiment execution (instance) a 'run'.
    The ExperimentController (EC) is the entity responsible for
113 managing an experiment run. The same scenario can be
114 recreated (and re-run) by instantiating an EC and recreating
115 the same experiment description.
117 In NEPI, an experiment is represented as a graph of interconnected
118 resources. A resource is a generic concept in the sense that any
    component taking part in an experiment, whether physical or
    virtual, is considered a resource. A resource could be a host,
    a virtual machine, an application, a simulator, an IP address.
123 A ResourceManager (RM), is the entity responsible for managing a
124 single resource. ResourceManagers are specific to a resource
125 type (i.e. An RM to control a Linux application will not be
126 the same as the RM used to control a ns-3 simulation).
127 To support a new type of resource in NEPI, a new RM must be
128 implemented. NEPI already provides a variety of
129 RMs to control basic resources, and new can be extended from
132 Through the EC interface the user can create ResourceManagers (RMs),
133 configure them and interconnect them, to describe an experiment.
134 Describing an experiment through the EC does not run the experiment.
135 Only when the 'deploy()' method is invoked on the EC, the EC will take
136 actions to transform the 'described' experiment into a 'running' experiment.
138 While the experiment is running, it is possible to continue to
139 create/configure/connect RMs, and to deploy them to involve new
140 resources in the experiment (this is known as 'interactive' deployment).
    An experiment in NEPI is identified by a string id,
143 which is either given by the user, or automatically generated by NEPI.
144 The purpose of this identifier is to separate files and results that
145 belong to different experiment scenarios.
146 However, since a same 'experiment' can be run many times, the experiment
147 id is not enough to identify an experiment instance (run).
    For this reason, the ExperimentController has two identifiers, the
149 exp_id, which can be re-used in different ExperimentController,
150 and the run_id, which is unique to one ExperimentController instance, and
151 is automatically generated by NEPI.
155 def __init__(self, exp_id = None):
156 super(ExperimentController, self).__init__()
158 self._logger = logging.getLogger("ExperimentController")
160 # Run identifier. It identifies a concrete execution instance (run)
162 # Since a same experiment (same configuration) can be executed many
163 # times, this run_id permits to separate result files generated on
164 # different experiment executions
165 self._run_id = tsformat()
167 # Experiment identifier. Usually assigned by the user
168 # Identifies the experiment scenario (i.e. configuration,
169 # resources used, etc)
170 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
172 # generator of globally unique ids
173 self._guid_generator = guid.GuidGenerator()
176 self._resources = dict()
179 self._scheduler = HeapScheduler()
184 # RM groups (for deployment)
185 self._groups = dict()
187 # generator of globally unique id for groups
188 self._group_id_generator = guid.GuidGenerator()
190 # Event processing thread
191 self._cond = threading.Condition()
192 self._thread = threading.Thread(target = self._process)
193 self._thread.setDaemon(True)
196 # Flag to stop processing thread
199 # Entity in charge of managing system failures
200 self._fm = FailureManager()
203 self._state = ECState.RUNNING
207 """ Return the logger of the Experiment Controller
214 """ Return the state of the Experiment Controller
221 """ Return the experiment id assigned by the user
228 """ Return the experiment instance (run) identifier
235 return self._fm.abort
237 def set_rm_failure(self):
238 self._fm.set_rm_failure()
240 def wait_finished(self, guids):
241 """ Blocking method that wait until all RMs in the 'guid' list
242 reach a state >= STOPPED (i.e. FINISHED, STOPPED, FAILED or
243 RELEASED ) or until a System Failure occurs (e.g. Task Failure)
245 :param guids: List of guids
253 return self.wait(guids, state = ResourceState.STOPPED,
256 def wait_started(self, guids):
257 """ Blocking method that wait until all RMs in the 'guid' list
258 reach a state >= STARTED or until a System Failure occurs
261 :param guids: List of guids
268 return self.wait(guids, state = ResourceState.STARTED,
271 def wait_released(self, guids):
272 """ Blocking method that wait until all RMs in the 'guid' list
273 reach a state = RELEASED or until the EC fails
275 :param guids: List of guids
280 return self._state == ECState.FAILED
282 return self.wait(guids, state = ResourceState.RELEASED,
285 def wait_deployed(self, guids):
286 """ Blocking method that wait until all RMs in the 'guid' list
287 reach a state >= READY or until a System Failure occurs
290 :param guids: List of guids
297 return self.wait(guids, state = ResourceState.READY,
300 def wait(self, guids, state, quit):
301 """ Blocking method that wait until all RMs in the 'guid' list
302 reach a state >= 'state' or until quit yileds True
304 :param guids: List of guids
307 if isinstance(guids, int):
311 # If there are no more guids to wait for
312 # or the quit function returns True, exit the loop
313 if len(guids) == 0 or quit():
316 # If a guid reached one of the target states, remove it from list
318 rstate = self.state(guid)
324 hrstate = ResourceState2str.get(rstate)
325 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
326 guid, rstate, state))
330 def get_task(self, tid):
331 """ Get a specific task
333 :param tid: Id of the task
337 return self._tasks.get(tid)
339 def get_resource(self, guid):
340 """ Get a specific Resource Manager
342 :param guid: Id of the task
344 :rtype: ResourceManager
346 return self._resources.get(guid)
350 """ Returns the list of all the Resource Manager Id
355 return self._resources.keys()
357 def register_resource(self, rtype, guid = None):
358 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
359 for the RM of type 'rtype' and add it to the list of Resources.
361 :param rtype: Type of the RM
363 :return: Id of the RM
366 # Get next available guid
367 guid = self._guid_generator.next(guid)
370 rm = ResourceFactory.create(rtype, self, guid)
373 self._resources[guid] = rm
377 def get_attributes(self, guid):
378 """ Return all the attibutes of a specific RM
380 :param guid: Guid of the RM
382 :return: List of attributes
385 rm = self.get_resource(guid)
386 return rm.get_attributes()
388 def register_connection(self, guid1, guid2):
389 """ Registers a guid1 with a guid2.
390 The declaration order is not important
392 :param guid1: First guid to connect
393 :type guid1: ResourceManager
395 :param guid2: Second guid to connect
396 :type guid: ResourceManager
398 rm1 = self.get_resource(guid1)
399 rm2 = self.get_resource(guid2)
401 rm1.register_connection(guid2)
402 rm2.register_connection(guid1)
404 def register_condition(self, guids1, action, guids2, state,
406 """ Registers an action START or STOP for all RM on guids1 to occur
407 time 'time' after all elements in guids2 reached state 'state'.
409 :param guids1: List of guids of RMs subjected to action
412 :param action: Action to register (either START or STOP)
413 :type action: ResourceAction
415 :param guids2: List of guids of RMs to we waited for
418 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
419 :type state: ResourceState
421 :param time: Time to wait after guids2 has reached status
425 if isinstance(guids1, int):
427 if isinstance(guids2, int):
431 rm = self.get_resource(guid1)
432 rm.register_condition(action, guids2, state, time)
434 def enable_trace(self, guid, name):
437 :param name: Name of the trace
440 rm = self.get_resource(guid)
441 rm.enable_trace(name)
443 def trace_enabled(self, guid, name):
444 """ Returns True if trace is enabled
446 :param name: Name of the trace
449 rm = self.get_resource(guid)
450 return rm.trace_enabled(name)
452 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
453 """ Get information on collected trace
455 :param name: Name of the trace
458 :param attr: Can be one of:
459 - TraceAttr.ALL (complete trace content),
460 - TraceAttr.STREAM (block in bytes to read starting at offset),
461 - TraceAttr.PATH (full path to the trace file),
462 - TraceAttr.SIZE (size of trace file).
465 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
468 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
473 rm = self.get_resource(guid)
474 return rm.trace(name, attr, block, offset)
476 def discover(self, guid):
477 """ Discover a specific RM defined by its 'guid'
479 :param guid: Guid of the RM
483 rm = self.get_resource(guid)
486 def provision(self, guid):
487 """ Provision a specific RM defined by its 'guid'
489 :param guid: Guid of the RM
493 rm = self.get_resource(guid)
494 return rm.provision()
496 def get(self, guid, name):
497 """ Get a specific attribute 'name' from the RM 'guid'
499 :param guid: Guid of the RM
502 :param name: attribute's name
506 rm = self.get_resource(guid)
509 def set(self, guid, name, value):
510 """ Set a specific attribute 'name' from the RM 'guid'
511 with the value 'value'
513 :param guid: Guid of the RM
516 :param name: attribute's name
519 :param value: attribute's value
522 rm = self.get_resource(guid)
523 return rm.set(name, value)
525 def state(self, guid, hr = False):
526 """ Returns the state of a resource
528 :param guid: Resource guid
531 :param hr: Human readable. Forces return of a
532 status string instead of a number
536 rm = self.get_resource(guid)
540 return ResourceState2str.get(state)
544 def stop(self, guid):
545 """ Stop a specific RM defined by its 'guid'
547 :param guid: Guid of the RM
551 rm = self.get_resource(guid)
554 def start(self, guid):
555 """ Start a specific RM defined by its 'guid'
557 :param guid: Guid of the RM
561 rm = self.get_resource(guid)
564 def set_with_conditions(self, name, value, guids1, guids2, state,
566 """ Set value 'value' on attribute with name 'name' on all RMs of
567 guids1 when 'time' has elapsed since all elements in guids2
568 have reached state 'state'.
570 :param name: Name of attribute to set in RM
573 :param value: Value of attribute to set in RM
576 :param guids1: List of guids of RMs subjected to action
579 :param action: Action to register (either START or STOP)
580 :type action: ResourceAction
582 :param guids2: List of guids of RMs to we waited for
585 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
586 :type state: ResourceState
588 :param time: Time to wait after guids2 has reached status
592 if isinstance(guids1, int):
594 if isinstance(guids2, int):
598 rm = self.get_resource(guid)
599 rm.set_with_conditions(name, value, guids2, state, time)
601 def stop_with_conditions(self, guid):
602 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
604 :param guid: Guid of the RM
608 rm = self.get_resource(guid)
609 return rm.stop_with_conditions()
611 def start_with_conditions(self, guid):
612 """ Start a specific RM defined by its 'guid' only if all the conditions are true
614 :param guid: Guid of the RM
618 rm = self.get_resource(guid)
619 return rm.start_with_conditions()
621 def deploy(self, guids = None, wait_all_ready = True, group = None):
622 """ Deploy all resource manager in guids list
624 :param guids: List of guids of RMs to deploy
627 :param wait_all_ready: Wait until all RMs are ready in
628 order to start the RMs
631 :param group: Id of deployment group in which to deploy RMs
635 self.logger.debug(" ------- DEPLOY START ------ ")
638 # If no guids list was passed, all 'NEW' RMs will be deployed
640 for guid in self.resources:
641 if self.state(guid) == ResourceState.NEW:
644 if isinstance(guids, int):
647 # Create deployment group
648 # New guids can be added to a same deployment group later on
652 group = self._group_id_generator.next()
654 if group not in self._groups:
655 self._groups[group] = []
657 self._groups[group].extend(guids)
659 def wait_all_and_start(group):
660 # Function that checks if all resources are READY
661 # before scheduling a start_with_conditions for each RM
664 # Get all guids in group
665 guids = self._groups[group]
668 if self.state(guid) < ResourceState.READY:
673 callback = functools.partial(wait_all_and_start, group)
674 self.schedule("1s", callback)
676 # If all resources are read, we schedule the start
678 rm = self.get_resource(guid)
679 self.schedule("0s", rm.start_with_conditions)
681 if wait_all_ready and new_group:
682 # Schedule a function to check that all resources are
683 # READY, and only then schedule the start.
684 # This aims at reducing the number of tasks looping in the
686 # Instead of having many start tasks, we will have only one for
688 callback = functools.partial(wait_all_and_start, group)
689 self.schedule("1s", callback)
692 rm = self.get_resource(guid)
693 rm.deployment_group = group
694 self.schedule("0s", rm.deploy_with_conditions)
696 if not wait_all_ready:
697 self.schedule("1s", rm.start_with_conditions)
699 if rm.conditions.get(ResourceAction.STOP):
700 # Only if the RM has STOP conditions we
701 # schedule a stop. Otherwise the RM will stop immediately
702 self.schedule("2s", rm.stop_with_conditions)
704 def release(self, guids = None):
705 """ Release al RMs on the guids list or
706 all the resources if no list is specified
708 :param guids: List of RM guids
713 guids = self.resources
716 rm = self.get_resource(guid)
717 self.schedule("0s", rm.release)
719 self.wait_released(guids)
722 """ Shutdown the Experiment Controller.
723 Releases all the resources and stops task processing thread
726 # TODO: Clean the parallel runner!! STOP all ongoing tasks
731 # Mark the EC state as TERMINATED
732 self._state = ECState.TERMINATED
734 # Stop processing thread
737 # Notify condition to wake up the processing thread
740 if self._thread.is_alive():
743 def schedule(self, date, callback, track = False):
744 """ Schedule a callback to be executed at time date.
746 :param date: string containing execution time for the task.
747 It can be expressed as an absolute time, using
748 timestamp format, or as a relative time matching
749 ^\d+.\d+(h|m|s|ms|us)$
751 :param callback: code to be executed for the task. Must be a
752 Python function, and receives args and kwargs
755 :param track: if set to True, the task will be retrivable with
756 the get_task() method
758 :return : The Id of the task
760 timestamp = stabsformat(date)
761 task = Task(timestamp, callback)
762 task = self._scheduler.schedule(task)
765 self._tasks[task.id] = task
767 # Notify condition to wake up the processing thread
773 """ Process scheduled tasks.
777 The _process method is executed in an independent thread held by the
778 ExperimentController for as long as the experiment is running.
780 Tasks are scheduled by invoking the schedule method with a target callback.
781 The schedule method is given a execution time which controls the
782 order in which tasks are processed.
784 Tasks are processed in parallel using multithreading.
785 The environmental variable NEPI_NTHREADS can be used to control
786 the number of threads used to process tasks. The default value is 50.
790 To execute tasks in parallel, an ParallelRunner (PR) object, holding
791 a pool of threads (workers), is used.
792 For each available thread in the PR, the next task popped from
793 the scheduler queue is 'put' in the PR.
794 Upon receiving a task to execute, each PR worker (thread) invokes the
795 _execute method of the EC, passing the task as argument.
796 This method, calls task.callback inside a try/except block. If an
797 exception is raised by the tasks.callback, it will be trapped by the
798 try block, logged to standard error (usually the console), and the EC
799 state will be set to ECState.FAILED.
800 The invocation of _notify immediately after, forces the processing
801 loop in the _process method, to wake up if it was blocked waiting for new
802 tasks to arrived, and to check the EC state.
803 As the EC is in FAILED state, the processing loop exits and the
804 'finally' block is invoked. In the 'finally' block, the 'sync' method
805 of the PR is invoked, which forces the PR to raise any unchecked errors
806 that might have been raised by the workers.
809 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
811 runner = ParallelRun(maxthreads = nthreads)
814 while not self._stop:
818 task = self._scheduler.next()
821 # No task to execute. Wait for a new task to be scheduled.
824 # The task timestamp is in the future. Wait for timeout
825 # or until another task is scheduled.
827 if now < task.timestamp:
828 # Calculate timeout in seconds
829 timeout = tdiffsec(task.timestamp, now)
831 # Re-schedule task with the same timestamp
832 self._scheduler.schedule(task)
836 # Wait timeout or until a new task awakes the condition
837 self._cond.wait(timeout)
842 # Process tasks in parallel
843 runner.put(self._execute, task)
846 err = traceback.format_exc()
847 self.logger.error("Error while processing tasks in the EC: %s" % err)
849 # Set the EC to FAILED state
850 self._state = ECState.FAILED
852 # Set the FailureManager failure level
853 self._fm.set_ec_failure()
855 self.logger.debug("Exiting the task processing loop ... ")
859 def _execute(self, task):
860 """ Executes a single task.
862 :param task: Object containing the callback to execute
867 If the invokation of the task callback raises an
868 exception, the processing thread of the ExperimentController
869 will be stopped and the experiment will be aborted.
873 task.status = TaskStatus.DONE
876 task.result = task.callback()
879 err = traceback.format_exc()
881 task.status = TaskStatus.ERROR
883 self.logger.error("Error occurred while executing task: %s" % err)
885 # Set the FailureManager failure level
886 self._fm.set_task_failure()
889 """ Awakes the processing thread in case it is blocked waiting
890 for a new task to be scheduled.