2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment scp and ssh processes are left running behind!!
# NOTE(review): the embedded source numbering jumps (37 -> 39, 40 -> 47): lines
# are missing from this copy, including ECState's body. The state constants
# themselves are not visible, but RUNNING, FAILED and TERMINATED are referenced
# elsewhere in this file.
39 class ECState(object):
40 """ State of the Experiment Controller
# ExperimentController is the user-facing facade of the framework: it owns the
# ResourceManagers, a task scheduler, and the thread that executes scheduled
# tasks (see _process below).
47 class ExperimentController(object):
# NOTE(review): the docstring's opening quotes (line 48) fall on a line missing
# from this copy.
49 .. class:: Class Args :
51 :param exp_id: Id of the experiment
53 :param root_dir: Root directory of the experiment
58 This class is the only one used by the User. Indeed, the user "talks"
59 only with the Experiment Controller and the latter forwards the orders
60 provided by the user to the different Resource Managers.
# NOTE(review): numbering gaps (67->69, 73->76, 79->84, 87->91, 91->94) show
# missing lines; self._tasks (read by get_task and schedule) and the
# self._thread.start() call are presumably among them -- confirm upstream.
64 def __init__(self, exp_id = None, root_dir = "/tmp"):
65 super(ExperimentController, self).__init__()
66 # root directory to store files
67 self._root_dir = root_dir
69 # experiment identifier given by the user
# NOTE: str.encode('hex') exists only on Python 2 -- this file targets Python 2
70 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
72 # generator of globally unique ids
73 self._guid_generator = guid.GuidGenerator()
# guid -> ResourceManager instance mapping
76 self._resources = dict()
# min-heap based scheduler, ordering Tasks by timestamp
79 self._scheduler = HeapScheduler()
84 # Event processing thread
85 self._cond = threading.Condition()
86 self._thread = threading.Thread(target = self._process)
87 self._thread.setDaemon(True)
# the EC starts RUNNING; _process loops until the state becomes final
91 self._state = ECState.RUNNING
94 self._logger = logging.getLogger("ExperimentController")
# NOTE(review): the decorator/def lines for the accessors below (logger,
# ecstate, exp_id, finished) fall on lines missing from this copy, as do their
# return statements in most cases.
98 """ Return the logger of the Experiment Controller
105 """ Return the state of the Experiment Controller
112 """ Return the experiment ID
115 exp_id = self._exp_id
# normalize user-provided ids so they always carry the "nepi-" prefix
116 if not exp_id.startswith("nepi-"):
117 exp_id = "nepi-" + exp_id
122 """ Returns True when the Experiment Controller is in a final state:
123 either TERMINATED or FAILED
126 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
128 def wait_finished(self, guids):
129 """ Blocking method that waits until all the RMs from the 'guids' list
130 reach the state FINISHED
132 :param guids: List of guids
# a single int guid is wrapped into a list on a line missing from this copy
135 if isinstance(guids, int):
# poll until every RM reached a final state, or the EC itself finished
138 while not all([self.state(guid) in [ResourceState.FINISHED,
139 ResourceState.STOPPED,
140 ResourceState.FAILED] \
141 for guid in guids]) and not self.finished:
142 # We keep the sleep as large as possible to
143 # decrease the number of RM state requests
# NOTE(review): the sleep call itself (line 144) is missing from this copy
146 def get_task(self, tid):
147 """ Get a specific task
149 :param tid: Id of the task
# dict.get: returns None when the task id was never tracked (see schedule())
153 return self._tasks.get(tid)
155 def get_resource(self, guid):
156 """ Get a specific Resource Manager
158 :param guid: Id of the RM
160 :rtype: ResourceManager
# dict.get: returns None for unknown guids
162 return self._resources.get(guid)
# NOTE(review): the property/def line for the 'resources' accessor below is
# missing from this copy
166 """ Returns the list of all the Resource Manager Ids
170 return self._resources.keys()
172 def register_resource(self, rtype, guid = None):
173 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
174 for the RM of type 'rtype' and add it to the list of Resources.
176 :param rtype: Type of the RM
178 :return : Id of the RM
181 # Get next available guid
182 guid = self._guid_generator.next(guid)
# instantiate the RM through the factory, binding it to this EC and the guid
185 rm = ResourceFactory.create(rtype, self, guid)
188 self._resources[guid] = rm
# NOTE(review): the 'return guid' statement (missing lines 189-191) is not
# visible in this copy
192 def get_attributes(self, guid):
193 """ Return all the attributes of a specific RM
195 :param guid: Guid of the RM
197 :return : List of attributes
# delegates to the RM; note get_resource returns None for unknown guids, which
# would raise AttributeError here
200 rm = self.get_resource(guid)
201 return rm.get_attributes()
203 def register_connection(self, guid1, guid2):
204 """ Registers a guid1 with a guid2.
205 The declaration order is not important
207 :param guid1: First guid to connect
208 :type guid1: ResourceManager
210 :param guid2: Second guid to connect
211 :type guid2: ResourceManager
214 rm1 = self.get_resource(guid1)
215 rm2 = self.get_resource(guid2)
# NOTE(review): the calls that actually register the connection on rm1/rm2
# (lines 216-219) are missing from this copy
220 def register_condition(self, group1, action, group2, state,
# NOTE(review): the signature continuation (line 221, presumably a
# 'time = None):' default) is missing from this copy
222 """ Registers an action START or STOP for all RM on group1 to occur
223 time 'time' after all elements in group2 reached state 'state'.
225 :param group1: List of guids of RMs subjected to action
228 :param action: Action to register (either START or STOP)
229 :type action: ResourceAction
231 :param group2: List of guids of RMs to be waited for
234 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
235 :type state: ResourceState
237 :param time: Time to wait after group2 has reached status
# bare int guids are wrapped into lists on lines missing from this copy, and
# the 'for guid1 in group1:' loop header is also not visible
241 if isinstance(group1, int):
243 if isinstance(group2, int):
247 rm = self.get_resource(guid1)
248 rm.register_condition(action, group2, state, time)
250 def register_trace(self, guid, name):
# NOTE(review): the docstring opening quotes and the ':param guid:' entry fall
# on lines missing from this copy
253 :param name: Name of the trace
256 rm = self.get_resource(guid)
257 rm.register_trace(name)
259 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
260 """ Get information on collected trace
262 :param name: Name of the trace
265 :param attr: Can be one of:
266 - TraceAttr.ALL (complete trace content),
267 - TraceAttr.STREAM (block in bytes to read starting at offset),
268 - TraceAttr.PATH (full path to the trace file),
269 - TraceAttr.SIZE (size of trace file).
272 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
275 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
# delegates trace retrieval to the RM that owns the trace
280 rm = self.get_resource(guid)
281 return rm.trace(name, attr, block, offset)
283 def discover(self, guid):
284 """ Discover a specific RM defined by its 'guid'
286 :param guid: Guid of the RM
290 rm = self.get_resource(guid)
# NOTE(review): the 'return rm.discover()' call (line 291) is missing from
# this copy -- compare with provision() below
293 def provision(self, guid):
294 """ Provision a specific RM defined by its 'guid'
296 :param guid: Guid of the RM
# thin delegation to the RM's own provision()
300 rm = self.get_resource(guid)
301 return rm.provision()
303 def get(self, guid, name):
304 """ Get a specific attribute 'name' from the RM 'guid'
306 :param guid: Guid of the RM
309 :param name: attribute's name
313 rm = self.get_resource(guid)
# NOTE(review): the delegating return (line 314, presumably
# 'return rm.get(name)') is missing from this copy
316 def set(self, guid, name, value):
317 """ Set a specific attribute 'name' from the RM 'guid'
318 with the value 'value'
320 :param guid: Guid of the RM
323 :param name: attribute's name
326 :param value: attribute's value
# thin delegation to the RM's own set()
329 rm = self.get_resource(guid)
330 return rm.set(name, value)
332 def state(self, guid, hr = False):
333 """ Returns the state of a resource
335 :param guid: Resource guid
338 :param hr: Human readable. Forces return of a
339 status string instead of a number
343 rm = self.get_resource(guid)
# NOTE(review): the 'if hr:' guard (line 344) and the numeric
# 'return rm.state' branch are missing from this copy; only the
# human-readable branch is visible
345 return ResourceState2str.get(rm.state)
349 def stop(self, guid):
350 """ Stop a specific RM defined by its 'guid'
352 :param guid: Guid of the RM
356 rm = self.get_resource(guid)
# NOTE(review): the delegating 'return rm.stop()' call (line 357) is missing
# from this copy
359 def start(self, guid):
360 """ Start a specific RM defined by its 'guid'
362 :param guid: Guid of the RM
366 rm = self.get_resource(guid)
# NOTE(review): the delegating 'return rm.start()' call (line 367) is missing
# from this copy
369 def set_with_conditions(self, name, value, group1, group2, state,
# NOTE(review): the signature continuation (line 370, presumably
# 'time = None):') is missing from this copy
371 """ Set value 'value' on attribute with name 'name' on all RMs of
372 group1 when 'time' has elapsed since all elements in group2
373 have reached state 'state'.
375 :param name: Name of attribute to set in RM
378 :param value: Value of attribute to set in RM
381 :param group1: List of guids of RMs subjected to action
# NOTE(review): ':param action' below documents a parameter this method does
# not take -- it looks copy-pasted from register_condition
384 :param action: Action to register (either START or STOP)
385 :type action: ResourceAction
387 :param group2: List of guids of RMs to be waited for
390 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
391 :type state: ResourceState
393 :param time: Time to wait after group2 has reached status
# int guids wrapped into lists and the loop header over group1 fall on lines
# missing from this copy
397 if isinstance(group1, int):
399 if isinstance(group2, int):
403 rm = self.get_resource(guid)
404 rm.set_with_conditions(name, value, group2, state, time)
406 def stop_with_conditions(self, guid):
407 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
409 :param guid: Guid of the RM
# thin delegation; the RM checks its own registered STOP conditions
413 rm = self.get_resource(guid)
414 return rm.stop_with_conditions()
416 def start_with_conditions(self, guid):
417 """ Start a specific RM defined by its 'guid' only if all the conditions are true
419 :param guid: Guid of the RM
423 rm = self.get_resource(guid)
# NOTE(review): calls 'start_with_condition' (singular) while the stop twin
# above calls 'stop_with_conditions' -- likely a typo; confirm against the
# ResourceManager API before changing
424 return rm.start_with_condition()
426 def deploy(self, group = None, wait_all_ready = True):
427 """ Deploy all resource manager in group
429 :param group: List of guids of RMs to deploy
432 :param wait_all_ready: Wait until all RMs are ready in
433 order to start the RMs
437 self.logger.debug(" ------- DEPLOY START ------ ")
# default to deploying every registered resource
# NOTE(review): the 'if not group:' guard (line 439) and the int-to-list
# normalization body fall on lines missing from this copy
440 group = self.resources
442 if isinstance(group, int):
445 # Before starting deployment we disorder the group list with the
446 # purpose of speeding up the whole deployment process.
447 # It is likely that the user inserted in the 'group' list closely
448 # resources one after another (e.g. all applications
449 # connected to the same node can likely appear one after another).
450 # This can originate a slow down in the deployment since the N
451 # threads the parallel runner uses to process tasks may all
452 # be taken up by the same family of resources waiting for the
453 # same conditions (e.g. LinuxApplications running on a same
454 # node share a single lock, so they will tend to be serialized).
455 # If we disorder the group list, this problem can be mitigated.
456 #random.shuffle(group)
# self-rescheduling closure: only starts the RMs once every RM in 'group'
# has reached READY
458 def wait_all_and_start(group):
461 rm = self.get_resource(guid)
462 if rm.state < ResourceState.READY:
# not all ready yet: check again in one second
467 callback = functools.partial(wait_all_and_start, group)
468 self.schedule("1s", callback)
470 # If all resources are ready, we schedule the start
472 rm = self.get_resource(guid)
473 self.schedule("0s", rm.start_with_conditions)
476 # Schedule the function that will check all resources are
477 # READY, and only then it will schedule the start.
478 # This is aimed to reduce the number of tasks looping in the scheduler.
479 # Instead of having N start tasks, we will have only one
480 callback = functools.partial(wait_all_and_start, group)
481 self.schedule("1s", callback)
# per-RM deployment tasks; the loop header over 'group' (line 483) is missing
# from this copy
484 rm = self.get_resource(guid)
485 self.schedule("0s", rm.deploy)
487 if not wait_all_ready:
488 self.schedule("1s", rm.start_with_conditions)
490 if rm.conditions.get(ResourceAction.STOP):
491 # Only if the RM has STOP conditions we
492 # schedule a stop. Otherwise the RM will stop immediately
493 self.schedule("2s", rm.stop_with_conditions)
496 def release(self, group = None):
497 """ Release the elements of the list 'group' or
498 all the resources if no group is specified
500 :param group: List of RM
505 group = self.resources
# NOTE(review): the 'threads = []' initialization, the loop header over
# 'group' and the thread.start() call fall on lines missing from this copy
509 rm = self.get_resource(guid)
510 thread = threading.Thread(target=rm.release)
511 threads.append(thread)
512 thread.setDaemon(True)
# poll the release threads until all finish, or the EC reaches a final state
515 while list(threads) and not self.finished:
517 # Time out after 5 seconds to check EC not terminated
# (the join-with-timeout call is on a missing line -- TODO confirm upstream)
519 if not thread.is_alive():
520 threads.remove(thread)
# NOTE(review): the 'def shutdown(self):' line itself is missing from this copy
523 """ Shutdown the Experiment Controller.
524 Releases all the resources and stops task processing thread
529 # Mark the EC state as TERMINATED
# a TERMINATED state makes self.finished True, which ends the _process loop
530 self._state = ECState.TERMINATED
532 # Notify condition to wake up the processing thread
# (the notify call and the body joining the processing thread fall on
# missing lines)
535 if self._thread.is_alive():
538 def schedule(self, date, callback, track = False):
539 """ Schedule a callback to be executed at time date.
541 :param date: string containing execution time for the task.
542 It can be expressed as an absolute time, using
543 timestamp format, or as a relative time matching
544 ^\d+.\d+(h|m|s|ms|us)$
546 :param callback: code to be executed for the task. Must be a
547 Python function, and receives args and kwargs
550 :param track: if set to True, the task will be retrievable with
551 the get_task() method
553 :return : The Id of the task
# convert the date spec into a concrete timestamp
555 timestamp = strfvalid(date)
557 task = Task(timestamp, callback)
558 task = self._scheduler.schedule(task)
# only tracked tasks are kept in self._tasks; the 'if track:' guard falls on
# a line missing from this copy
561 self._tasks[task.id] = task
563 # Notify condition to wake up the processing thread
# NOTE(review): the notify call and the 'return task.id' statement fall on
# lines missing from this copy
563 # Notify condition to wake up the processing thread
569 """ Process scheduled tasks.
571 The _process method is executed in an independent thread held by the
572 ExperimentController for as long as the experiment is running.
574 Tasks are scheduled by invoking the schedule method with a target callback.
575 The schedule method is givedn a execution time which controls the
576 order in which tasks are processed.
578 Tasks are processed in parallel using multithreading.
579 The environmental variable NEPI_NTHREADS can be used to control
580 the number of threads used to process tasks. The default value is 50.
583 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
585 runner = ParallelRun(maxthreads = nthreads)
589 while not self.finished:
591 task = self._scheduler.next()
595 # It there are not tasks in the tasks queue we need to
596 # wait until a call to schedule wakes us up
601 # If the task timestamp is in the future the thread needs to wait
602 # until time elapse or until another task is scheduled
604 if now < task.timestamp:
605 # Calculate time difference in seconds
606 timeout = strfdiff(task.timestamp, now)
607 # Re-schedule task with the same timestamp
608 self._scheduler.schedule(task)
609 # Sleep until timeout or until a new task awakes the condition
611 self._cond.wait(timeout)
614 # Process tasks in parallel
615 runner.put(self._execute, task)
618 err = traceback.format_exc()
619 self._logger.error("Error while processing tasks in the EC: %s" % err)
621 self._state = ECState.FAILED
625 def _execute(self, task):
626 """ Executes a single task.
628 If the invocation of the task callback raises an
629 exception, the processing thread of the ExperimentController
630 will be stopped and the experiment will be aborted.
632 :param task: Object containing the callback to execute
# mark DONE up front; overwritten with ERROR below if the callback raises
637 task.status = TaskStatus.DONE
# (the 'try:' opening line falls on a line missing from this copy)
640 task.result = task.callback()
# (the 'except:' line falls on a line missing from this copy)
643 err = traceback.format_exc()
645 task.status = TaskStatus.ERROR
647 self._logger.error("Error occurred while executing task: %s" % err)
649 # Set the EC to FAILED state (this will force to exit the task
651 self._state = ECState.FAILED
653 # Notify condition to wake up the processing thread
# Propagate error to the ParallelRunner
660 """ Awakes the processing thread in case it is blocked waiting
661 for a new task to be scheduled.