# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import random
import time
import threading
import traceback

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
# TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
class ECState(object):
    """ State of the Experiment Controller

    RUNNING:    the EC is processing tasks normally
    FAILED:     an error occurred while processing tasks; the
                experiment is aborted
    TERMINATED: the EC was shut down cleanly
    """
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
class ExperimentController(object):
    """ An ExperimentController (EC) manages a single instance (run) of
    an experiment.

    .. class:: Class Args :

        :param exp_id: Human readable identifier for the experiment scenario.
                       It will be used in the name of the directory
                       where experiment related information is stored
        :type exp_id: str

    An experiment, or scenario, is defined by a concrete use, behavior,
    configuration and interconnection of resources that describe a single
    experiment case (We call this the experiment description).
    A same experiment (scenario) can be run many times.

    The ExperimentController (EC), is the entity responsible for
    managing an experiment instance (run). The same scenario can be
    recreated (and re-run) by instantiating an EC and recreating
    the same experiment description.

    In NEPI, an experiment is represented as a graph of interconnected
    resources. A resource is a generic concept in the sense that any
    component taking part of an experiment, whether physical or
    virtual, is considered a resource. A resource could be a host,
    a virtual machine, an application, a simulator, an IP address.

    A ResourceManager (RM), is the entity responsible for managing a
    single resource. ResourceManagers are specific to a resource
    type (i.e. An RM to control a Linux application will not be
    the same as the RM used to control a ns-3 simulation).
    In order for a new type of resource to be supported in NEPI
    a new RM must be implemented. NEPI already provides different
    RMs to control basic resources, and new ones can be extended from
    the existing ones.

    Through the EC interface the user can create ResourceManagers (RMs),
    configure them and interconnect them, in order to describe an experiment.
    Describing an experiment through the EC does not run the experiment.
    Only when the 'deploy()' method is invoked on the EC, will the EC take
    actions to transform the 'described' experiment into a 'running' experiment.

    While the experiment is running, it is possible to continue to
    create/configure/connect RMs, and to deploy them to involve new
    resources in the experiment (this is known as 'interactive' deployment).

    An experiment in NEPI is identified by a string id,
    which is either given by the user, or automatically generated by NEPI.
    The purpose of this identifier is to separate files and results that
    belong to different experiment scenarios.
    However, since a same 'experiment' can be run many times, the experiment
    id is not enough to identify an experiment instance (run).
    For this reason, the ExperimentController has two identifiers, the
    exp_id, which can be re-used by different ExperimentController instances,
    and the run_id, which is unique to an ExperimentController instance, and
    is automatically generated by NEPI.
    """
107 def __init__(self, exp_id = None):
108 super(ExperimentController, self).__init__()
110 self._logger = logging.getLogger("ExperimentController")
112 # Run identifier. It identifies a concrete instance (run) of an experiment.
113 # Since a same experiment (same configuration) can be run many times,
114 # this id permits to identify concrete exoeriment run
115 self._run_id = tsformat()
117 # Experiment identifier. Usually assigned by the user
118 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
120 # generator of globally unique ids
121 self._guid_generator = guid.GuidGenerator()
124 self._resources = dict()
127 self._scheduler = HeapScheduler()
133 self._groups = dict()
135 # generator of globally unique id for groups
136 self._group_id_generator = guid.GuidGenerator()
138 # Event processing thread
139 self._cond = threading.Condition()
140 self._thread = threading.Thread(target = self._process)
141 self._thread.setDaemon(True)
145 self._state = ECState.RUNNING
149 """ Return the logger of the Experiment Controller
156 """ Return the state of the Experiment Controller
163 """ Return the experiment id assigned by the user
170 """ Return the experiment instance (run) identifier
177 """ Put the state of the Experiment Controller into a final state :
178 Either TERMINATED or FAILED
181 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
183 def wait_finished(self, guids):
184 """ Blocking method that wait until all the RM from the 'guid' list
185 reached the state FINISHED
187 :param guids: List of guids
190 return self.wait(guids)
192 def wait_started(self, guids):
193 """ Blocking method that wait until all the RM from the 'guid' list
194 reached the state STARTED
196 :param guids: List of guids
199 return self.wait(guids, states = [ResourceState.STARTED,
200 ResourceState.STOPPED,
201 ResourceState.FAILED,
202 ResourceState.FINISHED])
204 def wait_released(self, guids):
205 """ Blocking method that wait until all the RM from the 'guid' list
206 reached the state RELEASED
208 :param guids: List of guids
211 return self.wait(guids, states = [ResourceState.RELEASED,
212 ResourceState.STOPPED,
213 ResourceState.FAILED,
214 ResourceState.FINISHED])
216 def wait(self, guids, states = [ResourceState.FINISHED,
217 ResourceState.FAILED,
218 ResourceState.STOPPED]):
219 """ Blocking method that waits until all the RM from the 'guid' list
220 reached state 'state' or until a failure occurs
222 :param guids: List of guids
225 if isinstance(guids, int):
228 # we randomly alter the order of the guids to avoid ordering
229 # dependencies (e.g. LinuxApplication RMs runing on the same
230 # linux host will be synchronized by the LinuxNode SSH lock)
231 random.shuffle(guids)
234 # If no more guids to wait for or an error occured, then exit
235 if len(guids) == 0 or self.finished:
238 # If a guid reached one of the target states, remove it from list
240 state = self.state(guid)
246 self.logger.debug(" WAITING FOR %g - state %s " % (guid,
247 self.state(guid, hr = True)))
249 # Take the opportunity to 'refresh' the states of the RMs.
250 # Query only the first up to N guids (not to overwhelm
253 lim = n if len(guids) > n else ( len(guids) -1 )
254 nguids = guids[0: lim]
256 # schedule state request for all guids (take advantage of
257 # scheduler multi threading).
259 callback = functools.partial(self.state, guid)
260 self.schedule("0s", callback)
262 # If the guid is not in one of the target states, wait and
263 # continue quering. We keep the sleep big to decrease the
264 # number of RM state queries
267 def get_task(self, tid):
268 """ Get a specific task
270 :param tid: Id of the task
274 return self._tasks.get(tid)
276 def get_resource(self, guid):
277 """ Get a specific Resource Manager
279 :param guid: Id of the task
281 :rtype: ResourceManager
283 return self._resources.get(guid)
287 """ Returns the list of all the Resource Manager Id
292 return self._resources.keys()
294 def register_resource(self, rtype, guid = None):
295 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
296 for the RM of type 'rtype' and add it to the list of Resources.
298 :param rtype: Type of the RM
300 :return: Id of the RM
303 # Get next available guid
304 guid = self._guid_generator.next(guid)
307 rm = ResourceFactory.create(rtype, self, guid)
310 self._resources[guid] = rm
314 def get_attributes(self, guid):
315 """ Return all the attibutes of a specific RM
317 :param guid: Guid of the RM
319 :return: List of attributes
322 rm = self.get_resource(guid)
323 return rm.get_attributes()
325 def register_connection(self, guid1, guid2):
326 """ Registers a guid1 with a guid2.
327 The declaration order is not important
329 :param guid1: First guid to connect
330 :type guid1: ResourceManager
332 :param guid2: Second guid to connect
333 :type guid: ResourceManager
335 rm1 = self.get_resource(guid1)
336 rm2 = self.get_resource(guid2)
338 rm1.register_connection(guid2)
339 rm2.register_connection(guid1)
341 def register_condition(self, guids1, action, guids2, state,
343 """ Registers an action START or STOP for all RM on guids1 to occur
344 time 'time' after all elements in guids2 reached state 'state'.
346 :param guids1: List of guids of RMs subjected to action
349 :param action: Action to register (either START or STOP)
350 :type action: ResourceAction
352 :param guids2: List of guids of RMs to we waited for
355 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
356 :type state: ResourceState
358 :param time: Time to wait after guids2 has reached status
362 if isinstance(guids1, int):
364 if isinstance(guids2, int):
368 rm = self.get_resource(guid1)
369 rm.register_condition(action, guids2, state, time)
371 def enable_trace(self, guid, name):
374 :param name: Name of the trace
377 rm = self.get_resource(guid)
378 rm.enable_trace(name)
380 def trace_enabled(self, guid, name):
381 """ Returns True if trace is enabled
383 :param name: Name of the trace
386 rm = self.get_resource(guid)
387 return rm.trace_enabled(name)
389 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
390 """ Get information on collected trace
392 :param name: Name of the trace
395 :param attr: Can be one of:
396 - TraceAttr.ALL (complete trace content),
397 - TraceAttr.STREAM (block in bytes to read starting at offset),
398 - TraceAttr.PATH (full path to the trace file),
399 - TraceAttr.SIZE (size of trace file).
402 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
405 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
410 rm = self.get_resource(guid)
411 return rm.trace(name, attr, block, offset)
413 def discover(self, guid):
414 """ Discover a specific RM defined by its 'guid'
416 :param guid: Guid of the RM
420 rm = self.get_resource(guid)
423 def provision(self, guid):
424 """ Provision a specific RM defined by its 'guid'
426 :param guid: Guid of the RM
430 rm = self.get_resource(guid)
431 return rm.provision()
433 def get(self, guid, name):
434 """ Get a specific attribute 'name' from the RM 'guid'
436 :param guid: Guid of the RM
439 :param name: attribute's name
443 rm = self.get_resource(guid)
446 def set(self, guid, name, value):
447 """ Set a specific attribute 'name' from the RM 'guid'
448 with the value 'value'
450 :param guid: Guid of the RM
453 :param name: attribute's name
456 :param value: attribute's value
459 rm = self.get_resource(guid)
460 return rm.set(name, value)
462 def state(self, guid, hr = False):
463 """ Returns the state of a resource
465 :param guid: Resource guid
468 :param hr: Human readable. Forces return of a
469 status string instead of a number
473 rm = self.get_resource(guid)
477 return ResourceState2str.get(state)
481 def stop(self, guid):
482 """ Stop a specific RM defined by its 'guid'
484 :param guid: Guid of the RM
488 rm = self.get_resource(guid)
491 def start(self, guid):
492 """ Start a specific RM defined by its 'guid'
494 :param guid: Guid of the RM
498 rm = self.get_resource(guid)
501 def set_with_conditions(self, name, value, guids1, guids2, state,
503 """ Set value 'value' on attribute with name 'name' on all RMs of
504 guids1 when 'time' has elapsed since all elements in guids2
505 have reached state 'state'.
507 :param name: Name of attribute to set in RM
510 :param value: Value of attribute to set in RM
513 :param guids1: List of guids of RMs subjected to action
516 :param action: Action to register (either START or STOP)
517 :type action: ResourceAction
519 :param guids2: List of guids of RMs to we waited for
522 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
523 :type state: ResourceState
525 :param time: Time to wait after guids2 has reached status
529 if isinstance(guids1, int):
531 if isinstance(guids2, int):
535 rm = self.get_resource(guid)
536 rm.set_with_conditions(name, value, guids2, state, time)
538 def stop_with_conditions(self, guid):
539 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
541 :param guid: Guid of the RM
545 rm = self.get_resource(guid)
546 return rm.stop_with_conditions()
548 def start_with_conditions(self, guid):
549 """ Start a specific RM defined by its 'guid' only if all the conditions are true
551 :param guid: Guid of the RM
555 rm = self.get_resource(guid)
556 return rm.start_with_conditions()
558 def deploy(self, guids = None, wait_all_ready = True, group = None):
559 """ Deploy all resource manager in guids list
561 :param guids: List of guids of RMs to deploy
564 :param wait_all_ready: Wait until all RMs are ready in
565 order to start the RMs
568 :param group: Id of deployment group in which to deploy RMs
572 self.logger.debug(" ------- DEPLOY START ------ ")
575 # If no guids list was indicated, all 'NEW' RMs will be deployed
577 for guid in self.resources:
578 if self.state(guid) == ResourceState.NEW:
581 if isinstance(guids, int):
584 # Create deployment group
588 group = self._group_id_generator.next(guid)
590 if group not in self._groups:
591 self._groups[group] = []
593 self._groups[group].extend(guids)
595 # Before starting deployment we disorder the guids list with the
596 # purpose of speeding up the whole deployment process.
597 # It is likely that the user inserted in the 'guids' list closely
598 # resources one after another (e.g. all applications
599 # connected to the same node can likely appear one after another).
600 # This can originate a slow down in the deployment since the N
601 # threads the parallel runner uses to processes tasks may all
602 # be taken up by the same family of resources waiting for the
603 # same conditions (e.g. LinuxApplications running on a same
604 # node share a single lock, so they will tend to be serialized).
605 # If we disorder the guids list, this problem can be mitigated.
606 random.shuffle(guids)
608 def wait_all_and_start(group):
611 # Get all guids in group
612 guids = self._groups[group]
615 if self.state(guid) < ResourceState.READY:
620 callback = functools.partial(wait_all_and_start, group)
621 self.schedule("1s", callback)
623 # If all resources are read, we schedule the start
625 rm = self.get_resource(guid)
626 self.schedule("0s", rm.start_with_conditions)
628 if wait_all_ready and new_group:
629 # Schedule a function to check that all resources are
630 # READY, and only then schedule the start.
631 # This aimes at reducing the number of tasks looping in the
633 # Intead of having N start tasks, we will have only one for
635 callback = functools.partial(wait_all_and_start, group)
636 self.schedule("1s", callback)
639 rm = self.get_resource(guid)
640 rm.deployment_group = group
641 self.schedule("0s", rm.deploy)
643 if not wait_all_ready:
644 self.schedule("1s", rm.start_with_conditions)
646 if rm.conditions.get(ResourceAction.STOP):
647 # Only if the RM has STOP conditions we
648 # schedule a stop. Otherwise the RM will stop immediately
649 self.schedule("2s", rm.stop_with_conditions)
651 def release(self, guids = None):
652 """ Release al RMs on the guids list or
653 all the resources if no list is specified
655 :param guids: List of RM guids
660 guids = self.resources
663 rm = self.get_resource(guid)
664 self.schedule("0s", rm.release)
666 self.wait_released(guids)
669 """ Shutdown the Experiment Controller.
670 Releases all the resources and stops task processing thread
675 # Mark the EC state as TERMINATED
676 self._state = ECState.TERMINATED
678 # Notify condition to wake up the processing thread
681 if self._thread.is_alive():
684 def schedule(self, date, callback, track = False):
685 """ Schedule a callback to be executed at time date.
687 :param date: string containing execution time for the task.
688 It can be expressed as an absolute time, using
689 timestamp format, or as a relative time matching
690 ^\d+.\d+(h|m|s|ms|us)$
692 :param callback: code to be executed for the task. Must be a
693 Python function, and receives args and kwargs
696 :param track: if set to True, the task will be retrivable with
697 the get_task() method
699 :return : The Id of the task
701 timestamp = stabsformat(date)
702 task = Task(timestamp, callback)
703 task = self._scheduler.schedule(task)
706 self._tasks[task.id] = task
708 # Notify condition to wake up the processing thread
714 """ Process scheduled tasks.
718 The _process method is executed in an independent thread held by the
719 ExperimentController for as long as the experiment is running.
721 Tasks are scheduled by invoking the schedule method with a target callback.
722 The schedule method is given a execution time which controls the
723 order in which tasks are processed.
725 Tasks are processed in parallel using multithreading.
726 The environmental variable NEPI_NTHREADS can be used to control
727 the number of threads used to process tasks. The default value is 50.
731 To execute tasks in parallel, an ParallelRunner (PR) object, holding
732 a pool of threads (workers), is used.
733 For each available thread in the PR, the next task popped from
734 the scheduler queue is 'put' in the PR.
735 Upon receiving a task to execute, each PR worker (thread) invokes the
736 _execute method of the EC, passing the task as argument.
737 This method, calls task.callback inside a try/except block. If an
738 exception is raised by the tasks.callback, it will be trapped by the
739 try block, logged to standard error (usually the console), and the EC
740 state will be set to ECState.FAILED.
741 The invocation of _notify immediately after, forces the processing
742 loop in the _process method, to wake up if it was blocked waiting for new
743 tasks to arrived, and to check the EC state.
744 As the EC is in FAILED state, the processing loop exits and the
745 'finally' block is invoked. In the 'finally' block, the 'sync' method
746 of the PR is invoked, which forces the PR to raise any unchecked errors
747 that might have been raised by the workers.
750 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
752 runner = ParallelRun(maxthreads = nthreads)
756 while not self.finished:
759 task = self._scheduler.next()
762 # No task to execute. Wait for a new task to be scheduled.
765 # The task timestamp is in the future. Wait for timeout
766 # or until another task is scheduled.
768 if now < task.timestamp:
769 # Calculate timeout in seconds
770 timeout = tdiffsec(task.timestamp, now)
772 # Re-schedule task with the same timestamp
773 self._scheduler.schedule(task)
777 # Wait timeout or until a new task awakes the condition
778 self._cond.wait(timeout)
783 # Process tasks in parallel
784 runner.put(self._execute, task)
787 err = traceback.format_exc()
788 self.logger.error("Error while processing tasks in the EC: %s" % err)
790 self._state = ECState.FAILED
792 self.logger.debug("Exiting the task processing loop ... ")
796 def _execute(self, task):
797 """ Executes a single task.
799 :param task: Object containing the callback to execute
804 If the invokation of the task callback raises an
805 exception, the processing thread of the ExperimentController
806 will be stopped and the experiment will be aborted.
810 task.status = TaskStatus.DONE
813 task.result = task.callback()
816 err = traceback.format_exc()
818 task.status = TaskStatus.ERROR
820 self.logger.error("Error occurred while executing task: %s" % err)
822 # Set the EC to FAILED state (this will force to exit the task
824 self._state = ECState.FAILED
826 # Notify condition to wake up the processing thread
829 # Propage error to the ParallelRunner
833 """ Awakes the processing thread in case it is blocked waiting
834 for a new task to be scheduled.