2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import random
import threading
import time
import traceback

from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
class ECState(object):
    """ State of the Experiment Controller.

    The EC is RUNNING from construction until it either fails
    (FAILED) or is shut down (TERMINATED). FAILED and TERMINATED
    are final states (see ExperimentController.finished).
    """
    # NOTE(review): constant values reconstructed — the original body was
    # elided; all in-file uses only compare identity/membership.
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
47 class ExperimentController(object):
49 .. class:: Class Args :
51 :param exp_id: Human readable identifier for the experiment scenario.
52 It will be used in the name of the directory
53 where experiment related information is stored
58 An experiment, or scenario, is defined by a concrete use, behavior,
59 configuration and interconnection of resources that describe a single
60 experiment case (We call this the experiment description).
61 A same experiment (scenario) can be run many times.
63 The ExperimentController (EC), is the entity responsible for
64 managing an experiment instance (run). The same scenario can be
65 recreated (and re-run) by instantiating an EC and recreating
66 the same experiment description.
68 In NEPI, an experiment is represented as a graph of interconnected
69 resources. A resource is a generic concept in the sense that any
70 component taking part of an experiment, whether physical of
71 virtual, is considered a resource. A resources could be a host,
72 a virtual machine, an application, a simulator, a IP address.
74 A ResourceManager (RM), is the entity responsible for managing a
75 single resource. ResourceManagers are specific to a resource
76 type (i.e. An RM to control a Linux application will not be
77 the same as the RM used to control a ns-3 simulation).
78 In order for a new type of resource to be supported in NEPI
79 a new RM must be implemented. NEPI already provides different
80 RMs to control basic resources, and new can be extended from
83 Through the EC interface the user can create ResourceManagers (RMs),
84 configure them and interconnect them, in order to describe an experiment.
85 Describing an experiment through the EC does not run the experiment.
86 Only when the 'deploy()' method is invoked on the EC, will the EC take
87 actions to transform the 'described' experiment into a 'running' experiment.
89 While the experiment is running, it is possible to continue to
90 create/configure/connect RMs, and to deploy them to involve new
91 resources in the experiment (this is known as 'interactive' deployment).
93 An experiments in NEPI is identified by a string id,
94 which is either given by the user, or automatically generated by NEPI.
95 The purpose of this identifier is to separate files and results that
96 belong to different experiment scenarios.
97 However, since a same 'experiment' can be run many times, the experiment
98 id is not enough to identify an experiment instance (run).
    For this reason, the ExperimentController has two identifiers: the
    exp_id, which can be re-used by different ExperimentController instances,
    and the run_id, which is unique to an ExperimentController instance and
    is automatically generated by NEPI.
106 def __init__(self, exp_id = None):
107 super(ExperimentController, self).__init__()
109 self._logger = logging.getLogger("ExperimentController")
111 # Run identifier. It identifies a concrete instance (run) of an experiment.
112 # Since a same experiment (same configuration) can be run many times,
113 # this id permits to identify concrete exoeriment run
114 self._run_id = tsformat()
116 # Experiment identifier. Usually assigned by the user
117 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
119 # generator of globally unique ids
120 self._guid_generator = guid.GuidGenerator()
123 self._resources = dict()
126 self._scheduler = HeapScheduler()
132 self._groups = dict()
134 # generator of globally unique id for groups
135 self._group_id_generator = guid.GuidGenerator()
137 # Event processing thread
138 self._cond = threading.Condition()
139 self._thread = threading.Thread(target = self._process)
140 self._thread.setDaemon(True)
144 self._state = ECState.RUNNING
148 """ Return the logger of the Experiment Controller
155 """ Return the state of the Experiment Controller
162 """ Return the experiment id assigned by the user
169 """ Return the experiment instance (run) identifier
176 """ Put the state of the Experiment Controller into a final state :
177 Either TERMINATED or FAILED
180 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
182 def wait_finished(self, guids):
183 """ Blocking method that wait until all the RM from the 'guid' list
184 reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
186 :param guids: List of guids
189 return self.wait(guids)
191 def wait_started(self, guids):
192 """ Blocking method that wait until all the RM from the 'guid' list
193 reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
195 :param guids: List of guids
198 return self.wait(guids, state = ResourceState.STARTED)
200 def wait_released(self, guids):
201 """ Blocking method that wait until all the RM from the 'guid' list
202 reached the state RELEASED (or FAILED)
204 :param guids: List of guids
207 # TODO: solve state concurrency BUG and !!!!
208 # correct waited release state to state = ResourceState.FAILED)
209 return self.wait(guids, state = ResourceState.FINISHED)
211 def wait_deployed(self, guids):
212 """ Blocking method that wait until all the RM from the 'guid' list
213 reached the state READY (or any higher state)
215 :param guids: List of guids
218 return self.wait(guids, state = ResourceState.READY)
220 def wait(self, guids, state = ResourceState.STOPPED):
221 """ Blocking method that waits until all the RM from the 'guid' list
222 reached state 'state' or until a failure occurs
224 :param guids: List of guids
227 if isinstance(guids, int):
230 # we randomly alter the order of the guids to avoid ordering
231 # dependencies (e.g. LinuxApplication RMs runing on the same
232 # linux host will be synchronized by the LinuxNode SSH lock)
233 random.shuffle(guids)
236 # If no more guids to wait for or an error occured, then exit
237 if len(guids) == 0 or self.finished:
240 # If a guid reached one of the target states, remove it from list
242 rstate = self.state(guid)
248 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
249 self.state(guid, hr = True), state))
251 # Take the opportunity to 'refresh' the states of the RMs.
252 # Query only the first up to N guids (not to overwhelm
255 lim = n if len(guids) > n else ( len(guids) -1 )
256 nguids = guids[0: lim]
258 # schedule state request for all guids (take advantage of
259 # scheduler multi threading).
261 callback = functools.partial(self.state, guid)
262 self.schedule("0s", callback)
264 # If the guid is not in one of the target states, wait and
265 # continue quering. We keep the sleep big to decrease the
266 # number of RM state queries
269 def get_task(self, tid):
270 """ Get a specific task
272 :param tid: Id of the task
276 return self._tasks.get(tid)
278 def get_resource(self, guid):
279 """ Get a specific Resource Manager
281 :param guid: Id of the task
283 :rtype: ResourceManager
285 return self._resources.get(guid)
289 """ Returns the list of all the Resource Manager Id
294 return self._resources.keys()
296 def register_resource(self, rtype, guid = None):
297 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
298 for the RM of type 'rtype' and add it to the list of Resources.
300 :param rtype: Type of the RM
302 :return: Id of the RM
305 # Get next available guid
306 guid = self._guid_generator.next(guid)
309 rm = ResourceFactory.create(rtype, self, guid)
312 self._resources[guid] = rm
316 def get_attributes(self, guid):
317 """ Return all the attibutes of a specific RM
319 :param guid: Guid of the RM
321 :return: List of attributes
324 rm = self.get_resource(guid)
325 return rm.get_attributes()
327 def register_connection(self, guid1, guid2):
328 """ Registers a guid1 with a guid2.
329 The declaration order is not important
331 :param guid1: First guid to connect
332 :type guid1: ResourceManager
334 :param guid2: Second guid to connect
335 :type guid: ResourceManager
337 rm1 = self.get_resource(guid1)
338 rm2 = self.get_resource(guid2)
340 rm1.register_connection(guid2)
341 rm2.register_connection(guid1)
343 def register_condition(self, guids1, action, guids2, state,
345 """ Registers an action START or STOP for all RM on guids1 to occur
346 time 'time' after all elements in guids2 reached state 'state'.
348 :param guids1: List of guids of RMs subjected to action
351 :param action: Action to register (either START or STOP)
352 :type action: ResourceAction
354 :param guids2: List of guids of RMs to we waited for
357 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
358 :type state: ResourceState
360 :param time: Time to wait after guids2 has reached status
364 if isinstance(guids1, int):
366 if isinstance(guids2, int):
370 rm = self.get_resource(guid1)
371 rm.register_condition(action, guids2, state, time)
373 def enable_trace(self, guid, name):
376 :param name: Name of the trace
379 rm = self.get_resource(guid)
380 rm.enable_trace(name)
382 def trace_enabled(self, guid, name):
383 """ Returns True if trace is enabled
385 :param name: Name of the trace
388 rm = self.get_resource(guid)
389 return rm.trace_enabled(name)
391 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
392 """ Get information on collected trace
394 :param name: Name of the trace
397 :param attr: Can be one of:
398 - TraceAttr.ALL (complete trace content),
399 - TraceAttr.STREAM (block in bytes to read starting at offset),
400 - TraceAttr.PATH (full path to the trace file),
401 - TraceAttr.SIZE (size of trace file).
404 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
407 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
412 rm = self.get_resource(guid)
413 return rm.trace(name, attr, block, offset)
415 def discover(self, guid):
416 """ Discover a specific RM defined by its 'guid'
418 :param guid: Guid of the RM
422 rm = self.get_resource(guid)
425 def provision(self, guid):
426 """ Provision a specific RM defined by its 'guid'
428 :param guid: Guid of the RM
432 rm = self.get_resource(guid)
433 return rm.provision()
435 def get(self, guid, name):
436 """ Get a specific attribute 'name' from the RM 'guid'
438 :param guid: Guid of the RM
441 :param name: attribute's name
445 rm = self.get_resource(guid)
448 def set(self, guid, name, value):
449 """ Set a specific attribute 'name' from the RM 'guid'
450 with the value 'value'
452 :param guid: Guid of the RM
455 :param name: attribute's name
458 :param value: attribute's value
461 rm = self.get_resource(guid)
462 return rm.set(name, value)
464 def state(self, guid, hr = False):
465 """ Returns the state of a resource
467 :param guid: Resource guid
470 :param hr: Human readable. Forces return of a
471 status string instead of a number
475 rm = self.get_resource(guid)
479 return ResourceState2str.get(state)
483 def stop(self, guid):
484 """ Stop a specific RM defined by its 'guid'
486 :param guid: Guid of the RM
490 rm = self.get_resource(guid)
493 def start(self, guid):
494 """ Start a specific RM defined by its 'guid'
496 :param guid: Guid of the RM
500 rm = self.get_resource(guid)
503 def set_with_conditions(self, name, value, guids1, guids2, state,
505 """ Set value 'value' on attribute with name 'name' on all RMs of
506 guids1 when 'time' has elapsed since all elements in guids2
507 have reached state 'state'.
509 :param name: Name of attribute to set in RM
512 :param value: Value of attribute to set in RM
515 :param guids1: List of guids of RMs subjected to action
518 :param action: Action to register (either START or STOP)
519 :type action: ResourceAction
521 :param guids2: List of guids of RMs to we waited for
524 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
525 :type state: ResourceState
527 :param time: Time to wait after guids2 has reached status
531 if isinstance(guids1, int):
533 if isinstance(guids2, int):
537 rm = self.get_resource(guid)
538 rm.set_with_conditions(name, value, guids2, state, time)
540 def stop_with_conditions(self, guid):
541 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
543 :param guid: Guid of the RM
547 rm = self.get_resource(guid)
548 return rm.stop_with_conditions()
550 def start_with_conditions(self, guid):
551 """ Start a specific RM defined by its 'guid' only if all the conditions are true
553 :param guid: Guid of the RM
557 rm = self.get_resource(guid)
558 return rm.start_with_conditions()
560 def deploy(self, guids = None, wait_all_ready = True, group = None):
561 """ Deploy all resource manager in guids list
563 :param guids: List of guids of RMs to deploy
566 :param wait_all_ready: Wait until all RMs are ready in
567 order to start the RMs
570 :param group: Id of deployment group in which to deploy RMs
574 self.logger.debug(" ------- DEPLOY START ------ ")
577 # If no guids list was indicated, all 'NEW' RMs will be deployed
579 for guid in self.resources:
580 if self.state(guid) == ResourceState.NEW:
583 if isinstance(guids, int):
586 # Create deployment group
590 group = self._group_id_generator.next()
592 if group not in self._groups:
593 self._groups[group] = []
595 self._groups[group].extend(guids)
597 # Before starting deployment we disorder the guids list with the
598 # purpose of speeding up the whole deployment process.
599 # It is likely that the user inserted in the 'guids' list closely
600 # resources one after another (e.g. all applications
601 # connected to the same node can likely appear one after another).
602 # This can originate a slow down in the deployment since the N
603 # threads the parallel runner uses to processes tasks may all
604 # be taken up by the same family of resources waiting for the
605 # same conditions (e.g. LinuxApplications running on a same
606 # node share a single lock, so they will tend to be serialized).
607 # If we disorder the guids list, this problem can be mitigated.
608 random.shuffle(guids)
610 def wait_all_and_start(group):
613 # Get all guids in group
614 guids = self._groups[group]
617 if self.state(guid) < ResourceState.READY:
622 callback = functools.partial(wait_all_and_start, group)
623 self.schedule("1s", callback)
625 # If all resources are read, we schedule the start
627 rm = self.get_resource(guid)
628 self.schedule("0s", rm.start_with_conditions)
630 if wait_all_ready and new_group:
631 # Schedule a function to check that all resources are
632 # READY, and only then schedule the start.
633 # This aimes at reducing the number of tasks looping in the
635 # Intead of having N start tasks, we will have only one for
637 callback = functools.partial(wait_all_and_start, group)
638 self.schedule("1s", callback)
641 rm = self.get_resource(guid)
642 rm.deployment_group = group
643 self.schedule("0s", rm.deploy_with_conditions)
645 if not wait_all_ready:
646 self.schedule("1s", rm.start_with_conditions)
648 if rm.conditions.get(ResourceAction.STOP):
649 # Only if the RM has STOP conditions we
650 # schedule a stop. Otherwise the RM will stop immediately
651 self.schedule("2s", rm.stop_with_conditions)
653 def release(self, guids = None):
654 """ Release al RMs on the guids list or
655 all the resources if no list is specified
657 :param guids: List of RM guids
662 guids = self.resources
665 rm = self.get_resource(guid)
666 self.schedule("0s", rm.release)
668 self.wait_released(guids)
671 """ Shutdown the Experiment Controller.
672 Releases all the resources and stops task processing thread
677 # Mark the EC state as TERMINATED
678 self._state = ECState.TERMINATED
680 # Notify condition to wake up the processing thread
683 if self._thread.is_alive():
686 def schedule(self, date, callback, track = False):
687 """ Schedule a callback to be executed at time date.
689 :param date: string containing execution time for the task.
690 It can be expressed as an absolute time, using
691 timestamp format, or as a relative time matching
692 ^\d+.\d+(h|m|s|ms|us)$
694 :param callback: code to be executed for the task. Must be a
695 Python function, and receives args and kwargs
698 :param track: if set to True, the task will be retrivable with
699 the get_task() method
701 :return : The Id of the task
703 timestamp = stabsformat(date)
704 task = Task(timestamp, callback)
705 task = self._scheduler.schedule(task)
708 self._tasks[task.id] = task
710 # Notify condition to wake up the processing thread
716 """ Process scheduled tasks.
720 The _process method is executed in an independent thread held by the
721 ExperimentController for as long as the experiment is running.
723 Tasks are scheduled by invoking the schedule method with a target callback.
724 The schedule method is given a execution time which controls the
725 order in which tasks are processed.
727 Tasks are processed in parallel using multithreading.
728 The environmental variable NEPI_NTHREADS can be used to control
729 the number of threads used to process tasks. The default value is 50.
733 To execute tasks in parallel, an ParallelRunner (PR) object, holding
734 a pool of threads (workers), is used.
735 For each available thread in the PR, the next task popped from
736 the scheduler queue is 'put' in the PR.
737 Upon receiving a task to execute, each PR worker (thread) invokes the
738 _execute method of the EC, passing the task as argument.
739 This method, calls task.callback inside a try/except block. If an
740 exception is raised by the tasks.callback, it will be trapped by the
741 try block, logged to standard error (usually the console), and the EC
742 state will be set to ECState.FAILED.
743 The invocation of _notify immediately after, forces the processing
744 loop in the _process method, to wake up if it was blocked waiting for new
745 tasks to arrived, and to check the EC state.
746 As the EC is in FAILED state, the processing loop exits and the
747 'finally' block is invoked. In the 'finally' block, the 'sync' method
748 of the PR is invoked, which forces the PR to raise any unchecked errors
749 that might have been raised by the workers.
752 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
754 runner = ParallelRun(maxthreads = nthreads)
758 while not self.finished:
761 task = self._scheduler.next()
764 # No task to execute. Wait for a new task to be scheduled.
767 # The task timestamp is in the future. Wait for timeout
768 # or until another task is scheduled.
770 if now < task.timestamp:
771 # Calculate timeout in seconds
772 timeout = tdiffsec(task.timestamp, now)
774 # Re-schedule task with the same timestamp
775 self._scheduler.schedule(task)
779 # Wait timeout or until a new task awakes the condition
780 self._cond.wait(timeout)
785 # Process tasks in parallel
786 runner.put(self._execute, task)
789 err = traceback.format_exc()
790 self.logger.error("Error while processing tasks in the EC: %s" % err)
792 self._state = ECState.FAILED
794 self.logger.debug("Exiting the task processing loop ... ")
798 def _execute(self, task):
799 """ Executes a single task.
801 :param task: Object containing the callback to execute
806 If the invokation of the task callback raises an
807 exception, the processing thread of the ExperimentController
808 will be stopped and the experiment will be aborted.
812 task.status = TaskStatus.DONE
815 task.result = task.callback()
818 err = traceback.format_exc()
820 task.status = TaskStatus.ERROR
822 self.logger.error("Error occurred while executing task: %s" % err)
824 # Set the EC to FAILED state (this will force to exit the task
826 self._state = ECState.FAILED
828 # Notify condition to wake up the processing thread
831 # Propage error to the ParallelRunner
835 """ Awakes the processing thread in case it is blocked waiting
836 for a new task to be scheduled.