2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
30 # TODO: use multiprocessing instead of threading
31 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
42 class FailureLevel(object):
43 """ Describes the system failure state """
48 class FailureManager(object):
49 """ The FailureManager is responsible for handling errors
50 and deciding whether an experiment should be aborted or not
def __init__(self, ec):
    """ Creates a FailureManager bound to the ExperimentController 'ec'

    :param ec: ExperimentController whose failures are evaluated
    :type ec: ExperimentController

    """
    # Start in the "no failure" state. The level is raised later by
    # eval_failure() (RM_FAILURE) or set_ec_failure() (EC_FAILURE).
    self._failure_level = FailureLevel.OK

    # Only a weak reference is kept: the EC itself holds a strong
    # reference to its FailureManager, so a strong reference back
    # would create a reference cycle.
    self._ec = weakref.ref(ec)
61 """ Returns the ExperimentController associated to this FailureManager
71 def eval_failure(self, guid):
72 if self._failure_level == FailureLevel.OK:
73 rm = self.ec.get_resource(guid)
75 critical = rm.get("critical")
77 if state == ResourceState.FAILED and critical:
78 self._failure_level = FailureLevel.RM_FAILURE
80 self.ec.logger.debug("RM critical failure occurred on guid %d." \
81 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
def set_ec_failure(self):
    """ Records a failure of the ExperimentController itself, as opposed
    to a failure of a single ResourceManager.

    The failure level is set to EC_FAILURE unconditionally, overriding
    whatever level was previously recorded.

    """
    level = FailureLevel.EC_FAILURE
    self._failure_level = level
86 class ECState(object):
87 """ Possible states for an ExperimentController
94 class ExperimentController(object):
96 .. class:: Class Args :
98 :param exp_id: Human readable identifier for the experiment scenario.
103 An experiment, or scenario, is defined by a concrete set of resources,
104 behavior, configuration and interconnection of those resources.
105 The Experiment Description (ED) is a detailed representation of a
106 single experiment. It contains all the necessary information to
107 allow repeating the experiment. NEPI allows to describe
108 experiments by registering components (resources), configuring them
109 and interconnecting them.
111 A same experiment (scenario) can be executed many times, generating
112 different results. We call an experiment execution (instance) a 'run'.
114 The ExperimentController (EC) is the entity responsible for
115 managing an experiment run. The same scenario can be
116 recreated (and re-run) by instantiating an EC and recreating
117 the same experiment description.
119 In NEPI, an experiment is represented as a graph of interconnected
120 resources. A resource is a generic concept in the sense that any
121 component taking part of an experiment, whether physical or
122 virtual, is considered a resource. A resource could be a host,
123 a virtual machine, an application, a simulator, an IP address.
125 A ResourceManager (RM), is the entity responsible for managing a
126 single resource. ResourceManagers are specific to a resource
127 type (i.e. An RM to control a Linux application will not be
128 the same as the RM used to control a ns-3 simulation).
129 To support a new type of resource in NEPI, a new RM must be
130 implemented. NEPI already provides a variety of
131 RMs to control basic resources, and new can be extended from
134 Through the EC interface the user can create ResourceManagers (RMs),
135 configure them and interconnect them, to describe an experiment.
136 Describing an experiment through the EC does not run the experiment.
137 Only when the 'deploy()' method is invoked on the EC, the EC will take
138 actions to transform the 'described' experiment into a 'running' experiment.
140 While the experiment is running, it is possible to continue to
141 create/configure/connect RMs, and to deploy them to involve new
142 resources in the experiment (this is known as 'interactive' deployment).
144 An experiment in NEPI is identified by a string id,
145 which is either given by the user, or automatically generated by NEPI.
146 The purpose of this identifier is to separate files and results that
147 belong to different experiment scenarios.
148 However, since a same 'experiment' can be run many times, the experiment
149 id is not enough to identify an experiment instance (run).
150 For this reason, the ExperimentController has two identifiers: the
151 exp_id, which can be re-used in different ExperimentControllers,
152 and the run_id, which is unique to one ExperimentController instance, and
153 is automatically generated by NEPI.
158 def load(cls, filepath, format = SFormats.XML):
159 serializer = ECSerializer()
160 ec = serializer.load(filepath)
163 def __init__(self, exp_id = None, local_dir = None, persist = False):
164 """ ExperimentController entity to model and execute a network experiment.
166 :param exp_id: Human readable name to identify the experiment
169 :param local_dir: Path to local directory where to store experiment
173 :param persist: Save an XML description of the experiment after
174 completion at local_dir
179 super(ExperimentController, self).__init__()
182 self._logger = logging.getLogger("ExperimentController")
184 # Run identifier. It identifies a concrete execution instance (run)
186 # Since a same experiment (same configuration) can be executed many
187 # times, this run_id permits to separate result files generated on
188 # different experiment executions
189 self._run_id = tsformat()
191 # Experiment identifier. Usually assigned by the user
192 # Identifies the experiment scenario (i.e. configuration,
193 # resources used, etc)
194 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
196 # Local path where to store experiment related files (results, etc)
198 local_dir = tempfile.mkdtemp()
200 self._local_dir = local_dir
201 self._exp_dir = os.path.join(local_dir, self.exp_id)
202 self._run_dir = os.path.join(self.exp_dir, self.run_id)
204 # If True persist the experiment controller in XML format, after completion
205 self._persist = persist
207 # generator of globally unique ids
208 self._guid_generator = guid.GuidGenerator()
211 self._resources = dict()
213 # Scheduler. It is a queue that holds tasks scheduled for
214 # execution, and yields the next task to be executed
215 # ordered by execution and arrival time
216 self._scheduler = HeapScheduler()
221 # RM groups (for deployment)
222 self._groups = dict()
224 # generator of globally unique id for groups
225 self._group_id_generator = guid.GuidGenerator()
227 # Flag to stop processing thread
230 # Entity in charge of managing system failures
231 self._fm = FailureManager(self)
234 self._state = ECState.RUNNING
236 # The runner is a pool of threads used to parallelize
241 # Event processing thread
242 self._cond = threading.Condition()
243 self._thread = threading.Thread(target = self._process)
244 self._thread.setDaemon(True)
249 """ Returns the logger instance of the Experiment Controller
255 def failure_level(self):
256 """ Returns the level of FAILURE of the experiment
260 return self._fm._failure_level
264 """ Returns the state of the Experiment Controller
271 """ Returns the experiment id assigned by the user
278 """ Returns the experiment instance (run) identifier (automatically
286 """ Returns the number of processing threads used
289 return self._nthreads
293 """ Root local directory for experiment files
296 return self._local_dir
300 """ Local directory to store results and other files related to the
308 """ Local directory to store results and other files related to the
316 """ If True, persist the ExperimentController to XML format upon completion
323 """ Returns True if the experiment has failed and should be interrupted,
327 return self._fm.abort
329 def inform_failure(self, guid):
330 """ Reports a failure in a RM to the EC for evaluation
332 :param guid: Resource id
337 return self._fm.eval_failure(guid)
339 def wait_finished(self, guids):
340 """ Blocking method that waits until all RMs in the 'guids' list
341 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
342 RELEASED ), or until a failure in the experiment occurs
345 :param guids: List of guids
353 return self.wait(guids, state = ResourceState.STOPPED,
356 def wait_started(self, guids):
357 """ Blocking method that waits until all RMs in the 'guids' list
358 have reached a state >= STARTED, or until a failure in the
359 experiment occurs (i.e. abort == True)
361 :param guids: List of guids
369 return self.wait(guids, state = ResourceState.STARTED,
372 def wait_released(self, guids):
373 """ Blocking method that waits until all RMs in the 'guids' list
374 have reached a state == RELEASED, or until the EC fails
376 :param guids: List of guids
382 return self._state == ECState.FAILED
384 return self.wait(guids, state = ResourceState.RELEASED,
387 def wait_deployed(self, guids):
388 """ Blocking method that waits until all RMs in the 'guids' list
389 have reached a state >= READY, or until a failure in the
390 experiment occurs (i.e. abort == True)
392 :param guids: List of guids
400 return self.wait(guids, state = ResourceState.READY,
403 def wait(self, guids, state, quit):
404 """ Blocking method that waits until all RMs in the 'guids' list
405 have reached a state >= 'state', or until the 'quit' callback
408 :param guids: List of guids
412 if isinstance(guids, int):
415 # Make a copy to avoid modifying the original guids list
419 # If there are no more guids to wait for
420 # or the quit function returns True, exit the loop
421 if len(guids) == 0 or quit():
424 # If a guid reached one of the target states, remove it from list
426 rm = self.get_resource(guid)
430 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
431 rm.get_rtype(), guid, rstate, state))
434 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
435 guid, rstate, state))
441 def plot(self, dirpath = None, format= PFormats.FIGURE, show = False):
442 plotter = ECPlotter()
443 fpath = plotter.plot(self, dirpath = dirpath, format= format,
def serialize(self, format = SFormats.XML):
    """ Serializes the experiment description held by this
    ExperimentController.

    :param format: Serialization format
    :type format: SFormats

    :return: The serialized experiment description

    """
    serializer = ECSerializer()
    # ECSerializer.load() reads a description *from a file path*, so it
    # cannot be used to serialize a live EC; serialize() is the
    # EC -> description direction.
    # NOTE(review): assumes ECSerializer.serialize(ec, format=...) exists
    # alongside load()/save() — confirm against nepi.util.serializer.
    sec = serializer.serialize(self, format = format)
    return sec
def save(self, dirpath = None, format = SFormats.XML):
    """ Saves a serialized description of the experiment to disk.

    :param dirpath: Directory where the description is written. It is
        forwarded unchanged to ECSerializer.save (previously it was
        silently replaced by None).
    :type dirpath: str

    :param format: Serialization format
    :type format: SFormats

    :return: The path returned by ECSerializer.save

    """
    serializer = ECSerializer()
    path = serializer.save(self, dirpath = dirpath, format = format)
    return path
457 def get_task(self, tid):
458 """ Returns a task by its id
460 :param tid: Id of the task
466 return self._tasks.get(tid)
468 def get_resource(self, guid):
469 """ Returns a registered ResourceManager by its guid
471 :param guid: Id of the resource
474 :rtype: ResourceManager
477 rm = self._resources.get(guid)
def get_resources_by_type(self, rtype):
    """ Returns all registered ResourceManagers whose type is 'rtype'

    :param rtype: Resource type
    :type rtype: str

    :return: RMs of type 'rtype', in registration-dict iteration order
        (empty list if there are none)
    :rtype: list of ResourceManagers

    """
    # Compare against the 'rtype' argument. The previous code compared
    # against the builtin 'type', so no RM ever matched.
    return [rm for rm in self._resources.values()
            if rm.get_rtype() == rtype]
def remove_resource(self, guid):
    """ Unregisters the ResourceManager with guid 'guid' from the EC

    :param guid: Guid of the RM to remove
    :type guid: int

    Raises KeyError if no RM is registered under 'guid'.

    """
    self._resources.pop(guid)
500 """ Returns the set() of guids of all the ResourceManager
502 :return: Set of all RM guids
506 keys = self._resources.keys()
510 def register_resource(self, rtype, guid = None):
511 """ Registers a new ResourceManager of type 'rtype' in the experiment
513 This method will assign a new 'guid' for the RM, if no guid
516 :param rtype: Type of the RM
519 :return: Guid of the RM
523 # Get next available guid
524 guid = self._guid_generator.next(guid)
527 rm = ResourceFactory.create(rtype, self, guid)
530 self._resources[guid] = rm
534 def get_attributes(self, guid):
535 """ Returns all the attributes of the RM with guid 'guid'
537 :param guid: Guid of the RM
540 :return: List of attributes
544 rm = self.get_resource(guid)
545 return rm.get_attributes()
547 def get_attribute(self, guid, name):
548 """ Returns the attribute 'name' of the RM with guid 'guid'
550 :param guid: Guid of the RM
553 :param name: Name of the attribute
556 :return: The attribute with name 'name'
560 rm = self.get_resource(guid)
561 return rm.get_attribute(name)
563 def register_connection(self, guid1, guid2):
564 """ Registers a connection between a RM with guid 'guid1'
565 and another RM with guid 'guid2'.
567 The order in which the two guids are provided is not
568 important, since the connection relationship is symmetric.
570 :param guid1: First guid to connect
571 :type guid1: int
573 :param guid2: Second guid to connect
574 :type guid2: int
577 rm1 = self.get_resource(guid1)
578 rm2 = self.get_resource(guid2)
580 rm1.register_connection(guid2)
581 rm2.register_connection(guid1)
583 def register_condition(self, guids1, action, guids2, state,
585 """ Registers an action START, STOP or DEPLOY for all RM on list
586 guids1 to occur at time 'time' after all elements in list guids2
587 have reached state 'state'.
589 :param guids1: List of guids of RMs subjected to action
592 :param action: Action to perform (either START, STOP or DEPLOY)
593 :type action: ResourceAction
595 :param guids2: List of guids of RMs to be waited for
598 :param state: State to wait for on RMs of list guids2 (STARTED,
600 :type state: ResourceState
602 :param time: Time to wait after guids2 has reached status
606 if isinstance(guids1, int):
608 if isinstance(guids2, int):
612 rm = self.get_resource(guid1)
613 rm.register_condition(action, guids2, state, time)
615 def enable_trace(self, guid, name):
616 """ Enables a trace to be collected during the experiment run
618 :param name: Name of the trace
622 rm = self.get_resource(guid)
623 rm.enable_trace(name)
625 def trace_enabled(self, guid, name):
626 """ Returns True if the trace of name 'name' is enabled
628 :param name: Name of the trace
632 rm = self.get_resource(guid)
633 return rm.trace_enabled(name)
635 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
636 """ Returns information on a collected trace, the trace stream or
637 blocks (chunks) of the trace stream
639 :param name: Name of the trace
642 :param attr: Can be one of:
643 - TraceAttr.ALL (complete trace content),
644 - TraceAttr.STREAM (block in bytes to read starting
646 - TraceAttr.PATH (full path to the trace file),
647 - TraceAttr.SIZE (size of trace file).
650 :param block: Number of bytes to retrieve from trace, when attr is
654 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
660 rm = self.get_resource(guid)
661 return rm.trace(name, attr, block, offset)
663 def get_traces(self, guid):
664 """ Returns the list of the trace names of the RM with guid 'guid'
666 :param guid: Guid of the RM
669 :return: List of trace names
673 rm = self.get_resource(guid)
674 return rm.get_traces()
677 def discover(self, guid):
678 """ Discovers an available resource matching the criteria defined
679 by the RM with guid 'guid', and associates that resource to the RM
681 Not all RM types require (or are capable of) performing resource
682 discovery. For the RM types which are not capable of doing so,
683 invoking this method does not have any consequences.
685 :param guid: Guid of the RM
689 rm = self.get_resource(guid)
692 def provision(self, guid):
693 """ Provisions the resource associated to the RM with guid 'guid'.
695 Provisioning means making a resource 'accessible' to the user.
696 Not all RM types require (or are capable of) performing resource
697 provisioning. For the RM types which are not capable of doing so,
698 invoking this method does not have any consequences.
700 :param guid: Guid of the RM
704 rm = self.get_resource(guid)
705 return rm.provision()
707 def get(self, guid, name):
708 """ Returns the value of the attribute with name 'name' on the
711 :param guid: Guid of the RM
714 :param name: Name of the attribute
717 :return: The value of the attribute with name 'name'
720 rm = self.get_resource(guid)
723 def set(self, guid, name, value):
724 """ Modifies the value of the attribute with name 'name' on the
727 :param guid: Guid of the RM
730 :param name: Name of the attribute
733 :param value: Value of the attribute
736 rm = self.get_resource(guid)
739 def get_global(self, rtype, name):
740 """ Returns the value of the global attribute with name 'name' on the
741 RMs of rtype 'rtype'.
743 :param rtype: Type of the RM
746 :param name: Name of the attribute
749 :return: The value of the attribute with name 'name'
752 rclass = ResourceFactory.get_resource_type(rtype)
753 return rclass.get_global(name)
755 def set_global(self, rtype, name, value):
756 """ Modifies the value of the global attribute with name 'name' on the
757 RMs of with rtype 'rtype'.
759 :param rtype: Type of the RM
762 :param name: Name of the attribute
765 :param value: Value of the attribute
768 rclass = ResourceFactory.get_resource_type(rtype)
769 return rclass.set_global(name, value)
771 def state(self, guid, hr = False):
772 """ Returns the state of a resource
774 :param guid: Resource guid
777 :param hr: Human readable. Forces return of a
778 status string instead of a number
782 rm = self.get_resource(guid)
786 return ResourceState2str.get(state)
790 def stop(self, guid):
791 """ Stops the RM with guid 'guid'
793 Stopping a RM means that the resource it controls will
794 no longer take part of the experiment.
796 :param guid: Guid of the RM
800 rm = self.get_resource(guid)
803 def start(self, guid):
804 """ Starts the RM with guid 'guid'
806 Starting a RM means that the resource it controls will
807 begin taking part of the experiment.
809 :param guid: Guid of the RM
813 rm = self.get_resource(guid)
816 def get_start_time(self, guid):
817 """ Returns the start time of the RM as a timestamp """
818 rm = self.get_resource(guid)
821 def get_stop_time(self, guid):
822 """ Returns the stop time of the RM as a timestamp """
823 rm = self.get_resource(guid)
def get_discover_time(self, guid):
    """ Returns the discover time of the RM with guid 'guid',
    as a timestamp

    :param guid: Guid of the RM
    :type guid: int

    """
    return self.get_resource(guid).discover_time
def get_provision_time(self, guid):
    """ Returns the provision time of the RM with guid 'guid',
    as a timestamp

    :param guid: Guid of the RM
    :type guid: int

    """
    return self.get_resource(guid).provision_time
836 def get_ready_time(self, guid):
837 """ Returns the deployment time of the RM as a timestamp """
838 rm = self.get_resource(guid)
def get_release_time(self, guid):
    """ Returns the release time of the RM with guid 'guid',
    as a timestamp

    :param guid: Guid of the RM
    :type guid: int

    """
    return self.get_resource(guid).release_time
def get_failed_time(self, guid):
    """ Returns the time at which the RM with guid 'guid' failed,
    as a timestamp

    :param guid: Guid of the RM
    :type guid: int

    """
    return self.get_resource(guid).failed_time
851 def set_with_conditions(self, name, value, guids1, guids2, state,
853 """ Modifies the value of attribute with name 'name' on all RMs
854 on the guids1 list when time 'time' has elapsed since all
855 elements in guids2 list have reached state 'state'.
857 :param name: Name of attribute to set in RM
860 :param value: Value of attribute to set in RM
863 :param guids1: List of guids of RMs subjected to action
866 :param action: Action to register (either START or STOP)
867 :type action: ResourceAction
869 :param guids2: List of guids of RMs to be waited for
872 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
873 :type state: ResourceState
875 :param time: Time to wait after guids2 has reached status
879 if isinstance(guids1, int):
881 if isinstance(guids2, int):
885 rm = self.get_resource(guid)
886 rm.set_with_conditions(name, value, guids2, state, time)
888 def deploy(self, guids = None, wait_all_ready = True, group = None):
889 """ Deploys all ResourceManagers in the guids list.
891 If the argument 'guids' is not given, all RMs with state NEW
894 :param guids: List of guids of RMs to deploy
897 :param wait_all_ready: Wait until all RMs are ready in
898 order to start the RMs
901 :param group: Id of deployment group in which to deploy RMs
905 self.logger.debug(" ------- DEPLOY START ------ ")
908 # If no guids list was passed, all 'NEW' RMs will be deployed
910 for guid, rm in self._resources.iteritems():
911 if rm.state == ResourceState.NEW:
914 if isinstance(guids, int):
917 # Create deployment group
918 # New guids can be added to a same deployment group later on
922 group = self._group_id_generator.next()
924 if group not in self._groups:
925 self._groups[group] = []
927 self._groups[group].extend(guids)
929 def wait_all_and_start(group):
930 # Function that checks if all resources are READY
931 # before scheduling a start_with_conditions for each RM
934 # Get all guids in group
935 guids = self._groups[group]
938 if self.state(guid) < ResourceState.READY:
943 callback = functools.partial(wait_all_and_start, group)
944 self.schedule("1s", callback)
946 # If all resources are ready, we schedule the start
948 rm = self.get_resource(guid)
949 self.schedule("0s", rm.start_with_conditions)
951 if rm.conditions.get(ResourceAction.STOP):
952 # Only if the RM has STOP conditions we
953 # schedule a stop. Otherwise the RM will stop immediately
954 self.schedule("0s", rm.stop_with_conditions)
956 if wait_all_ready and new_group:
957 # Schedule a function to check that all resources are
958 # READY, and only then schedule the start.
959 # This aims at reducing the number of tasks looping in the
961 # Instead of having many start tasks, we will have only one for
963 callback = functools.partial(wait_all_and_start, group)
964 self.schedule("0s", callback)
967 rm = self.get_resource(guid)
968 rm.deployment_group = group
969 self.schedule("0s", rm.deploy_with_conditions)
971 if not wait_all_ready:
972 self.schedule("0s", rm.start_with_conditions)
974 if rm.conditions.get(ResourceAction.STOP):
975 # Only if the RM has STOP conditions we
976 # schedule a stop. Otherwise the RM will stop immediately
977 self.schedule("0s", rm.stop_with_conditions)
979 def release(self, guids = None):
980 """ Releases all ResourceManagers in the guids list.
982 If the argument 'guids' is not given, all RMs registered
983 in the experiment are released.
985 :param guids: List of RM guids
989 if isinstance(guids, int):
993 guids = self.resources
996 rm = self.get_resource(guid)
997 self.schedule("0s", rm.release)
999 self.wait_released(guids)
1002 self.save(dirpath = self.run_dir)
1005 if self.get(guid, "hardRelease"):
1006 self.remove_resource(guid)
1009 """ Releases all resources and stops the ExperimentController
1012 # If there was a major failure we can't exit gracefully
1013 if self._state == ECState.FAILED:
1014 raise RuntimeError("EC failure. Can not exit gracefully")
1016 # Remove all pending tasks from the scheduler queue
1017 for tid in list(self._scheduler.pending):
1018 self._scheduler.remove(tid)
1020 # Remove pending tasks from the workers queue
1021 self._runner.empty()
1025 # Mark the EC state as TERMINATED
1026 self._state = ECState.TERMINATED
1028 # Stop processing thread
1031 # Notify condition to wake up the processing thread
1034 if self._thread.is_alive():
1037 def schedule(self, date, callback, track = False):
1038 """ Schedules a callback to be executed at time 'date'.
1040 :param date: string containing execution time for the task.
1041 It can be expressed as an absolute time, using
1042 timestamp format, or as a relative time matching
1043 ^\d+.\d+(h|m|s|ms|us)$
1045 :param callback: code to be executed for the task. Must be a
1046 Python function, and receives args and kwargs
1049 :param track: if set to True, the task will be retrievable with
1050 the get_task() method
1052 :return : The Id of the task
1056 timestamp = stabsformat(date)
1057 task = Task(timestamp, callback)
1058 task = self._scheduler.schedule(task)
1061 self._tasks[task.id] = task
1063 # Notify condition to wake up the processing thread
1069 """ Process scheduled tasks.
1073 Tasks are scheduled by invoking the schedule method with a target
1074 callback and an execution time.
1075 The schedule method creates a new Task object with that callback
1076 and execution time, and pushes it into the '_scheduler' queue.
1077 The execution time and the order of arrival of tasks are used
1078 to order the tasks in the queue.
1080 The _process method is executed in an independent thread held by
1081 the ExperimentController for as long as the experiment is running.
1082 This method takes tasks from the '_scheduler' queue in a loop
1083 and processes them in parallel using multithreading.
1084 The environmental variable NEPI_NTHREADS can be used to control
1085 the number of threads used to process tasks. The default value is
1088 To execute tasks in parallel, a ParallelRunner (PR) object is used.
1089 This object keeps a pool of threads (workers), and a queue of tasks
1090 scheduled for 'immediate' execution.
1092 On each iteration, the '_process' loop will take the next task that
1093 is scheduled for 'future' execution from the '_scheduler' queue,
1094 and if the execution time of that task is >= to the current time,
1095 it will push that task into the PR for 'immediate execution'.
1096 As soon as a worker is free, the PR will assign the next task to
1099 Upon receiving a task to execute, each PR worker (thread) will
1100 invoke the _execute method of the EC, passing the task as
1102 The _execute method will then invoke task.callback inside a
1103 try/except block. If an exception is raised by the task.callback,
1104 it will be trapped by the try block, logged to standard error
1105 (usually the console), and the task will be marked as failed.
1109 self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1110 self._runner = ParallelRun(maxthreads = self.nthreads)
1111 self._runner.start()
1113 while not self._stop:
1115 self._cond.acquire()
1117 task = self._scheduler.next()
1120 # No task to execute. Wait for a new task to be scheduled.
1123 # The task timestamp is in the future. Wait for timeout
1124 # or until another task is scheduled.
1126 if now < task.timestamp:
1127 # Calculate timeout in seconds
1128 timeout = tdiffsec(task.timestamp, now)
1130 # Re-schedule task with the same timestamp
1131 self._scheduler.schedule(task)
1135 # Wait timeout or until a new task awakes the condition
1136 self._cond.wait(timeout)
1138 self._cond.release()
1141 # Process tasks in parallel
1142 self._runner.put(self._execute, task)
1145 err = traceback.format_exc()
1146 self.logger.error("Error while processing tasks in the EC: %s" % err)
1148 # Set the EC to FAILED state
1149 self._state = ECState.FAILED
1151 # Set the FailureManager failure level to EC failure
1152 self._fm.set_ec_failure()
1154 self.logger.debug("Exiting the task processing loop ... ")
1157 self._runner.destroy()
1159 def _execute(self, task):
1160 """ Executes a single task.
1162 :param task: Object containing the callback to execute
1168 task.result = task.callback()
1169 task.status = TaskStatus.DONE
1172 err = traceback.format_exc()
1174 task.status = TaskStatus.ERROR
1176 self.logger.error("Error occurred while executing task: %s" % err)
1179 """ Awakes the processing thread if it is blocked waiting
1180 for new tasks to arrive
1183 self._cond.acquire()
1185 self._cond.release()