# NEPI, a framework to manage network experiments
# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import random
import sys
import threading
import time
import traceback

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
# TODO: use multiprocessing instead of threading
# TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
class ECState(object):
    """ State of the Experiment Controller

    """
    # The EC is processing scheduled tasks normally
    RUNNING = 1
    # An unhandled error occurred while processing tasks; the EC is unusable
    FAILED = 2
    # The EC was explicitly shut down (see ExperimentController.shutdown)
    TERMINATED = 3
class ExperimentController(object):
    """
    .. class:: Class Args :

        :param exp_id: Human readable identifier for the experiment.
                       It will be used in the name of the directory
                       where experiment related information is stored
        :type exp_id: str

        :param root_dir: Root directory where experiment specific folder
                         will be created to store experiment information
        :type root_dir: str

    The ExperimentController (EC) is the entity responsible for
    managing a single experiment.
    Through the EC interface the user can create ResourceManagers (RMs),
    configure them and interconnect them, in order to describe the experiment.

    Only when the 'deploy()' method is invoked, the EC will take actions
    to transform the 'described' experiment into a 'running' experiment.

    While the experiment is running, it is possible to continue to
    create/configure/connect RMs, and to deploy them to involve new
    resources in the experiment.

    """
75 def __init__(self, exp_id = None, root_dir = "/tmp"):
76 super(ExperimentController, self).__init__()
77 # root directory to store files
78 self._root_dir = root_dir
80 # experiment identifier given by the user
81 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
83 # generator of globally unique ids
84 self._guid_generator = guid.GuidGenerator()
87 self._resources = dict()
90 self._scheduler = HeapScheduler()
95 # Event processing thread
96 self._cond = threading.Condition()
97 self._thread = threading.Thread(target = self._process)
98 self._thread.setDaemon(True)
102 self._state = ECState.RUNNING
105 self._logger = logging.getLogger("ExperimentController")
109 """ Return the logger of the Experiment Controller
116 """ Return the state of the Experiment Controller
123 """ Return the experiment ID
126 exp_id = self._exp_id
127 if not exp_id.startswith("nepi-"):
128 exp_id = "nepi-" + exp_id
133 """ Put the state of the Experiment Controller into a final state :
134 Either TERMINATED or FAILED
137 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
139 def wait_finished(self, guids):
140 """ Blocking method that wait until all the RM from the 'guid' list
141 reached the state FINISHED
143 :param guids: List of guids
146 return self.wait(guids)
148 def wait_started(self, guids):
149 """ Blocking method that wait until all the RM from the 'guid' list
150 reached the state STARTED
152 :param guids: List of guids
155 return self.wait(guids, states = [ResourceState.STARTED, ResourceState.FINISHED])
157 def wait(self, guids, states = [ResourceState.FINISHED]):
158 """ Blocking method that waits until all the RM from the 'guid' list
159 reached state 'state' or until a failure occurs
161 :param guids: List of guids
164 if isinstance(guids, int):
167 while not all([self.state(guid) in states for guid in guids]) and \
168 not any([self.state(guid) in [
169 ResourceState.STOPPED,
170 ResourceState.FAILED] for guid in guids]) and \
172 # We keep the sleep big to decrease the number of RM state queries
175 def get_task(self, tid):
176 """ Get a specific task
178 :param tid: Id of the task
182 return self._tasks.get(tid)
184 def get_resource(self, guid):
185 """ Get a specific Resource Manager
187 :param guid: Id of the task
189 :rtype: ResourceManager
191 return self._resources.get(guid)
195 """ Returns the list of all the Resource Manager Id
200 return self._resources.keys()
202 def register_resource(self, rtype, guid = None):
203 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
204 for the RM of type 'rtype' and add it to the list of Resources.
206 :param rtype: Type of the RM
208 :return: Id of the RM
211 # Get next available guid
212 guid = self._guid_generator.next(guid)
215 rm = ResourceFactory.create(rtype, self, guid)
218 self._resources[guid] = rm
222 def get_attributes(self, guid):
223 """ Return all the attibutes of a specific RM
225 :param guid: Guid of the RM
227 :return: List of attributes
230 rm = self.get_resource(guid)
231 return rm.get_attributes()
233 def register_connection(self, guid1, guid2):
234 """ Registers a guid1 with a guid2.
235 The declaration order is not important
237 :param guid1: First guid to connect
238 :type guid1: ResourceManager
240 :param guid2: Second guid to connect
241 :type guid: ResourceManager
243 rm1 = self.get_resource(guid1)
244 rm2 = self.get_resource(guid2)
249 def register_condition(self, group1, action, group2, state,
251 """ Registers an action START or STOP for all RM on group1 to occur
252 time 'time' after all elements in group2 reached state 'state'.
254 :param group1: List of guids of RMs subjected to action
257 :param action: Action to register (either START or STOP)
258 :type action: ResourceAction
260 :param group2: List of guids of RMs to we waited for
263 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
264 :type state: ResourceState
266 :param time: Time to wait after group2 has reached status
270 if isinstance(group1, int):
272 if isinstance(group2, int):
276 rm = self.get_resource(guid1)
277 rm.register_condition(action, group2, state, time)
279 def register_trace(self, guid, name):
282 :param name: Name of the trace
285 rm = self.get_resource(guid)
286 rm.register_trace(name)
288 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
289 """ Get information on collected trace
291 :param name: Name of the trace
294 :param attr: Can be one of:
295 - TraceAttr.ALL (complete trace content),
296 - TraceAttr.STREAM (block in bytes to read starting at offset),
297 - TraceAttr.PATH (full path to the trace file),
298 - TraceAttr.SIZE (size of trace file).
301 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
304 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
309 rm = self.get_resource(guid)
310 return rm.trace(name, attr, block, offset)
312 def discover(self, guid):
313 """ Discover a specific RM defined by its 'guid'
315 :param guid: Guid of the RM
319 rm = self.get_resource(guid)
322 def provision(self, guid):
323 """ Provision a specific RM defined by its 'guid'
325 :param guid: Guid of the RM
329 rm = self.get_resource(guid)
330 return rm.provision()
332 def get(self, guid, name):
333 """ Get a specific attribute 'name' from the RM 'guid'
335 :param guid: Guid of the RM
338 :param name: attribute's name
342 rm = self.get_resource(guid)
345 def set(self, guid, name, value):
346 """ Set a specific attribute 'name' from the RM 'guid'
347 with the value 'value'
349 :param guid: Guid of the RM
352 :param name: attribute's name
355 :param value: attribute's value
358 rm = self.get_resource(guid)
359 return rm.set(name, value)
361 def state(self, guid, hr = False):
362 """ Returns the state of a resource
364 :param guid: Resource guid
367 :param hr: Human readable. Forces return of a
368 status string instead of a number
372 rm = self.get_resource(guid)
374 return ResourceState2str.get(rm.state)
378 def stop(self, guid):
379 """ Stop a specific RM defined by its 'guid'
381 :param guid: Guid of the RM
385 rm = self.get_resource(guid)
388 def start(self, guid):
389 """ Start a specific RM defined by its 'guid'
391 :param guid: Guid of the RM
395 rm = self.get_resource(guid)
398 def set_with_conditions(self, name, value, group1, group2, state,
400 """ Set value 'value' on attribute with name 'name' on all RMs of
401 group1 when 'time' has elapsed since all elements in group2
402 have reached state 'state'.
404 :param name: Name of attribute to set in RM
407 :param value: Value of attribute to set in RM
410 :param group1: List of guids of RMs subjected to action
413 :param action: Action to register (either START or STOP)
414 :type action: ResourceAction
416 :param group2: List of guids of RMs to we waited for
419 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
420 :type state: ResourceState
422 :param time: Time to wait after group2 has reached status
426 if isinstance(group1, int):
428 if isinstance(group2, int):
432 rm = self.get_resource(guid)
433 rm.set_with_conditions(name, value, group2, state, time)
435 def stop_with_conditions(self, guid):
436 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
438 :param guid: Guid of the RM
442 rm = self.get_resource(guid)
443 return rm.stop_with_conditions()
445 def start_with_conditions(self, guid):
446 """ Start a specific RM defined by its 'guid' only if all the conditions are true
448 :param guid: Guid of the RM
452 rm = self.get_resource(guid)
453 return rm.start_with_condition()
455 def deploy(self, group = None, wait_all_ready = True):
456 """ Deploy all resource manager in group
458 :param group: List of guids of RMs to deploy
461 :param wait_all_ready: Wait until all RMs are ready in
462 order to start the RMs
466 self.logger.debug(" ------- DEPLOY START ------ ")
469 # By default, if not deployment group is indicated,
470 # all RMs that are undeployed will be deployed
472 for guid in self.resources:
473 if self.state(guid) == ResourceState.NEW:
476 if isinstance(group, int):
479 # Before starting deployment we disorder the group list with the
480 # purpose of speeding up the whole deployment process.
481 # It is likely that the user inserted in the 'group' list closely
482 # resources one after another (e.g. all applications
483 # connected to the same node can likely appear one after another).
484 # This can originate a slow down in the deployment since the N
485 # threads the parallel runner uses to processes tasks may all
486 # be taken up by the same family of resources waiting for the
487 # same conditions (e.g. LinuxApplications running on a same
488 # node share a single lock, so they will tend to be serialized).
489 # If we disorder the group list, this problem can be mitigated.
490 #random.shuffle(group)
492 def wait_all_and_start(group):
495 if self.state(guid) < ResourceState.READY:
500 callback = functools.partial(wait_all_and_start, group)
501 self.schedule("1s", callback)
503 # If all resources are read, we schedule the start
505 rm = self.get_resource(guid)
506 self.schedule("0s", rm.start_with_conditions)
509 # Schedule the function that will check all resources are
510 # READY, and only then it will schedule the start.
511 # This is aimed to reduce the number of tasks looping in the scheduler.
512 # Intead of having N start tasks, we will have only one
513 callback = functools.partial(wait_all_and_start, group)
514 self.schedule("1s", callback)
517 rm = self.get_resource(guid)
518 self.schedule("0s", rm.deploy)
520 if not wait_all_ready:
521 self.schedule("1s", rm.start_with_conditions)
523 if rm.conditions.get(ResourceAction.STOP):
524 # Only if the RM has STOP conditions we
525 # schedule a stop. Otherwise the RM will stop immediately
526 self.schedule("2s", rm.stop_with_conditions)
529 def release(self, group = None):
530 """ Release the elements of the list 'group' or
531 all the resources if any group is specified
533 :param group: List of RM
538 group = self.resources
542 rm = self.get_resource(guid)
543 thread = threading.Thread(target=rm.release)
544 threads.append(thread)
545 thread.setDaemon(True)
548 while list(threads) and not self.finished:
550 # Time out after 5 seconds to check EC not terminated
552 if not thread.is_alive():
553 threads.remove(thread)
556 """ Shutdown the Experiment Controller.
557 Releases all the resources and stops task processing thread
562 # Mark the EC state as TERMINATED
563 self._state = ECState.TERMINATED
565 # Notify condition to wake up the processing thread
568 if self._thread.is_alive():
571 def schedule(self, date, callback, track = False):
572 """ Schedule a callback to be executed at time date.
574 :param date: string containing execution time for the task.
575 It can be expressed as an absolute time, using
576 timestamp format, or as a relative time matching
577 ^\d+.\d+(h|m|s|ms|us)$
579 :param callback: code to be executed for the task. Must be a
580 Python function, and receives args and kwargs
583 :param track: if set to True, the task will be retrivable with
584 the get_task() method
586 :return : The Id of the task
588 timestamp = strfvalid(date)
590 task = Task(timestamp, callback)
591 task = self._scheduler.schedule(task)
594 self._tasks[task.id] = task
596 # Notify condition to wake up the processing thread
602 """ Process scheduled tasks.
604 The _process method is executed in an independent thread held by the
605 ExperimentController for as long as the experiment is running.
607 Tasks are scheduled by invoking the schedule method with a target callback.
608 The schedule method is given a execution time which controls the
609 order in which tasks are processed.
611 Tasks are processed in parallel using multithreading.
612 The environmental variable NEPI_NTHREADS can be used to control
613 the number of threads used to process tasks. The default value is 50.
617 To execute tasks in parallel, an ParallelRunner (PR) object, holding
618 a pool of threads (workers), is used.
619 For each available thread in the PR, the next task popped from
620 the scheduler queue is 'put' in the PR.
621 Upon receiving a task to execute, each PR worker (thread) invokes the
622 _execute method of the EC, passing the task as argument.
623 This method, calls task.callback inside a try/except block. If an
624 exception is raised by the tasks.callback, it will be trapped by the
625 try block, logged to standard error (usually the console), and the EC
626 state will be set to ECState.FAILED.
627 The invocation of _notify immediately after, forces the processing
628 loop in the _process method, to wake up if it was blocked waiting for new
629 tasks to arrived, and to check the EC state.
630 As the EC is in FAILED state, the processing loop exits and the
631 'finally' block is invoked. In the 'finally' block, the 'sync' method
632 of the PR is invoked, which forces the PR to raise any unchecked errors
633 that might have been raised by the workers.
636 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
638 runner = ParallelRun(maxthreads = nthreads)
642 while not self.finished:
644 task = self._scheduler.next()
648 # It there are not tasks in the tasks queue we need to
649 # wait until a call to schedule wakes us up
654 # If the task timestamp is in the future the thread needs to wait
655 # until time elapse or until another task is scheduled
657 if now < task.timestamp:
658 # Calculate time difference in seconds
659 timeout = strfdiff(task.timestamp, now)
660 # Re-schedule task with the same timestamp
661 self._scheduler.schedule(task)
662 # Sleep until timeout or until a new task awakes the condition
664 self._cond.wait(timeout)
667 # Process tasks in parallel
668 runner.put(self._execute, task)
671 err = traceback.format_exc()
672 self._logger.error("Error while processing tasks in the EC: %s" % err)
674 self._state = ECState.FAILED
676 self._logger.info("Exiting the task processing loop ... ")
679 def _execute(self, task):
680 """ Executes a single task.
682 If the invokation of the task callback raises an
683 exception, the processing thread of the ExperimentController
684 will be stopped and the experiment will be aborted.
686 :param task: Object containing the callback to execute
691 task.status = TaskStatus.DONE
694 task.result = task.callback()
697 err = traceback.format_exc()
699 task.status = TaskStatus.ERROR
701 self._logger.error("Error occurred while executing task: %s" % err)
703 # Set the EC to FAILED state (this will force to exit the task
705 self._state = ECState.FAILED
707 # Notify condition to wake up the processing thread
710 # Propage error to the ParallelRunner
714 """ Awakes the processing thread in case it is blocked waiting
715 for a new task to be scheduled.