2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import random
import threading
import time
import traceback

from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
38 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
class ECState(object):
    """ State of the Experiment Controller

    """
    # EC is processing tasks normally
    RUNNING = 1
    # A task raised an exception; the experiment is aborted
    FAILED = 2
    # The EC was shut down cleanly
    TERMINATED = 3
48 class ExperimentController(object):
50 .. class:: Class Args :
52 :param exp_id: Human readable identifier for the experiment scenario.
53 It will be used in the name of the directory
54 where experiment related information is stored
59 An experiment, or scenario, is defined by a concrete use, behavior,
60 configuration and interconnection of resources that describe a single
61 experiment case (We call this the experiment description).
62 A same experiment (scenario) can be run many times.
64 The ExperimentController (EC), is the entity responsible for
65 managing an experiment instance (run). The same scenario can be
66 recreated (and re-run) by instantiating an EC and recreating
67 the same experiment description.
69 In NEPI, an experiment is represented as a graph of interconnected
70 resources. A resource is a generic concept in the sense that any
component taking part of an experiment, whether physical or
virtual, is considered a resource. A resource could be a host,
a virtual machine, an application, a simulator, an IP address.
75 A ResourceManager (RM), is the entity responsible for managing a
76 single resource. ResourceManagers are specific to a resource
77 type (i.e. An RM to control a Linux application will not be
78 the same as the RM used to control a ns-3 simulation).
79 In order for a new type of resource to be supported in NEPI
80 a new RM must be implemented. NEPI already provides different
81 RMs to control basic resources, and new can be extended from
84 Through the EC interface the user can create ResourceManagers (RMs),
85 configure them and interconnect them, in order to describe an experiment.
86 Describing an experiment through the EC does not run the experiment.
87 Only when the 'deploy()' method is invoked on the EC, will the EC take
88 actions to transform the 'described' experiment into a 'running' experiment.
90 While the experiment is running, it is possible to continue to
91 create/configure/connect RMs, and to deploy them to involve new
92 resources in the experiment (this is known as 'interactive' deployment).
An experiment in NEPI is identified by a string id,
95 which is either given by the user, or automatically generated by NEPI.
96 The purpose of this identifier is to separate files and results that
97 belong to different experiment scenarios.
98 However, since a same 'experiment' can be run many times, the experiment
99 id is not enough to identify an experiment instance (run).
For this reason, the ExperimentController has two identifiers: the
exp_id, which can be re-used by different ExperimentController instances,
and the run_id, which is unique to one ExperimentController instance, and
is automatically generated by NEPI.
107 def __init__(self, exp_id = None):
108 super(ExperimentController, self).__init__()
109 # root directory to store files
111 # Run identifier. It identifies a concrete instance (run) of an experiment.
112 # Since a same experiment (same configuration) can be run many times,
113 # this id permits to identify concrete exoeriment run
114 self._run_id = tsformat()
116 # Experiment identifier. Usually assigned by the user
117 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
119 # generator of globally unique ids
120 self._guid_generator = guid.GuidGenerator()
123 self._resources = dict()
126 self._scheduler = HeapScheduler()
131 # Event processing thread
132 self._cond = threading.Condition()
133 self._thread = threading.Thread(target = self._process)
134 self._thread.setDaemon(True)
138 self._state = ECState.RUNNING
141 self._logger = logging.getLogger("ExperimentController")
145 """ Return the logger of the Experiment Controller
152 """ Return the state of the Experiment Controller
159 """ Return the experiment id assigned by the user
166 """ Return the experiment instance (run) identifier
173 """ Put the state of the Experiment Controller into a final state :
174 Either TERMINATED or FAILED
177 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
179 def wait_finished(self, guids):
180 """ Blocking method that wait until all the RM from the 'guid' list
181 reached the state FINISHED
183 :param guids: List of guids
186 return self.wait(guids)
188 def wait_started(self, guids):
189 """ Blocking method that wait until all the RM from the 'guid' list
190 reached the state STARTED
192 :param guids: List of guids
195 return self.wait(guids, states = [ResourceState.STARTED,
196 ResourceState.STOPPED,
197 ResourceState.FAILED,
198 ResourceState.FINISHED])
200 def wait_released(self, guids):
201 """ Blocking method that wait until all the RM from the 'guid' list
202 reached the state RELEASED
204 :param guids: List of guids
207 return self.wait(guids, states = [ResourceState.RELEASED,
208 ResourceState.STOPPED,
209 ResourceState.FAILED,
210 ResourceState.FINISHED])
212 def wait(self, guids, states = [ResourceState.FINISHED,
213 ResourceState.FAILED,
214 ResourceState.STOPPED]):
215 """ Blocking method that waits until all the RM from the 'guid' list
216 reached state 'state' or until a failure occurs
218 :param guids: List of guids
221 if isinstance(guids, int):
224 # we randomly alter the order of the guids to avoid ordering
225 # dependencies (e.g. LinuxApplication RMs runing on the same
226 # linux host will be synchronized by the LinuxNode SSH lock)
227 random.shuffle(guids)
230 # If no more guids to wait for or an error occured, then exit
231 if len(guids) == 0 or self.finished:
234 # If a guid reached one of the target states, remove it from list
236 state = self.state(guid)
242 self.logger.debug(" WAITING FOR %g - state %s " % (guid,
243 self.state(guid, hr = True)))
245 # Take the opportunity to 'refresh' the states of the RMs.
246 # Query only the first up to N guids (not to overwhelm
249 lim = n if len(guids) > n else ( len(guids) -1 )
250 nguids = guids[0: lim]
252 # schedule state request for all guids (take advantage of
253 # scheduler multi threading).
255 callback = functools.partial(self.state, guid)
256 self.schedule("0s", callback)
258 # If the guid is not in one of the target states, wait and
259 # continue quering. We keep the sleep big to decrease the
260 # number of RM state queries
263 def get_task(self, tid):
264 """ Get a specific task
266 :param tid: Id of the task
270 return self._tasks.get(tid)
272 def get_resource(self, guid):
273 """ Get a specific Resource Manager
275 :param guid: Id of the task
277 :rtype: ResourceManager
279 return self._resources.get(guid)
283 """ Returns the list of all the Resource Manager Id
288 return self._resources.keys()
290 def register_resource(self, rtype, guid = None):
291 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
292 for the RM of type 'rtype' and add it to the list of Resources.
294 :param rtype: Type of the RM
296 :return: Id of the RM
299 # Get next available guid
300 guid = self._guid_generator.next(guid)
303 rm = ResourceFactory.create(rtype, self, guid)
306 self._resources[guid] = rm
310 def get_attributes(self, guid):
311 """ Return all the attibutes of a specific RM
313 :param guid: Guid of the RM
315 :return: List of attributes
318 rm = self.get_resource(guid)
319 return rm.get_attributes()
321 def register_connection(self, guid1, guid2):
322 """ Registers a guid1 with a guid2.
323 The declaration order is not important
325 :param guid1: First guid to connect
326 :type guid1: ResourceManager
328 :param guid2: Second guid to connect
329 :type guid: ResourceManager
331 rm1 = self.get_resource(guid1)
332 rm2 = self.get_resource(guid2)
334 rm1.register_connection(guid2)
335 rm2.register_connection(guid1)
337 def register_condition(self, group1, action, group2, state,
339 """ Registers an action START or STOP for all RM on group1 to occur
340 time 'time' after all elements in group2 reached state 'state'.
342 :param group1: List of guids of RMs subjected to action
345 :param action: Action to register (either START or STOP)
346 :type action: ResourceAction
348 :param group2: List of guids of RMs to we waited for
351 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
352 :type state: ResourceState
354 :param time: Time to wait after group2 has reached status
358 if isinstance(group1, int):
360 if isinstance(group2, int):
364 rm = self.get_resource(guid1)
365 rm.register_condition(action, group2, state, time)
367 def enable_trace(self, guid, name):
370 :param name: Name of the trace
373 rm = self.get_resource(guid)
374 rm.enable_trace(name)
376 def trace_enabled(self, guid, name):
377 """ Returns True if trace is enabled
379 :param name: Name of the trace
382 rm = self.get_resource(guid)
383 return rm.trace_enabled(name)
385 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
386 """ Get information on collected trace
388 :param name: Name of the trace
391 :param attr: Can be one of:
392 - TraceAttr.ALL (complete trace content),
393 - TraceAttr.STREAM (block in bytes to read starting at offset),
394 - TraceAttr.PATH (full path to the trace file),
395 - TraceAttr.SIZE (size of trace file).
398 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
401 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
406 rm = self.get_resource(guid)
407 return rm.trace(name, attr, block, offset)
409 def discover(self, guid):
410 """ Discover a specific RM defined by its 'guid'
412 :param guid: Guid of the RM
416 rm = self.get_resource(guid)
419 def provision(self, guid):
420 """ Provision a specific RM defined by its 'guid'
422 :param guid: Guid of the RM
426 rm = self.get_resource(guid)
427 return rm.provision()
429 def get(self, guid, name):
430 """ Get a specific attribute 'name' from the RM 'guid'
432 :param guid: Guid of the RM
435 :param name: attribute's name
439 rm = self.get_resource(guid)
442 def set(self, guid, name, value):
443 """ Set a specific attribute 'name' from the RM 'guid'
444 with the value 'value'
446 :param guid: Guid of the RM
449 :param name: attribute's name
452 :param value: attribute's value
455 rm = self.get_resource(guid)
456 return rm.set(name, value)
458 def state(self, guid, hr = False):
459 """ Returns the state of a resource
461 :param guid: Resource guid
464 :param hr: Human readable. Forces return of a
465 status string instead of a number
469 rm = self.get_resource(guid)
473 return ResourceState2str.get(state)
477 def stop(self, guid):
478 """ Stop a specific RM defined by its 'guid'
480 :param guid: Guid of the RM
484 rm = self.get_resource(guid)
487 def start(self, guid):
488 """ Start a specific RM defined by its 'guid'
490 :param guid: Guid of the RM
494 rm = self.get_resource(guid)
497 def set_with_conditions(self, name, value, group1, group2, state,
499 """ Set value 'value' on attribute with name 'name' on all RMs of
500 group1 when 'time' has elapsed since all elements in group2
501 have reached state 'state'.
503 :param name: Name of attribute to set in RM
506 :param value: Value of attribute to set in RM
509 :param group1: List of guids of RMs subjected to action
512 :param action: Action to register (either START or STOP)
513 :type action: ResourceAction
515 :param group2: List of guids of RMs to we waited for
518 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
519 :type state: ResourceState
521 :param time: Time to wait after group2 has reached status
525 if isinstance(group1, int):
527 if isinstance(group2, int):
531 rm = self.get_resource(guid)
532 rm.set_with_conditions(name, value, group2, state, time)
534 def stop_with_conditions(self, guid):
535 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
537 :param guid: Guid of the RM
541 rm = self.get_resource(guid)
542 return rm.stop_with_conditions()
544 def start_with_conditions(self, guid):
545 """ Start a specific RM defined by its 'guid' only if all the conditions are true
547 :param guid: Guid of the RM
551 rm = self.get_resource(guid)
552 return rm.start_with_conditions()
554 def deploy(self, group = None, wait_all_ready = True):
555 """ Deploy all resource manager in group
557 :param group: List of guids of RMs to deploy
560 :param wait_all_ready: Wait until all RMs are ready in
561 order to start the RMs
565 self.logger.debug(" ------- DEPLOY START ------ ")
568 # By default, if not deployment group is indicated,
569 # all RMs that are undeployed will be deployed
571 for guid in self.resources:
572 if self.state(guid) == ResourceState.NEW:
575 if isinstance(group, int):
578 # Before starting deployment we disorder the group list with the
579 # purpose of speeding up the whole deployment process.
580 # It is likely that the user inserted in the 'group' list closely
581 # resources one after another (e.g. all applications
582 # connected to the same node can likely appear one after another).
583 # This can originate a slow down in the deployment since the N
584 # threads the parallel runner uses to processes tasks may all
585 # be taken up by the same family of resources waiting for the
586 # same conditions (e.g. LinuxApplications running on a same
587 # node share a single lock, so they will tend to be serialized).
588 # If we disorder the group list, this problem can be mitigated.
589 random.shuffle(group)
591 def wait_all_and_start(group):
594 if self.state(guid) < ResourceState.READY:
599 callback = functools.partial(wait_all_and_start, group)
600 self.schedule("1s", callback)
602 # If all resources are read, we schedule the start
604 rm = self.get_resource(guid)
605 self.schedule("0s", rm.start_with_conditions)
608 # Schedule the function that will check all resources are
609 # READY, and only then it will schedule the start.
610 # This is aimed to reduce the number of tasks looping in the scheduler.
611 # Intead of having N start tasks, we will have only one
612 callback = functools.partial(wait_all_and_start, group)
613 self.schedule("1s", callback)
616 rm = self.get_resource(guid)
617 self.schedule("0s", rm.deploy)
619 if not wait_all_ready:
620 self.schedule("1s", rm.start_with_conditions)
622 if rm.conditions.get(ResourceAction.STOP):
623 # Only if the RM has STOP conditions we
624 # schedule a stop. Otherwise the RM will stop immediately
625 self.schedule("2s", rm.stop_with_conditions)
627 def release(self, group = None):
628 """ Release the elements of the list 'group' or
629 all the resources if any group is specified
631 :param group: List of RM
636 group = self.resources
639 rm = self.get_resource(guid)
640 self.schedule("0s", rm.release)
642 self.wait_released(group)
645 """ Shutdown the Experiment Controller.
646 Releases all the resources and stops task processing thread
651 # Mark the EC state as TERMINATED
652 self._state = ECState.TERMINATED
654 # Notify condition to wake up the processing thread
657 if self._thread.is_alive():
660 def schedule(self, date, callback, track = False):
661 """ Schedule a callback to be executed at time date.
663 :param date: string containing execution time for the task.
664 It can be expressed as an absolute time, using
665 timestamp format, or as a relative time matching
666 ^\d+.\d+(h|m|s|ms|us)$
668 :param callback: code to be executed for the task. Must be a
669 Python function, and receives args and kwargs
672 :param track: if set to True, the task will be retrivable with
673 the get_task() method
675 :return : The Id of the task
677 timestamp = stabsformat(date)
678 task = Task(timestamp, callback)
679 task = self._scheduler.schedule(task)
682 self._tasks[task.id] = task
684 # Notify condition to wake up the processing thread
690 """ Process scheduled tasks.
694 The _process method is executed in an independent thread held by the
695 ExperimentController for as long as the experiment is running.
697 Tasks are scheduled by invoking the schedule method with a target callback.
698 The schedule method is given a execution time which controls the
699 order in which tasks are processed.
701 Tasks are processed in parallel using multithreading.
702 The environmental variable NEPI_NTHREADS can be used to control
703 the number of threads used to process tasks. The default value is 50.
707 To execute tasks in parallel, an ParallelRunner (PR) object, holding
708 a pool of threads (workers), is used.
709 For each available thread in the PR, the next task popped from
710 the scheduler queue is 'put' in the PR.
711 Upon receiving a task to execute, each PR worker (thread) invokes the
712 _execute method of the EC, passing the task as argument.
713 This method, calls task.callback inside a try/except block. If an
714 exception is raised by the tasks.callback, it will be trapped by the
715 try block, logged to standard error (usually the console), and the EC
716 state will be set to ECState.FAILED.
717 The invocation of _notify immediately after, forces the processing
718 loop in the _process method, to wake up if it was blocked waiting for new
719 tasks to arrived, and to check the EC state.
720 As the EC is in FAILED state, the processing loop exits and the
721 'finally' block is invoked. In the 'finally' block, the 'sync' method
722 of the PR is invoked, which forces the PR to raise any unchecked errors
723 that might have been raised by the workers.
726 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
728 runner = ParallelRun(maxthreads = nthreads)
732 while not self.finished:
735 task = self._scheduler.next()
738 # No task to execute. Wait for a new task to be scheduled.
741 # The task timestamp is in the future. Wait for timeout
742 # or until another task is scheduled.
744 if now < task.timestamp:
745 # Calculate timeout in seconds
746 timeout = tdiffsec(task.timestamp, now)
748 # Re-schedule task with the same timestamp
749 self._scheduler.schedule(task)
753 # Wait timeout or until a new task awakes the condition
754 self._cond.wait(timeout)
759 # Process tasks in parallel
760 runner.put(self._execute, task)
763 err = traceback.format_exc()
764 self.logger.error("Error while processing tasks in the EC: %s" % err)
766 self._state = ECState.FAILED
768 self.logger.debug("Exiting the task processing loop ... ")
771 def _execute(self, task):
772 """ Executes a single task.
774 :param task: Object containing the callback to execute
779 If the invokation of the task callback raises an
780 exception, the processing thread of the ExperimentController
781 will be stopped and the experiment will be aborted.
785 task.status = TaskStatus.DONE
788 task.result = task.callback()
791 err = traceback.format_exc()
793 task.status = TaskStatus.ERROR
795 self.logger.error("Error occurred while executing task: %s" % err)
797 # Set the EC to FAILED state (this will force to exit the task
799 self._state = ECState.FAILED
801 # Notify condition to wake up the processing thread
804 # Propage error to the ParallelRunner
808 """ Awakes the processing thread in case it is blocked waiting
809 for a new task to be scheduled.