2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32 ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
# Enumeration of the states the Experiment Controller can be in.
# NOTE(review): the constant definitions fall outside this excerpt; the
# controller below refers to ECState.RUNNING, ECState.FAILED and
# ECState.TERMINATED.
39 class ECState(object):
40 """ State of the Experiment Controller
47 class ExperimentController(object):
49 .. class:: Class Args :
51 :param exp_id: Id of the experiment
53 :param root_dir: Root directory of the experiment
58 This class is the only one used by the user. The user "talks"
59 only to the Experiment Controller, which in turn forwards the
60 user's orders to the different Resource Managers.
64 def __init__(self, exp_id = None, root_dir = "/tmp"):
""" Create an Experiment Controller.

:param exp_id: Id of the experiment; when it is falsy a random
    "nepi-exp-<hex>" id is generated
:param root_dir: Root directory used to store experiment files
"""
65 super(ExperimentController, self).__init__()
66 # root directory to store files
67 self._root_dir = root_dir
69 # experiment identifier given by the user
# NOTE(review): str.encode('hex') only exists on Python 2; under Python 3
# os.urandom() returns bytes, which have no encode() method.
70 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
72 # generator of globally unique ids
73 self._guid_generator = guid.GuidGenerator()
# Registered RMs, keyed by guid (filled by register_resource, read by
# get_resource).
76 self._resources = dict()
# Task queue consumed by the event processing thread.
79 self._scheduler = HeapScheduler()
84 # Event processing thread
# The condition is waited on by the processing thread and notified when a
# task is scheduled or the scheduler is stopped.
85 self._cond = threading.Condition()
86 self._thread = threading.Thread(target = self._process)
# Daemon thread, so it does not keep the interpreter alive on exit.
# NOTE(review): self._tasks and the thread start are presumably set up in
# lines not visible in this excerpt.
87 self._thread.setDaemon(True)
91 self._state = ECState.RUNNING
94 self._logger = logging.getLogger("ExperimentController")
98 """ Return the logger of the Experiment Controller
105 """ Return the state of the Experiment Controller
112 """ Return the experiment ID
115 exp_id = self._exp_id
116 if not exp_id.startswith("nepi-"):
117 exp_id = "nepi-" + exp_id
122 """ Return True if the Experiment Controller has reached a final state :
123 either TERMINATED or FAILED
126 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
128 def wait_finished(self, guids):
129 """ Blocking method that waits until all the RMs in the 'guids' list
130 reach the state FINISHED, STOPPED or FAILED, or until the EC finishes
132 :param guids: List of guids (a single int guid is also accepted)
# A single guid is normalised to a list (the assignment itself is not
# visible in this excerpt).
135 if isinstance(guids, int):
# Poll until every RM is in a terminal state; also stop waiting once the EC
# itself reaches a final state, so callers are not blocked forever.
138 while not all([self.state(guid) in [ResourceState.FINISHED,
139 ResourceState.STOPPED,
140 ResourceState.FAILED] \
141 for guid in guids]) and not self.finished:
142 # We keep the sleep as large as possible to
143 # decrease the number of RM state requests
def get_task(self, tid):
    """ Look up a task that was scheduled with track=True

    :param tid: Id of the task, as returned by schedule()

    :return: The tracked task, or None when no task has that id
    """
    tracked = self._tasks
    return tracked.get(tid)
def get_resource(self, guid):
    """ Get a specific Resource Manager

    :param guid: Guid of the RM (not a task id)
    :type guid: int

    :return: The RM registered under 'guid', or None if there is none
    :rtype: ResourceManager
    """
    return self._resources.get(guid)
166 """ Returns the list of all the Resource Manager Ids
170 return self._resources.keys()
172 def register_resource(self, rtype, guid = None):
173 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
174 for the RM of type 'rtype' and adds it to the list of Resources.
176 :param rtype: Type of the RM
:param guid: Guid to assign to the RM; when None the guid generator
    picks the next available one
178 :return : Id of the RM
181 # Get next available guid
182 guid = self._guid_generator.next(guid)
# Instantiate the RM through the factory, handing it this EC and its guid.
185 rm = ResourceFactory.create(rtype, self, guid)
# NOTE(review): the `return guid` promised by the docstring is not visible
# in this excerpt; confirm that it follows the store below.
188 self._resources[guid] = rm
def get_attributes(self, guid):
    """ Return all the attributes of a specific RM

    :param guid: Guid of the RM
    :type guid: int

    :return: List of attributes, as reported by the RM itself
    """
    return self.get_resource(guid).get_attributes()
203 def register_connection(self, guid1, guid2):
204 """ Registers a connection between guid1 and guid2.
205 The declaration order is not important
207 :param guid1: First guid to connect
208 :type guid1: int
210 :param guid2: Second guid to connect
211 :type guid2: int
# Both arguments are guids, resolved to their RMs here.
214 rm1 = self.get_resource(guid1)
215 rm2 = self.get_resource(guid2)
# NOTE(review): the calls that actually register the connection on rm1 and
# rm2 are not visible in this excerpt.
# NOTE(review): the continuation of this signature (presumably declaring the
# `time` parameter used below) is not visible in this excerpt.
220 def register_condition(self, group1, action, group2, state,
222 """ Registers an action START or STOP for all RM on group1 to occur
223 time 'time' after all elements in group2 reached state 'state'.
225 :param group1: List of guids of RMs subjected to action
228 :param action: Action to register (either START or STOP)
229 :type action: ResourceAction
231 :param group2: List of guids of RMs to wait for
234 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
235 :type state: ResourceState
237 :param time: Time to wait after group2 has reached status
# Accept a single guid for either group (the list wrapping is not visible
# in this excerpt).
241 if isinstance(group1, int):
243 if isinstance(group2, int):
# The condition is registered on each RM of group1; the loop header binding
# guid1 is not visible in this excerpt.
247 rm = self.get_resource(guid1)
248 rm.register_condition(action, group2, state, time)
def register_trace(self, guid, name):
    """ Enable the trace 'name' on the RM 'guid'

    :param guid: Guid of the RM
    :type guid: int

    :param name: Name of the trace
    :type name: str
    """
    self.get_resource(guid).register_trace(name)
def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
    """ Get information on a collected trace of the RM 'guid'

    :param guid: Guid of the RM
    :type guid: int

    :param name: Name of the trace
    :type name: str

    :param attr: Can be one of:
        - TraceAttr.ALL (complete trace content),
        - TraceAttr.STREAM (block in bytes to read starting at offset),
        - TraceAttr.PATH (full path to the trace file),
        - TraceAttr.SIZE (size of trace file).

    :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
    :type block: int

    :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
    :type offset: int

    :return: Whatever the RM's trace() method returns for these arguments
    """
    return self.get_resource(guid).trace(name, attr, block, offset)
283 def discover(self, guid):
284 """ Discover a specific RM defined by its 'guid'
286 :param guid: Guid of the RM
290 rm = self.get_resource(guid)
# NOTE(review): the call on `rm` that performs discovery (and any return
# value) is not visible in this excerpt; provision() below forwards to the
# RM method and returns its result.
def provision(self, guid):
    """ Provision a specific RM defined by its 'guid'

    :param guid: Guid of the RM
    :type guid: int

    :return: Whatever the RM's provision() method returns
    """
    return self.get_resource(guid).provision()
303 def get(self, guid, name):
304 """ Get a specific attribute 'name' from the RM 'guid'
306 :param guid: Guid of the RM
309 :param name: attribute's name
313 rm = self.get_resource(guid)
# NOTE(review): the line that reads attribute 'name' from `rm` and returns
# it is not visible in this excerpt.
def set(self, guid, name, value):
    """ Set the attribute 'name' of the RM 'guid' to 'value'

    :param guid: Guid of the RM
    :type guid: int

    :param name: attribute's name
    :type name: str

    :param value: attribute's value

    :return: Whatever the RM's set() method returns
    """
    return self.get_resource(guid).set(name, value)
332 def state(self, guid, hr = False):
333 """ Returns the state of a resource
335 :param guid: Resource guid
338 :param hr: Human readable. Forces return of a
339 status string instead of a number
343 rm = self.get_resource(guid)
# Human-readable path: map the RM's numeric state to its name.
# NOTE(review): the `if hr:` guard and the numeric return path are not
# visible in this excerpt.
345 return ResourceState2str.get(rm.state)
349 def stop(self, guid):
350 """ Stop a specific RM defined by its 'guid'
352 :param guid: Guid of the RM
356 rm = self.get_resource(guid)
# NOTE(review): the call on `rm` that performs the stop is not visible in
# this excerpt.
359 def start(self, guid):
360 """ Start a specific RM defined by its 'guid'
362 :param guid: Guid of the RM
366 rm = self.get_resource(guid)
# NOTE(review): the call on `rm` that performs the start is not visible in
# this excerpt.
# NOTE(review): the continuation of this signature (presumably declaring the
# `time` parameter used below) is not visible in this excerpt.
369 def set_with_conditions(self, name, value, group1, group2, state,
371 """ Set value 'value' on attribute with name 'name' on all RMs of
372 group1 when 'time' has elapsed since all elements in group2
373 have reached state 'state'.
375 :param name: Name of attribute to set in RM
378 :param value: Value of attribute to set in RM
381 :param group1: List of guids of RMs on which the attribute is set
387 :param group2: List of guids of RMs to wait for
390 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
391 :type state: ResourceState
393 :param time: Time to wait after group2 has reached status
# Accept a single guid for either group (the list wrapping is not visible
# in this excerpt).
397 if isinstance(group1, int):
399 if isinstance(group2, int):
# NOTE(review): `guid` is not bound anywhere visible in this method, while
# register_condition() uses `guid1` for its loop variable. If the (not
# visible) loop header binds a different name, this raises NameError — confirm.
403 rm = self.get_resource(guid)
404 rm.set_with_conditions(name, value, group2, state, time)
def stop_with_conditions(self, guid):
    """ Ask the RM 'guid' to stop once all of its STOP conditions hold

    :param guid: Guid of the RM
    :type guid: int

    :return: Whatever the RM's stop_with_conditions() method returns
    """
    return self.get_resource(guid).stop_with_conditions()
def start_with_conditions(self, guid):
    """ Start a specific RM defined by its 'guid' only if all the conditions are true

    :param guid: Guid of the RM
    :type guid: int

    :return: Whatever the RM's start_with_conditions() method returns
    """
    rm = self.get_resource(guid)
    # The RM method is 'start_with_conditions' (plural). That name matches
    # stop_with_conditions() and the callback deploy() schedules. The
    # singular 'start_with_condition' used previously appears nowhere else
    # and fails with AttributeError on RMs that only define the plural name.
    return rm.start_with_conditions()
426 def deploy(self, group = None, wait_all_ready = True):
427 """ Deploy all resource managers in group
429 :param group: List of guids of RMs to deploy (all registered RMs when not given)
432 :param wait_all_ready: Wait until all RMs are ready in
433 order to start the RMs
437 self.logger.debug(" ------- DEPLOY START ------ ")
# Fall back to every registered RM (the guard itself is not visible in this
# excerpt).
440 group = self.resources
442 # Before starting deployment we disorder the group list with the
443 # purpose of speeding up the whole deployment process.
444 # It is likely that the user inserted in the 'group' list closely
445 # resources one after another (e.g. all applications
446 # connected to the same node can likely appear one after another).
447 # This can originate a slow down in the deployment since the N
448 # threads the parallel runner uses to process tasks may all
449 # be taken up by the same family of resources waiting for the
450 # same conditions (e.g. LinuxApplications running on a same
451 # node share a single lock, so they will tend to be serialized).
452 # If we disorder the group list, this problem can be mitigated.
# NOTE(review): random.shuffle works in place, so a caller-supplied list is
# reordered too. It also needs a mutable sequence: self.resources returns
# self._resources.keys(), which is a list only on Python 2.
453 random.shuffle(group)
# Re-schedules itself every second while some RM in 'group' is below READY,
# then schedules start_with_conditions for each RM (some of its lines are
# not visible in this excerpt).
455 def wait_all_and_start(group):
458 rm = self.get_resource(guid)
459 if rm.state < ResourceState.READY:
464 callback = functools.partial(wait_all_and_start, group)
465 self.schedule("1s", callback)
467 # If all resources are ready, we schedule the start
469 rm = self.get_resource(guid)
470 self.schedule("0.01s", rm.start_with_conditions)
473 # Schedule the function that will check all resources are
474 # READY, and only then it will schedule the start.
475 # This is aimed to reduce the number of tasks looping in the scheduler.
476 # Instead of having N start tasks, we will have only one
477 callback = functools.partial(wait_all_and_start, group)
478 self.schedule("1s", callback)
# Schedule deployment of every RM in the group (loop header not visible in
# this excerpt).
481 rm = self.get_resource(guid)
482 self.schedule("0.001s", rm.deploy)
484 if not wait_all_ready:
485 self.schedule("1s", rm.start_with_conditions)
487 if rm.conditions.get(ResourceAction.STOP):
488 # Only if the RM has STOP conditions we
489 # schedule a stop. Otherwise the RM will stop immediately
490 self.schedule("2s", rm.stop_with_conditions)
493 def release(self, group = None):
494 """ Release the elements of the list 'group' or
495 all the resources if no group is specified
497 :param group: List of guids of the RMs to release
# Fall back to every registered RM (the guard itself is not visible in this
# excerpt).
502 group = self.resources
# Releases run concurrently, one daemon thread per RM. The threads'
# start() calls are not visible in this excerpt.
506 rm = self.get_resource(guid)
507 thread = threading.Thread(target=rm.release)
508 threads.append(thread)
509 thread.setDaemon(True)
# Reap finished release threads, but stop waiting once the EC reaches a
# final state.
512 while list(threads) and not self.finished:
514 # Time out after 5 seconds to check EC not terminated
516 if not thread.is_alive():
517 threads.remove(thread)
520 """ Shutdown the Experiment Controller.
521 It means : Release all the resources and stop the scheduler
526 self._stop_scheduler()
528 if self._thread.is_alive():
531 def schedule(self, date, callback, track = False):
532 """ Schedule a callback to be executed at time date.
534 :param date: string containing execution time for the task.
535 It can be expressed as an absolute time, using
536 timestamp format, or as a relative time matching
537 ^\d+.\d+(h|m|s|ms|us)$
539 :param callback: code to be executed for the task. Must be a
540 Python callable; it is invoked without arguments (bind arguments beforehand, e.g. with functools.partial)
543 :param track: if set to True, the task will be retrievable with
544 the get_task() method
546 :return : The Id of the task
# Normalise 'date' into the timestamp the scheduler orders tasks by.
# NOTE(review): presumably strfvalid turns relative dates such as "1s" into
# absolute timestamps; confirm in nepi.util.timefuncs.
548 timestamp = strfvalid(date)
550 task = Task(timestamp, callback)
551 task = self._scheduler.schedule(task)
# Keep a reference so get_task() can find the task. Presumably this only
# happens when track is True; the guard is not visible in this excerpt.
554 self._tasks[task.id] = task
556 # Notify condition to wake up the processing thread
564 """ Process and execute the tasks that are in the scheduler.
568 runner = ParallelRun(maxthreads = 50)
572 while not self.finished:
574 task = self._scheduler.next()
578 # It there are not tasks in the tasks queue we need to
579 # wait until a call to schedule wakes us up
584 # If the task timestamp is in the future the thread needs to wait
585 # until time elapse or until another task is scheduled
587 if now < task.timestamp:
588 # Calculate time difference in seconds
589 timeout = strfdiff(task.timestamp, now)
590 # Re-schedule task with the same timestamp
591 self._scheduler.schedule(task)
592 # Sleep until timeout or until a new task awakes the condition
594 self._cond.wait(timeout)
597 # Process tasks in parallel
598 runner.put(self._execute, task)
601 err = traceback.format_exc()
602 self._logger.error("Error while processing tasks in the EC: %s" % err)
604 self._state = ECState.FAILED
606 # Mark EC state as terminated
607 if self.ecstate == ECState.RUNNING:
608 # Synchronize to get errors if occurred
610 self._state = ECState.TERMINATED
612 def _execute(self, task):
613 """ Invoke the callback of the task 'task'
615 :param task: Task to execute (its callback, status and result are used)
# NOTE(review): as visible, the status is set to DONE before the callback
# runs and is overwritten with ERROR only if the callback raises.
620 task.status = TaskStatus.DONE
623 task.result = task.callback()
626 err = traceback.format_exc()
628 task.status = TaskStatus.ERROR
630 self._logger.error("Error occurred while executing task: %s" % err)
# A failing task stops the whole scheduler and marks the EC as FAILED.
632 self._stop_scheduler()
634 # Propagate error to the ParallelRunner
637 def _stop_scheduler(self):
638 """ Stop the scheduler and put the EC into a FAILED State.
642 # Mark the EC as failed
643 self._state = ECState.FAILED
645 # Wake up the EC in case it was sleeping