2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class ECState(object):
    """ State of the Experiment Controller

    Enumerates the life-cycle states of the controller; the values
    referenced elsewhere in this module are RUNNING, FAILED and
    TERMINATED (the state constants themselves are not visible in
    this chunk).
    """
class ExperimentController(object):
    """ Entity responsible for managing a single experiment.

    .. class:: Class Args :

        :param exp_id: Human readable identifier for the experiment.
            It will be used in the name of the directory
            where experiment related information is stored
        :type exp_id: str

        :param root_dir: Root directory where an experiment specific folder
            will be created to store experiment information
        :type root_dir: str

    Through the EC interface the user can create ResourceManagers (RMs),
    configure them and interconnect them, in order to describe the experiment.

    Only when the 'deploy()' method is invoked, the EC will take actions
    to transform the 'described' experiment into a 'running' experiment.

    While the experiment is running, it is possible to continue to
    create/configure/connect RMs, and to deploy them to involve new
    resources in the experiment.
    """
    def __init__(self, exp_id = None, root_dir = "/tmp"):
        """ Initialize the Experiment Controller.

        :param exp_id: Human readable identifier for the experiment.
            When not given, a random "nepi-exp-<hex>" id is generated.
        :param root_dir: Root directory where experiment specific
            information will be stored
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir
        # experiment identifier given by the user
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
        # generator of globally unique ids
        self._guid_generator = guid.GuidGenerator()
        # guid -> ResourceManager mapping of registered resources
        self._resources = dict()
        # min-heap scheduler ordering tasks by execution timestamp
        self._scheduler = HeapScheduler()
        # Event processing thread
        # Condition used to wake the processing loop when new tasks are
        # scheduled or the EC state changes (see schedule / _notify)
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        # daemon thread: must not keep the interpreter alive on exit
        self._thread.setDaemon(True)
        # the EC starts in the RUNNING state
        self._state = ECState.RUNNING
        self._logger = logging.getLogger("ExperimentController")
        # NOTE(review): self._tasks (read by get_task/schedule) is not
        # initialized in the lines visible here — confirm against full source.
109 """ Return the logger of the Experiment Controller
116 """ Return the state of the Experiment Controller
123 """ Return the experiment ID
126 exp_id = self._exp_id
127 if not exp_id.startswith("nepi-"):
128 exp_id = "nepi-" + exp_id
133 """ Put the state of the Experiment Controller into a final state :
134 Either TERMINATED or FAILED
137 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
139 def wait_finished(self, guids):
140 """ Blocking method that wait until all the RM from the 'guid' list
141 reached the state FINISHED
143 :param guids: List of guids
146 return self.wait(guids)
148 def wait_started(self, guids):
149 """ Blocking method that wait until all the RM from the 'guid' list
150 reached the state STARTED
152 :param guids: List of guids
155 return self.wait(guids, states = [ResourceState.STARTED,
156 ResourceState.STOPPED,
157 ResourceState.FINISHED])
    def wait(self, guids, states = [ResourceState.FINISHED,
        ResourceState.STOPPED]):
        """ Blocking method that waits until all the RM from the 'guids' list
        reached one of the given 'states', or until a failure occurs.

        :param guids: List of guids (a single int guid is also accepted)
        :param states: Resource states considered terminal for the wait
        """
        # NOTE(review): mutable default argument 'states' is shared across
        # calls; safe only while no caller mutates it.
        if isinstance(guids, int):
        # Poll RM states until all reached a target state or any FAILED
        while not all([self.state(guid) in states for guid in guids]) and \
            not any([self.state(guid) in [
                ResourceState.FAILED] for guid in guids]) and \
            waited += "guid %d - %s \n" % (guid, self.state(guid, hr = True))
        self.logger.debug(" WAITING FOR %s " % waited )

        # We keep the sleep big to decrease the number of RM state queries
183 def get_task(self, tid):
184 """ Get a specific task
186 :param tid: Id of the task
190 return self._tasks.get(tid)
192 def get_resource(self, guid):
193 """ Get a specific Resource Manager
195 :param guid: Id of the task
197 :rtype: ResourceManager
199 return self._resources.get(guid)
203 """ Returns the list of all the Resource Manager Id
208 return self._resources.keys()
    def register_resource(self, rtype, guid = None):
        """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
        for the RM of type 'rtype' and adds it to the list of Resources.

        :param rtype: Type of the RM
        :return: Id of the RM
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)
        # Delegate instantiation to the factory for the requested type
        rm = ResourceFactory.create(rtype, self, guid)
        self._resources[guid] = rm
        # NOTE(review): the docstring promises the new guid is returned,
        # but no return statement is visible in this chunk — confirm.
230 def get_attributes(self, guid):
231 """ Return all the attibutes of a specific RM
233 :param guid: Guid of the RM
235 :return: List of attributes
238 rm = self.get_resource(guid)
239 return rm.get_attributes()
241 def register_connection(self, guid1, guid2):
242 """ Registers a guid1 with a guid2.
243 The declaration order is not important
245 :param guid1: First guid to connect
246 :type guid1: ResourceManager
248 :param guid2: Second guid to connect
249 :type guid: ResourceManager
251 rm1 = self.get_resource(guid1)
252 rm2 = self.get_resource(guid2)
254 rm1.register_connection(guid2)
255 rm2.register_connection(guid1)
    def register_condition(self, group1, action, group2, state,
        """ Registers an action START or STOP for all RM on group1 to occur
        time 'time' after all elements in group2 reached state 'state'.

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to be waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # Normalize single int guids into one-element lists
        if isinstance(group1, int):
        if isinstance(group2, int):

            # The condition itself is stored on each RM of group1
            rm = self.get_resource(guid1)
            rm.register_condition(action, group2, state, time)
287 def register_trace(self, guid, name):
290 :param name: Name of the trace
293 rm = self.get_resource(guid)
294 rm.register_trace(name)
296 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
297 """ Get information on collected trace
299 :param name: Name of the trace
302 :param attr: Can be one of:
303 - TraceAttr.ALL (complete trace content),
304 - TraceAttr.STREAM (block in bytes to read starting at offset),
305 - TraceAttr.PATH (full path to the trace file),
306 - TraceAttr.SIZE (size of trace file).
309 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
312 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
317 rm = self.get_resource(guid)
318 return rm.trace(name, attr, block, offset)
    def discover(self, guid):
        """ Discover the RM identified by 'guid'.

        :param guid: Guid of the RM
        """
        rm = self.get_resource(guid)
        # NOTE(review): 'return rm.discover()' expected here but not
        # visible in this chunk — confirm against full source.
330 def provision(self, guid):
331 """ Provision a specific RM defined by its 'guid'
333 :param guid: Guid of the RM
337 rm = self.get_resource(guid)
338 return rm.provision()
    def get(self, guid, name):
        """ Get the value of attribute 'name' from the RM 'guid'.

        :param guid: Guid of the RM
        :param name: attribute's name
        """
        rm = self.get_resource(guid)
        # NOTE(review): 'return rm.get(name)' expected here but not
        # visible in this chunk — confirm against full source.
353 def set(self, guid, name, value):
354 """ Set a specific attribute 'name' from the RM 'guid'
355 with the value 'value'
357 :param guid: Guid of the RM
360 :param name: attribute's name
363 :param value: attribute's value
366 rm = self.get_resource(guid)
367 return rm.set(name, value)
    def state(self, guid, hr = False):
        """ Return the state of a resource.

        :param guid: Resource guid
        :param hr: Human readable. Forces return of a
            status string instead of a number
        """
        rm = self.get_resource(guid)
        # NOTE(review): the lines reading the RM state and testing 'hr'
        # are not visible in this chunk; the return below maps a state
        # number to its human readable name.
            return ResourceState2str.get(state)
    def stop(self, guid):
        """ Stop the RM identified by 'guid'.

        :param guid: Guid of the RM
        """
        rm = self.get_resource(guid)
        # NOTE(review): 'return rm.stop()' expected here but not
        # visible in this chunk — confirm against full source.
    def start(self, guid):
        """ Start the RM identified by 'guid'.

        :param guid: Guid of the RM
        """
        rm = self.get_resource(guid)
        # NOTE(review): 'return rm.start()' expected here but not
        # visible in this chunk — confirm against full source.
    def set_with_conditions(self, name, value, group1, group2, state,
        """ Set value 'value' on attribute with name 'name' on all RMs of
        group1 when 'time' has elapsed since all elements in group2
        have reached state 'state'.

        :param name: Name of attribute to set in RM

        :param value: Value of attribute to set in RM

        :param group1: List of guids of RMs subjected to the set

        :param group2: List of guids of RMs to be waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # Normalize single int guids into one-element lists
        if isinstance(group1, int):
        if isinstance(group2, int):

            # The conditional set is stored on each RM of group1
            rm = self.get_resource(guid)
            rm.set_with_conditions(name, value, group2, state, time)
445 def stop_with_conditions(self, guid):
446 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
448 :param guid: Guid of the RM
452 rm = self.get_resource(guid)
453 return rm.stop_with_conditions()
455 def start_with_conditions(self, guid):
456 """ Start a specific RM defined by its 'guid' only if all the conditions are true
458 :param guid: Guid of the RM
462 rm = self.get_resource(guid)
463 return rm.start_with_condition()
    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource managers in 'group'.

        :param group: List of guids of RMs to deploy. When not given,
            every RM still in the NEW state is deployed.

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs
        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        # By default, if no deployment group is indicated,
        # all RMs that are undeployed will be deployed
        for guid in self.resources:
            if self.state(guid) == ResourceState.NEW:

        # Normalize a single int guid into a one-element list
        if isinstance(group, int):

        # Before starting deployment we disorder the group list with the
        # purpose of speeding up the whole deployment process.
        # It is likely that the user inserted in the 'group' list closely
        # resources one after another (e.g. all applications
        # connected to the same node can likely appear one after another).
        # This can originate a slow down in the deployment since the N
        # threads the parallel runner uses to processes tasks may all
        # be taken up by the same family of resources waiting for the
        # same conditions (e.g. LinuxApplications running on a same
        # node share a single lock, so they will tend to be serialized).
        # If we disorder the group list, this problem can be mitigated.
        random.shuffle(group)

        def wait_all_and_start(group):
            # Re-check later while any RM has not yet reached READY
                if self.state(guid) < ResourceState.READY:

                callback = functools.partial(wait_all_and_start, group)
                self.schedule("1s", callback)
            # If all resources are ready, we schedule the start
                rm = self.get_resource(guid)
                self.schedule("0s", rm.start_with_conditions)

            # Schedule the function that will check all resources are
            # READY, and only then it will schedule the start.
            # This is aimed to reduce the number of tasks looping in the scheduler.
            # Instead of having N start tasks, we will have only one
            callback = functools.partial(wait_all_and_start, group)
            self.schedule("1s", callback)

            rm = self.get_resource(guid)
            self.schedule("0s", rm.deploy)

            if not wait_all_ready:
                self.schedule("1s", rm.start_with_conditions)

            if rm.conditions.get(ResourceAction.STOP):
                # Only if the RM has STOP conditions we
                # schedule a stop. Otherwise the RM will stop immediately
                self.schedule("2s", rm.stop_with_conditions)
    def release(self, group = None):
        """ Release the elements of the list 'group', or
        all the registered resources if no group is specified.

        :param group: List of RM guids
        """
            group = self.resources

            # One release thread per RM so releases proceed in parallel
            rm = self.get_resource(guid)
            thread = threading.Thread(target=rm.release)
            threads.append(thread)
            thread.setDaemon(True)

        # Wait for releases to complete, but wake periodically so we can
        # stop early if the EC reached a final state in the meantime
        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
                if not thread.is_alive():
                    threads.remove(thread)
565 """ Shutdown the Experiment Controller.
566 Releases all the resources and stops task processing thread
571 # Mark the EC state as TERMINATED
572 self._state = ECState.TERMINATED
574 # Notify condition to wake up the processing thread
577 if self._thread.is_alive():
    def schedule(self, date, callback, track = False):
        """ Schedule a callback to be executed at time 'date'.

        :param date: string containing execution time for the task.
            It can be expressed as an absolute time, using
            timestamp format, or as a relative time matching
            ^\d+.\d+(h|m|s|ms|us)$

        :param callback: code to be executed for the task. Must be a
            Python function, and receives args and kwargs

        :param track: if set to True, the task will be retrievable with
            the get_task() method

        :return : The Id of the task
        """
        # Resolve the (possibly relative) date into an absolute timestamp
        timestamp = stabsformat(date)
        task = Task(timestamp, callback)
        # The scheduler assigns the task its id
        task = self._scheduler.schedule(task)

            # Only tracked tasks are kept for later retrieval
            self._tasks[task.id] = task

        # Notify condition to wake up the processing thread
610 """ Process scheduled tasks.
614 The _process method is executed in an independent thread held by the
615 ExperimentController for as long as the experiment is running.
617 Tasks are scheduled by invoking the schedule method with a target callback.
618 The schedule method is given a execution time which controls the
619 order in which tasks are processed.
621 Tasks are processed in parallel using multithreading.
622 The environmental variable NEPI_NTHREADS can be used to control
623 the number of threads used to process tasks. The default value is 50.
627 To execute tasks in parallel, an ParallelRunner (PR) object, holding
628 a pool of threads (workers), is used.
629 For each available thread in the PR, the next task popped from
630 the scheduler queue is 'put' in the PR.
631 Upon receiving a task to execute, each PR worker (thread) invokes the
632 _execute method of the EC, passing the task as argument.
633 This method, calls task.callback inside a try/except block. If an
634 exception is raised by the tasks.callback, it will be trapped by the
635 try block, logged to standard error (usually the console), and the EC
636 state will be set to ECState.FAILED.
637 The invocation of _notify immediately after, forces the processing
638 loop in the _process method, to wake up if it was blocked waiting for new
639 tasks to arrived, and to check the EC state.
640 As the EC is in FAILED state, the processing loop exits and the
641 'finally' block is invoked. In the 'finally' block, the 'sync' method
642 of the PR is invoked, which forces the PR to raise any unchecked errors
643 that might have been raised by the workers.
646 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
648 runner = ParallelRun(maxthreads = nthreads)
652 while not self.finished:
655 task = self._scheduler.next()
658 # No task to execute. Wait for a new task to be scheduled.
661 # The task timestamp is in the future. Wait for timeout
662 # or until another task is scheduled.
664 if now < task.timestamp:
665 # Calculate timeout in seconds
666 timeout = tdiffsec(task.timestamp, now)
668 # Re-schedule task with the same timestamp
669 self._scheduler.schedule(task)
673 # Wait timeout or until a new task awakes the condition
674 self._cond.wait(timeout)
679 # Process tasks in parallel
680 runner.put(self._execute, task)
683 err = traceback.format_exc()
684 self.logger.error("Error while processing tasks in the EC: %s" % err)
686 self._state = ECState.FAILED
688 self.logger.debug("Exiting the task processing loop ... ")
    def _execute(self, task):
        """ Execute a single task.

        :param task: Object containing the callback to execute

        If the invocation of the task callback raises an
        exception, the processing thread of the ExperimentController
        will be stopped and the experiment will be aborted.
        """
            # Optimistically mark the task DONE; downgraded to ERROR in the
            # exception path below when the callback raises
            task.status = TaskStatus.DONE

            task.result = task.callback()

            err = traceback.format_exc()

            task.status = TaskStatus.ERROR

            self.logger.error("Error occurred while executing task: %s" % err)

            # Set the EC to FAILED state (this will force to exit the task
            # processing loop)
            self._state = ECState.FAILED

            # Notify condition to wake up the processing thread

        # Propagate error to the ParallelRunner
728 """ Awakes the processing thread in case it is blocked waiting
729 for a new task to be scheduled.