2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
# Enumeration of the Experiment Controller life-cycle states (RUNNING,
# FAILED, TERMINATED); consumed by the `finished` property and `shutdown()`.
class ECState(object):
    """ State of the Experiment Controller
# Central coordination object: owns the RM registry, the task scheduler and
# the background processing thread for a single experiment.
class ExperimentController(object):
    """
    .. class:: Class Args :

    :param exp_id: Human readable identifier for the experiment.
        It will be used in the name of the directory
        where experiment related information is stored

    :param root_dir: Root directory where experiment specific folder
        will be created to store experiment information

    The ExperimentController (EC), is the entity responsible for
    managing a single experiment.
    Through the EC interface the user can create ResourceManagers (RMs),
    configure them and interconnect them, in order to describe the experiment.

    Only when the 'deploy()' method is invoked, the EC will take actions
    to transform the 'described' experiment into a 'running' experiment.

    While the experiment is running, it is possible to continue to
    create/configure/connect RMs, and to deploy them to involve new
    resources in the experiment.
    def __init__(self, exp_id = None, root_dir = "/tmp"):
        """ Initialize the EC state, RM registry, scheduler and the
        (daemon) task-processing thread.

        :param exp_id: Human readable identifier for the experiment
        :param root_dir: Root directory for experiment files
        """
        super(ExperimentController, self).__init__()
        # root directory to store files
        self._root_dir = root_dir
        # experiment identifier given by the user
        # NOTE(review): str.encode('hex') is Python 2 only; on Python 3 this
        # would need binascii.hexlify(os.urandom(8)) — confirm target version.
        self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
        # generator of globally unique ids for RMs
        self._guid_generator = guid.GuidGenerator()
        # guid -> ResourceManager registry of all registered resources
        self._resources = dict()
        # heap-based scheduler ordering tasks by execution timestamp
        self._scheduler = HeapScheduler()
        # Event processing thread: condition variable wakes _process when
        # new tasks are scheduled or the EC state changes
        self._cond = threading.Condition()
        self._thread = threading.Thread(target = self._process)
        self._thread.setDaemon(True)
        # EC starts in RUNNING state
        self._state = ECState.RUNNING
        self._logger = logging.getLogger("ExperimentController")
        # --- bodies of the logger / ecstate / exp_id / finished properties ---
        """ Return the logger of the Experiment Controller
        """ Return the state of the Experiment Controller
        """ Return the experiment ID
        # user-supplied ids are normalized with a "nepi-" prefix so the
        # per-experiment directories are recognizable
        exp_id = self._exp_id
        if not exp_id.startswith("nepi-"):
            exp_id = "nepi-" + exp_id
        """ Put the state of the Experiment Controller into a final state :
            Either TERMINATED or FAILED
        """
        # finished: True once the EC reached a terminal state
        return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
139 def wait_finished(self, guids):
140 """ Blocking method that wait until all the RM from the 'guid' list
141 reached the state FINISHED
143 :param guids: List of guids
146 return self.wait(guids)
148 def wait_started(self, guids):
149 """ Blocking method that wait until all the RM from the 'guid' list
150 reached the state STARTED
152 :param guids: List of guids
155 return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
    def wait(self, guids, states = [ResourceState.FINISHED]):
        """ Blocking method that waits until all the RM from the 'guid' list
            reached state 'state' or until a failure occurs

        :param guids: List of guids
        """
        # NOTE(review): mutable default argument 'states' — harmless as long
        # as it is never mutated, but a tuple or None sentinel would be safer.
        # Accept a single guid as well as a list.
        if isinstance(guids, int):
        # Poll until every RM reached one of the target states, or any RM
        # stopped/failed (which aborts the wait).
        while not all([self.state(guid) in states for guid in guids]) and \
                not any([self.state(guid) in [
                        ResourceState.STOPPED,
                        ResourceState.FAILED] for guid in guids]) and \
            # We keep the sleep big to decrease the number of RM state queries
175 def get_task(self, tid):
176 """ Get a specific task
178 :param tid: Id of the task
182 return self._tasks.get(tid)
184 def get_resource(self, guid):
185 """ Get a specific Resource Manager
187 :param guid: Id of the task
189 :rtype: ResourceManager
191 return self._resources.get(guid)
        """ Returns the list of all the Resource Manager Id
        """
        # keys view of the guid -> RM registry (a plain list on Python 2)
        return self._resources.keys()
    def register_resource(self, rtype, guid = None):
        """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
        for the RM of type 'rtype' and add it to the list of Resources.

        :param rtype: Type of the RM
        :return: Id of the RM
        """
        # Get next available guid
        guid = self._guid_generator.next(guid)
        # The factory instantiates the concrete RM class registered for 'rtype'
        rm = ResourceFactory.create(rtype, self, guid)
        # Record the RM in the registry so get_resource()/state() can find it
        self._resources[guid] = rm
222 def get_attributes(self, guid):
223 """ Return all the attibutes of a specific RM
225 :param guid: Guid of the RM
227 :return: List of attributes
230 rm = self.get_resource(guid)
231 return rm.get_attributes()
    def register_connection(self, guid1, guid2):
        """ Registers a guid1 with a guid2.
            The declaration order is not important

        :param guid1: First guid to connect
        :type guid1: ResourceManager

        :param guid2: Second guid to connect
        :type guid: ResourceManager
        """
        # Look up both endpoints; registration is symmetric
        rm1 = self.get_resource(guid1)
        rm2 = self.get_resource(guid2)
    def register_condition(self, group1, action, group2, state,
        """ Registers an action START or STOP for all RM on group1 to occur
            time 'time' after all elements in group2 reached state 'state'.

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to be waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # Accept single guids as well as lists for both groups
        if isinstance(group1, int):
        if isinstance(group2, int):
            # Delegate the condition bookkeeping to each RM in group1
            rm = self.get_resource(guid1)
            rm.register_condition(action, group2, state, time)
279 def register_trace(self, guid, name):
282 :param name: Name of the trace
285 rm = self.get_resource(guid)
286 rm.register_trace(name)
288 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
289 """ Get information on collected trace
291 :param name: Name of the trace
294 :param attr: Can be one of:
295 - TraceAttr.ALL (complete trace content),
296 - TraceAttr.STREAM (block in bytes to read starting at offset),
297 - TraceAttr.PATH (full path to the trace file),
298 - TraceAttr.SIZE (size of trace file).
301 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
304 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
309 rm = self.get_resource(guid)
310 return rm.trace(name, attr, block, offset)
    def discover(self, guid):
        """ Discover a specific RM defined by its 'guid'

        :param guid: Guid of the RM
        """
        # Delegates discovery to the RM itself
        rm = self.get_resource(guid)
322 def provision(self, guid):
323 """ Provision a specific RM defined by its 'guid'
325 :param guid: Guid of the RM
329 rm = self.get_resource(guid)
330 return rm.provision()
    def get(self, guid, name):
        """ Get a specific attribute 'name' from the RM 'guid'

        :param guid: Guid of the RM

        :param name: attribute's name
        """
        # Delegates the attribute read to the RM
        rm = self.get_resource(guid)
345 def set(self, guid, name, value):
346 """ Set a specific attribute 'name' from the RM 'guid'
347 with the value 'value'
349 :param guid: Guid of the RM
352 :param name: attribute's name
355 :param value: attribute's value
358 rm = self.get_resource(guid)
359 return rm.set(name, value)
    def state(self, guid, hr = False):
        """ Returns the state of a resource

        :param guid: Resource guid

        :param hr: Human readable. Forces return of a
            status string instead of a number
        """
        rm = self.get_resource(guid)
            # map the numeric state to its printable name
            return ResourceState2str.get(state)
    def stop(self, guid):
        """ Stop a specific RM defined by its 'guid'

        :param guid: Guid of the RM
        """
        # Delegates the stop to the RM
        rm = self.get_resource(guid)
    def start(self, guid):
        """ Start a specific RM defined by its 'guid'

        :param guid: Guid of the RM
        """
        # Delegates the start to the RM
        rm = self.get_resource(guid)
    def set_with_conditions(self, name, value, group1, group2, state,
        """ Set value 'value' on attribute with name 'name' on all RMs of
            group1 when 'time' has elapsed since all elements in group2
            have reached state 'state'.

        :param name: Name of attribute to set in RM

        :param value: Value of attribute to set in RM

        :param group1: List of guids of RMs subjected to action

        :param action: Action to register (either START or STOP)
        :type action: ResourceAction

        :param group2: List of guids of RMs to be waited for

        :param state: State to wait for on RMs (STARTED, STOPPED, etc)
        :type state: ResourceState

        :param time: Time to wait after group2 has reached status
        """
        # Accept single guids as well as lists for both groups
        if isinstance(group1, int):
        if isinstance(group2, int):
            # Delegate the conditional set to each RM in group1
            rm = self.get_resource(guid)
            rm.set_with_conditions(name, value, group2, state, time)
437 def stop_with_conditions(self, guid):
438 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
440 :param guid: Guid of the RM
444 rm = self.get_resource(guid)
445 return rm.stop_with_conditions()
447 def start_with_conditions(self, guid):
448 """ Start a specific RM defined by its 'guid' only if all the conditions are true
450 :param guid: Guid of the RM
454 rm = self.get_resource(guid)
455 return rm.start_with_condition()
    def deploy(self, group = None, wait_all_ready = True):
        """ Deploy all resource manager in group

        :param group: List of guids of RMs to deploy

        :param wait_all_ready: Wait until all RMs are ready in
            order to start the RMs
        """
        self.logger.debug(" ------- DEPLOY START ------ ")

        # By default, if no deployment group is indicated,
        # all RMs that are undeployed will be deployed
        for guid in self.resources:
            if self.state(guid) == ResourceState.NEW:

        # Accept a single guid as well as a list
        if isinstance(group, int):

        # Before starting deployment we disorder the group list with the
        # purpose of speeding up the whole deployment process.
        # It is likely that the user inserted in the 'group' list closely
        # resources one after another (e.g. all applications
        # connected to the same node can likely appear one after another).
        # This can originate a slow down in the deployment since the N
        # threads the parallel runner uses to processes tasks may all
        # be taken up by the same family of resources waiting for the
        # same conditions (e.g. LinuxApplications running on a same
        # node share a single lock, so they will tend to be serialized).
        # If we disorder the group list, this problem can be mitigated.
        random.shuffle(group)

        # Re-schedules itself every second until every RM in 'group'
        # is READY, then schedules the actual start of each RM.
        def wait_all_and_start(group):
                if self.state(guid) < ResourceState.READY:

                # Not all READY yet: check again in one second
                callback = functools.partial(wait_all_and_start, group)
                self.schedule("1s", callback)
                # If all resources are ready, we schedule the start
                    rm = self.get_resource(guid)
                    self.schedule("0s", rm.start_with_conditions)

            # Schedule the function that will check all resources are
            # READY, and only then it will schedule the start.
            # This is aimed to reduce the number of tasks looping in the scheduler.
            # Instead of having N start tasks, we will have only one
            callback = functools.partial(wait_all_and_start, group)
            self.schedule("1s", callback)

            # Deployment of each RM is itself a scheduled task
            rm = self.get_resource(guid)
            self.schedule("0s", rm.deploy)

            if not wait_all_ready:
                self.schedule("1s", rm.start_with_conditions)

            if rm.conditions.get(ResourceAction.STOP):
                # Only if the RM has STOP conditions we
                # schedule a stop. Otherwise the RM will stop immediately
                self.schedule("2s", rm.stop_with_conditions)
    def release(self, group = None):
        """ Release the elements of the list 'group' or
            all the resources if no group is specified

        :param group: List of RM
        """
            # default: release every registered resource
            group = self.resources

            # one release thread per RM so releases proceed in parallel
            rm = self.get_resource(guid)
            thread = threading.Thread(target=rm.release)
            threads.append(thread)
            thread.setDaemon(True)

        # Reap the release threads, bailing out early if the EC reached
        # a terminal state
        while list(threads) and not self.finished:
            # Time out after 5 seconds to check EC not terminated
                if not thread.is_alive():
                    threads.remove(thread)
        """ Shutdown the Experiment Controller.
            Releases all the resources and stops task processing thread
        """
        # Mark the EC state as TERMINATED
        self._state = ECState.TERMINATED

        # Notify condition to wake up the processing thread
        # and wait for it to exit cleanly
        if self._thread.is_alive():
572 def schedule(self, date, callback, track = False):
573 """ Schedule a callback to be executed at time date.
575 :param date: string containing execution time for the task.
576 It can be expressed as an absolute time, using
577 timestamp format, or as a relative time matching
578 ^\d+.\d+(h|m|s|ms|us)$
580 :param callback: code to be executed for the task. Must be a
581 Python function, and receives args and kwargs
584 :param track: if set to True, the task will be retrivable with
585 the get_task() method
587 :return : The Id of the task
589 timestamp = strfvalid(date)
591 task = Task(timestamp, callback)
592 task = self._scheduler.schedule(task)
595 self._tasks[task.id] = task
597 # Notify condition to wake up the processing thread
        """ Process scheduled tasks.

        The _process method is executed in an independent thread held by the
        ExperimentController for as long as the experiment is running.

        Tasks are scheduled by invoking the schedule method with a target callback.
        The schedule method is given a execution time which controls the
        order in which tasks are processed.

        Tasks are processed in parallel using multithreading.
        The environmental variable NEPI_NTHREADS can be used to control
        the number of threads used to process tasks. The default value is 50.

        To execute tasks in parallel, an ParallelRunner (PR) object, holding
        a pool of threads (workers), is used.
        For each available thread in the PR, the next task popped from
        the scheduler queue is 'put' in the PR.
        Upon receiving a task to execute, each PR worker (thread) invokes the
        _execute method of the EC, passing the task as argument.
        This method, calls task.callback inside a try/except block. If an
        exception is raised by the tasks.callback, it will be trapped by the
        try block, logged to standard error (usually the console), and the EC
        state will be set to ECState.FAILED.
        The invocation of _notify immediately after, forces the processing
        loop in the _process method, to wake up if it was blocked waiting for new
        tasks to arrived, and to check the EC state.
        As the EC is in FAILED state, the processing loop exits and the
        'finally' block is invoked. In the 'finally' block, the 'sync' method
        of the PR is invoked, which forces the PR to raise any unchecked errors
        that might have been raised by the workers.
        """
        # worker-pool size is tunable via environment
        nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))

        runner = ParallelRun(maxthreads = nthreads)

            # main loop: runs until the EC reaches a terminal state
            while not self.finished:
                    task = self._scheduler.next()

                        # No task to execute. Wait for a new task to be scheduled.

                        # The task timestamp is in the future. Wait for timeout
                        # or until another task is scheduled.
                        if now < task.timestamp:
                            # Calculate timeout in seconds
                            timeout = strfdiff(task.timestamp, now)

                            # Re-schedule task with the same timestamp
                            self._scheduler.schedule(task)

                        # Wait timeout or until a new task awakes the condition
                        self._cond.wait(timeout)

                    # Process tasks in parallel
                    runner.put(self._execute, task)

            # any exception escaping the loop aborts the whole experiment
            err = traceback.format_exc()
            self.logger.error("Error while processing tasks in the EC: %s" % err)

            self._state = ECState.FAILED

        self.logger.debug("Exiting the task processing loop ... ")
    def _execute(self, task):
        """ Executes a single task.

        :param task: Object containing the callback to execute

        If the invocation of the task callback raises an
        exception, the processing thread of the ExperimentController
        will be stopped and the experiment will be aborted.
        """
        # mark DONE optimistically; overwritten with ERROR below on failure
        task.status = TaskStatus.DONE

            task.result = task.callback()

            err = traceback.format_exc()

            task.status = TaskStatus.ERROR

            self.logger.error("Error occurred while executing task: %s" % err)

            # Set the EC to FAILED state (this will force to exit the task
            # processing loop)
            self._state = ECState.FAILED

            # Notify condition to wake up the processing thread

            # Propagate error to the ParallelRunner
        """ Awakes the processing thread in case it is blocked waiting
            for a new task to be scheduled.