2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
import functools
import logging
import os
import random
import threading
import time
import traceback

from nepi.util import guid
from nepi.util.parallel import ParallelRun
from nepi.util.timefuncs import strfnow, strfdiff, strfvalid
from nepi.execution.resource import ResourceFactory, ResourceAction, \
        ResourceState, ResourceState2str
from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
from nepi.execution.trace import TraceAttr
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurrs during deployment scp and ssh processes are left running behind!!
class ECState(object):
    """ State of the Experiment Controller.

    The EC starts in RUNNING state and moves to a final state
    (FAILED or TERMINATED) when task processing stops.
    """
    RUNNING = 1
    FAILED = 2
    TERMINATED = 3
class ExperimentController(object):
    """ An experiment, or scenario, is defined by a concrete use of resources.

    .. class:: Class Args :

        :param exp_id: Id of the experiment
        :type exp_id: str

        :param root_dir: Root directory of the experiment
        :type root_dir: str

    This class is the only one used by the User. Indeed, the user "talks"
    only with the Experiment Controller, and the latter forwards to
    the different Resource Managers the orders provided by the user.

    """
63 def __init__(self, exp_id = None, root_dir = "/tmp"):
64 super(ExperimentController, self).__init__()
65 # root directory to store files
66 self._root_dir = root_dir
68 # experiment identifier given by the user
69 self._exp_id = exp_id or "nepi-exp-%s" % os.urandom(8).encode('hex')
71 # generator of globally unique ids
72 self._guid_generator = guid.GuidGenerator()
75 self._resources = dict()
78 self._scheduler = HeapScheduler()
83 # Event processing thread
84 self._cond = threading.Condition()
85 self._thread = threading.Thread(target = self._process)
86 self._thread.setDaemon(True)
90 self._state = ECState.RUNNING
93 self._logger = logging.getLogger("ExperimentController")
97 """ Return the logger of the Experiment Controller
104 """ Return the state of the Experiment Controller
111 """ Return the experiment ID
114 exp_id = self._exp_id
115 if not exp_id.startswith("nepi-"):
116 exp_id = "nepi-" + exp_id
121 """ Put the state of the Experiment Controller into a final state :
122 Either TERMINATED or FAILED
125 return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
127 def wait_finished(self, guids):
128 """ Blocking method that wait until all the RM from the 'guid' list
129 reach the state FINISHED
131 :param guids: List of guids
134 if isinstance(guids, int):
137 while not all([self.state(guid) in [ResourceState.FINISHED,
138 ResourceState.STOPPED,
139 ResourceState.FAILED] \
140 for guid in guids]) and not self.finished:
141 # We keep the sleep as large as possible to
142 # decrese the number of RM state requests
145 def get_task(self, tid):
146 """ Get a specific task
148 :param tid: Id of the task
152 return self._tasks.get(tid)
154 def get_resource(self, guid):
155 """ Get a specific Resource Manager
157 :param guid: Id of the task
159 :rtype: ResourceManager
161 return self._resources.get(guid)
165 """ Returns the list of all the Resource Manager Id
170 return self._resources.keys()
172 def register_resource(self, rtype, guid = None):
173 """ Register a Resource Manager. It creates a new 'guid', if it is not specified,
174 for the RM of type 'rtype' and add it to the list of Resources.
176 :param rtype: Type of the RM
178 :return: Id of the RM
181 # Get next available guid
182 guid = self._guid_generator.next(guid)
185 rm = ResourceFactory.create(rtype, self, guid)
188 self._resources[guid] = rm
192 def get_attributes(self, guid):
193 """ Return all the attibutes of a specific RM
195 :param guid: Guid of the RM
197 :return: List of attributes
200 rm = self.get_resource(guid)
201 return rm.get_attributes()
203 def register_connection(self, guid1, guid2):
204 """ Registers a guid1 with a guid2.
205 The declaration order is not important
207 :param guid1: First guid to connect
208 :type guid1: ResourceManager
210 :param guid2: Second guid to connect
211 :type guid: ResourceManager
213 rm1 = self.get_resource(guid1)
214 rm2 = self.get_resource(guid2)
219 def register_condition(self, group1, action, group2, state,
221 """ Registers an action START or STOP for all RM on group1 to occur
222 time 'time' after all elements in group2 reached state 'state'.
224 :param group1: List of guids of RMs subjected to action
227 :param action: Action to register (either START or STOP)
228 :type action: ResourceAction
230 :param group2: List of guids of RMs to we waited for
233 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
234 :type state: ResourceState
236 :param time: Time to wait after group2 has reached status
240 if isinstance(group1, int):
242 if isinstance(group2, int):
246 rm = self.get_resource(guid1)
247 rm.register_condition(action, group2, state, time)
249 def register_trace(self, guid, name):
252 :param name: Name of the trace
255 rm = self.get_resource(guid)
256 rm.register_trace(name)
258 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
259 """ Get information on collected trace
261 :param name: Name of the trace
264 :param attr: Can be one of:
265 - TraceAttr.ALL (complete trace content),
266 - TraceAttr.STREAM (block in bytes to read starting at offset),
267 - TraceAttr.PATH (full path to the trace file),
268 - TraceAttr.SIZE (size of trace file).
271 :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM
274 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
279 rm = self.get_resource(guid)
280 return rm.trace(name, attr, block, offset)
282 def discover(self, guid):
283 """ Discover a specific RM defined by its 'guid'
285 :param guid: Guid of the RM
289 rm = self.get_resource(guid)
292 def provision(self, guid):
293 """ Provision a specific RM defined by its 'guid'
295 :param guid: Guid of the RM
299 rm = self.get_resource(guid)
300 return rm.provision()
302 def get(self, guid, name):
303 """ Get a specific attribute 'name' from the RM 'guid'
305 :param guid: Guid of the RM
308 :param name: attribute's name
312 rm = self.get_resource(guid)
315 def set(self, guid, name, value):
316 """ Set a specific attribute 'name' from the RM 'guid'
317 with the value 'value'
319 :param guid: Guid of the RM
322 :param name: attribute's name
325 :param value: attribute's value
328 rm = self.get_resource(guid)
329 return rm.set(name, value)
331 def state(self, guid, hr = False):
332 """ Returns the state of a resource
334 :param guid: Resource guid
337 :param hr: Human readable. Forces return of a
338 status string instead of a number
342 rm = self.get_resource(guid)
344 return ResourceState2str.get(rm.state)
348 def stop(self, guid):
349 """ Stop a specific RM defined by its 'guid'
351 :param guid: Guid of the RM
355 rm = self.get_resource(guid)
358 def start(self, guid):
359 """ Start a specific RM defined by its 'guid'
361 :param guid: Guid of the RM
365 rm = self.get_resource(guid)
368 def set_with_conditions(self, name, value, group1, group2, state,
370 """ Set value 'value' on attribute with name 'name' on all RMs of
371 group1 when 'time' has elapsed since all elements in group2
372 have reached state 'state'.
374 :param name: Name of attribute to set in RM
377 :param value: Value of attribute to set in RM
380 :param group1: List of guids of RMs subjected to action
383 :param action: Action to register (either START or STOP)
384 :type action: ResourceAction
386 :param group2: List of guids of RMs to we waited for
389 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
390 :type state: ResourceState
392 :param time: Time to wait after group2 has reached status
396 if isinstance(group1, int):
398 if isinstance(group2, int):
402 rm = self.get_resource(guid)
403 rm.set_with_conditions(name, value, group2, state, time)
405 def stop_with_conditions(self, guid):
406 """ Stop a specific RM defined by its 'guid' only if all the conditions are true
408 :param guid: Guid of the RM
412 rm = self.get_resource(guid)
413 return rm.stop_with_conditions()
415 def start_with_conditions(self, guid):
416 """ Start a specific RM defined by its 'guid' only if all the conditions are true
418 :param guid: Guid of the RM
422 rm = self.get_resource(guid)
423 return rm.start_with_condition()
425 def deploy(self, group = None, wait_all_ready = True):
426 """ Deploy all resource manager in group
428 :param group: List of guids of RMs to deploy
431 :param wait_all_ready: Wait until all RMs are ready in
432 order to start the RMs
436 self.logger.debug(" ------- DEPLOY START ------ ")
439 group = self.resources
441 if isinstance(group, int):
444 # Before starting deployment we disorder the group list with the
445 # purpose of speeding up the whole deployment process.
446 # It is likely that the user inserted in the 'group' list closely
447 # resources one after another (e.g. all applications
448 # connected to the same node can likely appear one after another).
449 # This can originate a slow down in the deployment since the N
450 # threads the parallel runner uses to processes tasks may all
451 # be taken up by the same family of resources waiting for the
452 # same conditions (e.g. LinuxApplications running on a same
453 # node share a single lock, so they will tend to be serialized).
454 # If we disorder the group list, this problem can be mitigated.
455 #random.shuffle(group)
457 def wait_all_and_start(group):
460 rm = self.get_resource(guid)
461 if rm.state < ResourceState.READY:
466 callback = functools.partial(wait_all_and_start, group)
467 self.schedule("1s", callback)
469 # If all resources are read, we schedule the start
471 rm = self.get_resource(guid)
472 self.schedule("0s", rm.start_with_conditions)
475 # Schedule the function that will check all resources are
476 # READY, and only then it will schedule the start.
477 # This is aimed to reduce the number of tasks looping in the scheduler.
478 # Intead of having N start tasks, we will have only one
479 callback = functools.partial(wait_all_and_start, group)
480 self.schedule("1s", callback)
483 rm = self.get_resource(guid)
484 self.schedule("0s", rm.deploy)
486 if not wait_all_ready:
487 self.schedule("1s", rm.start_with_conditions)
489 if rm.conditions.get(ResourceAction.STOP):
490 # Only if the RM has STOP conditions we
491 # schedule a stop. Otherwise the RM will stop immediately
492 self.schedule("2s", rm.stop_with_conditions)
495 def release(self, group = None):
496 """ Release the elements of the list 'group' or
497 all the resources if any group is specified
499 :param group: List of RM
504 group = self.resources
508 rm = self.get_resource(guid)
509 thread = threading.Thread(target=rm.release)
510 threads.append(thread)
511 thread.setDaemon(True)
514 while list(threads) and not self.finished:
516 # Time out after 5 seconds to check EC not terminated
518 if not thread.is_alive():
519 threads.remove(thread)
522 """ Shutdown the Experiment Controller.
523 Releases all the resources and stops task processing thread
528 # Mark the EC state as TERMINATED
529 self._state = ECState.TERMINATED
531 # Notify condition to wake up the processing thread
534 if self._thread.is_alive():
537 def schedule(self, date, callback, track = False):
538 """ Schedule a callback to be executed at time date.
540 :param date: string containing execution time for the task.
541 It can be expressed as an absolute time, using
542 timestamp format, or as a relative time matching
543 ^\d+.\d+(h|m|s|ms|us)$
545 :param callback: code to be executed for the task. Must be a
546 Python function, and receives args and kwargs
549 :param track: if set to True, the task will be retrivable with
550 the get_task() method
552 :return : The Id of the task
554 timestamp = strfvalid(date)
556 task = Task(timestamp, callback)
557 task = self._scheduler.schedule(task)
560 self._tasks[task.id] = task
562 # Notify condition to wake up the processing thread
568 """ Process scheduled tasks.
570 The _process method is executed in an independent thread held by the
571 ExperimentController for as long as the experiment is running.
573 Tasks are scheduled by invoking the schedule method with a target callback.
574 The schedule method is givedn a execution time which controls the
575 order in which tasks are processed.
577 Tasks are processed in parallel using multithreading.
578 The environmental variable NEPI_NTHREADS can be used to control
579 the number of threads used to process tasks. The default value is 50.
582 nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
584 runner = ParallelRun(maxthreads = nthreads)
588 while not self.finished:
590 task = self._scheduler.next()
594 # It there are not tasks in the tasks queue we need to
595 # wait until a call to schedule wakes us up
600 # If the task timestamp is in the future the thread needs to wait
601 # until time elapse or until another task is scheduled
603 if now < task.timestamp:
604 # Calculate time difference in seconds
605 timeout = strfdiff(task.timestamp, now)
606 # Re-schedule task with the same timestamp
607 self._scheduler.schedule(task)
608 # Sleep until timeout or until a new task awakes the condition
610 self._cond.wait(timeout)
613 # Process tasks in parallel
614 runner.put(self._execute, task)
617 err = traceback.format_exc()
618 self._logger.error("Error while processing tasks in the EC: %s" % err)
620 self._state = ECState.FAILED
624 def _execute(self, task):
625 """ Executes a single task.
627 If the invokation of the task callback raises an
628 exception, the processing thread of the ExperimentController
629 will be stopped and the experiment will be aborted.
631 :param task: Object containing the callback to execute
636 task.status = TaskStatus.DONE
639 task.result = task.callback()
642 err = traceback.format_exc()
644 task.status = TaskStatus.ERROR
646 self._logger.error("Error occurred while executing task: %s" % err)
648 # Set the EC to FAILED state (this will force to exit the task
650 self._state = ECState.FAILED
652 # Notify condition to wake up the processing thread
655 # Propage error to the ParallelRunner
659 """ Awakes the processing thread in case it is blocked waiting
660 for a new task to be scheduled.