2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
28 from nepi.util.plotter import ECPlotter, PFormats
30 # TODO: use multiprocessing instead of threading
31 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
41 class FailureLevel(object):
42 """ Describes the system failure state """
47 class FailureManager(object):
48 """ The FailureManager is responsible for handling errors
49 and deciding whether an experiment should be aborted or not
53 def __init__(self, ec):
54 self._ec = weakref.ref(ec)
55 self._failure_level = FailureLevel.OK
60 """ Returns the ExperimentController associated to this FailureManager
70 def eval_failure(self, guid):
71 if self._failure_level == FailureLevel.OK:
72 rm = self.ec.get_resource(guid)
74 critical = rm.get("critical")
76 if state == ResourceState.FAILED and critical:
77 self._failure_level = FailureLevel.RM_FAILURE
79 self.ec.logger.debug("RM critical failure occurred on guid %d." \
80 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
82 def set_ec_failure(self):
83 self._failure_level = FailureLevel.EC_FAILURE
85 class ECState(object):
86 """ Possible states for an ExperimentController
93 class ExperimentController(object):
95 .. class:: Class Args :
97 :param exp_id: Human readable identifier for the experiment scenario.
102 An experiment, or scenario, is defined by a concrete set of resources,
103 behavior, configuration and interconnection of those resources.
104 The Experiment Description (ED) is a detailed representation of a
105 single experiment. It contains all the necessary information to
106 allow repeating the experiment. NEPI allows to describe
107 experiments by registering components (resources), configuring them
108 and interconnecting them.
110 A same experiment (scenario) can be executed many times, generating
111 different results. We call an experiment execution (instance) a 'run'.
113 The ExperimentController (EC), is the entity responsible of
114 managing an experiment run. The same scenario can be
115 recreated (and re-run) by instantiating an EC and recreating
116 the same experiment description.
118 In NEPI, an experiment is represented as a graph of interconnected
119 resources. A resource is a generic concept in the sense that any
120 component taking part of an experiment, whether physical of
121 virtual, is considered a resource. A resources could be a host,
122 a virtual machine, an application, a simulator, a IP address.
124 A ResourceManager (RM), is the entity responsible for managing a
125 single resource. ResourceManagers are specific to a resource
126 type (i.e. An RM to control a Linux application will not be
127 the same as the RM used to control a ns-3 simulation).
128 To support a new type of resource in NEPI, a new RM must be
129 implemented. NEPI already provides a variety of
130 RMs to control basic resources, and new can be extended from
133 Through the EC interface the user can create ResourceManagers (RMs),
134 configure them and interconnect them, to describe an experiment.
135 Describing an experiment through the EC does not run the experiment.
136 Only when the 'deploy()' method is invoked on the EC, the EC will take
137 actions to transform the 'described' experiment into a 'running' experiment.
139 While the experiment is running, it is possible to continue to
140 create/configure/connect RMs, and to deploy them to involve new
141 resources in the experiment (this is known as 'interactive' deployment).
143 An experiments in NEPI is identified by a string id,
144 which is either given by the user, or automatically generated by NEPI.
145 The purpose of this identifier is to separate files and results that
146 belong to different experiment scenarios.
147 However, since a same 'experiment' can be run many times, the experiment
148 id is not enough to identify an experiment instance (run).
149 For this reason, the ExperimentController has two identifier, the
150 exp_id, which can be re-used in different ExperimentController,
151 and the run_id, which is unique to one ExperimentController instance, and
152 is automatically generated by NEPI.
157 def load(cls, path, format = SFormats.XML):
158 serializer = ECSerializer()
159 ec = serializer.load(path)
162 def __init__(self, exp_id = None):
163 super(ExperimentController, self).__init__()
166 self._logger = logging.getLogger("ExperimentController")
168 # Run identifier. It identifies a concrete execution instance (run)
170 # Since a same experiment (same configuration) can be executed many
171 # times, this run_id permits to separate result files generated on
172 # different experiment executions
173 self._run_id = tsformat()
175 # Experiment identifier. Usually assigned by the user
176 # Identifies the experiment scenario (i.e. configuration,
177 # resources used, etc)
178 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
180 # generator of globally unique ids
181 self._guid_generator = guid.GuidGenerator()
184 self._resources = dict()
186 # Scheduler. It a queue that holds tasks scheduled for
187 # execution, and yields the next task to be executed
188 # ordered by execution and arrival time
189 self._scheduler = HeapScheduler()
194 # RM groups (for deployment)
195 self._groups = dict()
197 # generator of globally unique id for groups
198 self._group_id_generator = guid.GuidGenerator()
200 # Flag to stop processing thread
203 # Entity in charge of managing system failures
204 self._fm = FailureManager(self)
207 self._state = ECState.RUNNING
209 # The runner is a pool of threads used to parallelize
214 # Event processing thread
215 self._cond = threading.Condition()
216 self._thread = threading.Thread(target = self._process)
217 self._thread.setDaemon(True)
222 """ Returns the logger instance of the Experiment Controller
228 def failure_level(self):
229 """ Returns the level of FAILURE of th experiment
233 return self._fm._failure_level
237 """ Returns the state of the Experiment Controller
244 """ Returns the experiment id assigned by the user
251 """ Returns the experiment instance (run) identifier (automatically
259 """ Returns the number of processing nthreads used
262 return self._nthreads
267 """ Returns True if the experiment has failed and should be interrupted,
271 return self._fm.abort
273 def inform_failure(self, guid):
274 """ Reports a failure in a RM to the EC for evaluation
276 :param guid: Resource id
281 return self._fm.eval_failure(guid)
283 def wait_finished(self, guids):
284 """ Blocking method that waits until all RMs in the 'guids' list
285 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
286 RELEASED ), or until a failure in the experiment occurs
289 :param guids: List of guids
297 return self.wait(guids, state = ResourceState.STOPPED,
300 def wait_started(self, guids):
301 """ Blocking method that waits until all RMs in the 'guids' list
302 have reached a state >= STARTED, or until a failure in the
303 experiment occurs (i.e. abort == True)
305 :param guids: List of guids
313 return self.wait(guids, state = ResourceState.STARTED,
316 def wait_released(self, guids):
317 """ Blocking method that waits until all RMs in the 'guids' list
318 have reached a state == RELEASED, or until the EC fails
320 :param guids: List of guids
326 return self._state == ECState.FAILED
328 return self.wait(guids, state = ResourceState.RELEASED,
331 def wait_deployed(self, guids):
332 """ Blocking method that waits until all RMs in the 'guids' list
333 have reached a state >= READY, or until a failure in the
334 experiment occurs (i.e. abort == True)
336 :param guids: List of guids
344 return self.wait(guids, state = ResourceState.READY,
347 def wait(self, guids, state, quit):
348 """ Blocking method that waits until all RMs in the 'guids' list
349 have reached a state >= 'state', or until the 'quit' callback
352 :param guids: List of guids
356 if isinstance(guids, int):
359 # Make a copy to avoid modifying the original guids list
363 # If there are no more guids to wait for
364 # or the quit function returns True, exit the loop
365 if len(guids) == 0 or quit():
368 # If a guid reached one of the target states, remove it from list
370 rm = self.get_resource(guid)
374 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
375 rm.get_rtype(), guid, rstate, state))
378 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
379 guid, rstate, state))
385 def plot(self, fpath = None, format= PFormats.FIGURE, persist = False):
386 plotter = ECPlotter()
387 fpath = plotter.plot(self, fpath = fpath, format= format,
391 def serialize(self, format = SFormats.XML):
392 serializer = ECSerializer()
393 sec = serializer.load(self, format = format)
396 def save(self, path, format = SFormats.XML):
397 serializer = ECSerializer()
398 path = serializer.save(self, path, format = format)
401 def get_task(self, tid):
402 """ Returns a task by its id
404 :param tid: Id of the task
410 return self._tasks.get(tid)
412 def get_resource(self, guid):
413 """ Returns a registered ResourceManager by its guid
415 :param guid: Id of the resource
418 :rtype: ResourceManager
421 rm = self._resources.get(guid)
424 def get_resources_by_type(self, rtype):
425 """ Returns a registered ResourceManager by its guid
427 :param rtype: Resource type
430 :rtype: list of ResourceManagers
434 for guid, rm in self._resources.iteritems():
435 if rm.get_rtype() == type:
439 def remove_resource(self, guid):
440 del self._resources[guid]
444 """ Returns the set() of guids of all the ResourceManager
446 :return: Set of all RM guids
450 keys = self._resources.keys()
454 def register_resource(self, rtype, guid = None):
455 """ Registers a new ResourceManager of type 'rtype' in the experiment
457 This method will assign a new 'guid' for the RM, if no guid
460 :param rtype: Type of the RM
463 :return: Guid of the RM
467 # Get next available guid
468 guid = self._guid_generator.next(guid)
471 rm = ResourceFactory.create(rtype, self, guid)
474 self._resources[guid] = rm
478 def get_attributes(self, guid):
479 """ Returns all the attributes of the RM with guid 'guid'
481 :param guid: Guid of the RM
484 :return: List of attributes
488 rm = self.get_resource(guid)
489 return rm.get_attributes()
491 def get_attribute(self, guid, name):
492 """ Returns the attribute 'name' of the RM with guid 'guid'
494 :param guid: Guid of the RM
497 :param name: Name of the attribute
500 :return: The attribute with name 'name'
504 rm = self.get_resource(guid)
505 return rm.get_attribute(name)
507 def register_connection(self, guid1, guid2):
508 """ Registers a connection between a RM with guid 'guid1'
509 and another RM with guid 'guid2'.
511 The order of the in which the two guids are provided is not
512 important, since the connection relationship is symmetric.
514 :param guid1: First guid to connect
515 :type guid1: ResourceManager
517 :param guid2: Second guid to connect
518 :type guid: ResourceManager
521 rm1 = self.get_resource(guid1)
522 rm2 = self.get_resource(guid2)
524 rm1.register_connection(guid2)
525 rm2.register_connection(guid1)
527 def register_condition(self, guids1, action, guids2, state,
529 """ Registers an action START, STOP or DEPLOY for all RM on list
530 guids1 to occur at time 'time' after all elements in list guids2
531 have reached state 'state'.
533 :param guids1: List of guids of RMs subjected to action
536 :param action: Action to perform (either START, STOP or DEPLOY)
537 :type action: ResourceAction
539 :param guids2: List of guids of RMs to we waited for
542 :param state: State to wait for on RMs of list guids2 (STARTED,
544 :type state: ResourceState
546 :param time: Time to wait after guids2 has reached status
550 if isinstance(guids1, int):
552 if isinstance(guids2, int):
556 rm = self.get_resource(guid1)
557 rm.register_condition(action, guids2, state, time)
559 def enable_trace(self, guid, name):
560 """ Enables a trace to be collected during the experiment run
562 :param name: Name of the trace
566 rm = self.get_resource(guid)
567 rm.enable_trace(name)
569 def trace_enabled(self, guid, name):
570 """ Returns True if the trace of name 'name' is enabled
572 :param name: Name of the trace
576 rm = self.get_resource(guid)
577 return rm.trace_enabled(name)
579 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
580 """ Returns information on a collected trace, the trace stream or
581 blocks (chunks) of the trace stream
583 :param name: Name of the trace
586 :param attr: Can be one of:
587 - TraceAttr.ALL (complete trace content),
588 - TraceAttr.STREAM (block in bytes to read starting
590 - TraceAttr.PATH (full path to the trace file),
591 - TraceAttr.SIZE (size of trace file).
594 :param block: Number of bytes to retrieve from trace, when attr is
598 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
604 rm = self.get_resource(guid)
605 return rm.trace(name, attr, block, offset)
607 def get_traces(self, guid):
608 """ Returns the list of the trace names of the RM with guid 'guid'
610 :param guid: Guid of the RM
613 :return: List of trace names
617 rm = self.get_resource(guid)
618 return rm.get_traces()
621 def discover(self, guid):
622 """ Discovers an available resource matching the criteria defined
623 by the RM with guid 'guid', and associates that resource to the RM
625 Not all RM types require (or are capable of) performing resource
626 discovery. For the RM types which are not capable of doing so,
627 invoking this method does not have any consequences.
629 :param guid: Guid of the RM
633 rm = self.get_resource(guid)
636 def provision(self, guid):
637 """ Provisions the resource associated to the RM with guid 'guid'.
639 Provisioning means making a resource 'accessible' to the user.
640 Not all RM types require (or are capable of) performing resource
641 provisioning. For the RM types which are not capable of doing so,
642 invoking this method does not have any consequences.
644 :param guid: Guid of the RM
648 rm = self.get_resource(guid)
649 return rm.provision()
651 def get(self, guid, name):
652 """ Returns the value of the attribute with name 'name' on the
655 :param guid: Guid of the RM
658 :param name: Name of the attribute
661 :return: The value of the attribute with name 'name'
664 rm = self.get_resource(guid)
667 def set(self, guid, name, value):
668 """ Modifies the value of the attribute with name 'name' on the
671 :param guid: Guid of the RM
674 :param name: Name of the attribute
677 :param value: Value of the attribute
680 rm = self.get_resource(guid)
683 def get_global(self, rtype, name):
684 """ Returns the value of the global attribute with name 'name' on the
685 RMs of rtype 'rtype'.
687 :param guid: Guid of the RM
690 :param name: Name of the attribute
693 :return: The value of the attribute with name 'name'
696 rclass = ResourceFactory.get_resource_type(rtype)
697 return rclass.get_global(name)
699 def set_global(self, rtype, name, value):
700 """ Modifies the value of the global attribute with name 'name' on the
701 RMs of with rtype 'rtype'.
703 :param guid: Guid of the RM
706 :param name: Name of the attribute
709 :param value: Value of the attribute
712 rclass = ResourceFactory.get_resource_type(rtype)
713 return rclass.set_global(name, value)
715 def state(self, guid, hr = False):
716 """ Returns the state of a resource
718 :param guid: Resource guid
721 :param hr: Human readable. Forces return of a
722 status string instead of a number
726 rm = self.get_resource(guid)
730 return ResourceState2str.get(state)
734 def stop(self, guid):
735 """ Stops the RM with guid 'guid'
737 Stopping a RM means that the resource it controls will
738 no longer take part of the experiment.
740 :param guid: Guid of the RM
744 rm = self.get_resource(guid)
747 def start(self, guid):
748 """ Starts the RM with guid 'guid'
750 Starting a RM means that the resource it controls will
751 begin taking part of the experiment.
753 :param guid: Guid of the RM
757 rm = self.get_resource(guid)
760 def get_start_time(self, guid):
761 """ Returns the start time of the RM as a timestamp """
762 rm = self.get_resource(guid)
765 def get_stop_time(self, guid):
766 """ Returns the stop time of the RM as a timestamp """
767 rm = self.get_resource(guid)
770 def get_discover_time(self, guid):
771 """ Returns the discover time of the RM as a timestamp """
772 rm = self.get_resource(guid)
773 return rm.discover_time
775 def get_provision_time(self, guid):
776 """ Returns the provision time of the RM as a timestamp """
777 rm = self.get_resource(guid)
778 return rm.provision_time
780 def get_ready_time(self, guid):
781 """ Returns the deployment time of the RM as a timestamp """
782 rm = self.get_resource(guid)
785 def get_release_time(self, guid):
786 """ Returns the release time of the RM as a timestamp """
787 rm = self.get_resource(guid)
788 return rm.release_time
790 def get_failed_time(self, guid):
791 """ Returns the time failure occured for the RM as a timestamp """
792 rm = self.get_resource(guid)
793 return rm.failed_time
795 def set_with_conditions(self, name, value, guids1, guids2, state,
797 """ Modifies the value of attribute with name 'name' on all RMs
798 on the guids1 list when time 'time' has elapsed since all
799 elements in guids2 list have reached state 'state'.
801 :param name: Name of attribute to set in RM
804 :param value: Value of attribute to set in RM
807 :param guids1: List of guids of RMs subjected to action
810 :param action: Action to register (either START or STOP)
811 :type action: ResourceAction
813 :param guids2: List of guids of RMs to we waited for
816 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
817 :type state: ResourceState
819 :param time: Time to wait after guids2 has reached status
823 if isinstance(guids1, int):
825 if isinstance(guids2, int):
829 rm = self.get_resource(guid)
830 rm.set_with_conditions(name, value, guids2, state, time)
832 def deploy(self, guids = None, wait_all_ready = True, group = None):
833 """ Deploys all ResourceManagers in the guids list.
835 If the argument 'guids' is not given, all RMs with state NEW
838 :param guids: List of guids of RMs to deploy
841 :param wait_all_ready: Wait until all RMs are ready in
842 order to start the RMs
845 :param group: Id of deployment group in which to deploy RMs
849 self.logger.debug(" ------- DEPLOY START ------ ")
852 # If no guids list was passed, all 'NEW' RMs will be deployed
854 for guid, rm in self._resources.iteritems():
855 if rm.state == ResourceState.NEW:
858 if isinstance(guids, int):
861 # Create deployment group
862 # New guids can be added to a same deployment group later on
866 group = self._group_id_generator.next()
868 if group not in self._groups:
869 self._groups[group] = []
871 self._groups[group].extend(guids)
873 def wait_all_and_start(group):
874 # Function that checks if all resources are READY
875 # before scheduling a start_with_conditions for each RM
878 # Get all guids in group
879 guids = self._groups[group]
882 if self.state(guid) < ResourceState.READY:
887 callback = functools.partial(wait_all_and_start, group)
888 self.schedule("1s", callback)
890 # If all resources are ready, we schedule the start
892 rm = self.get_resource(guid)
893 self.schedule("0s", rm.start_with_conditions)
895 if rm.conditions.get(ResourceAction.STOP):
896 # Only if the RM has STOP conditions we
897 # schedule a stop. Otherwise the RM will stop immediately
898 self.schedule("0s", rm.stop_with_conditions)
900 if wait_all_ready and new_group:
901 # Schedule a function to check that all resources are
902 # READY, and only then schedule the start.
903 # This aims at reducing the number of tasks looping in the
905 # Instead of having many start tasks, we will have only one for
907 callback = functools.partial(wait_all_and_start, group)
908 self.schedule("0s", callback)
911 rm = self.get_resource(guid)
912 rm.deployment_group = group
913 self.schedule("0s", rm.deploy_with_conditions)
915 if not wait_all_ready:
916 self.schedule("0s", rm.start_with_conditions)
918 if rm.conditions.get(ResourceAction.STOP):
919 # Only if the RM has STOP conditions we
920 # schedule a stop. Otherwise the RM will stop immediately
921 self.schedule("0s", rm.stop_with_conditions)
923 def release(self, guids = None):
924 """ Releases all ResourceManagers in the guids list.
926 If the argument 'guids' is not given, all RMs registered
927 in the experiment are released.
929 :param guids: List of RM guids
933 if isinstance(guids, int):
937 guids = self.resources
940 rm = self.get_resource(guid)
941 self.schedule("0s", rm.release)
943 self.wait_released(guids)
946 if self.get(guid, "hardRelease"):
947 self.remove_resource(guid)
950 """ Releases all resources and stops the ExperimentController
953 # If there was a major failure we can't exit gracefully
954 if self._state == ECState.FAILED:
955 raise RuntimeError("EC failure. Can not exit gracefully")
957 # Remove all pending tasks from the scheduler queue
958 for tid in list(self._scheduler.pending):
959 self._scheduler.remove(tid)
961 # Remove pending tasks from the workers queue
966 # Mark the EC state as TERMINATED
967 self._state = ECState.TERMINATED
969 # Stop processing thread
972 # Notify condition to wake up the processing thread
975 if self._thread.is_alive():
978 def schedule(self, date, callback, track = False):
979 """ Schedules a callback to be executed at time 'date'.
981 :param date: string containing execution time for the task.
982 It can be expressed as an absolute time, using
983 timestamp format, or as a relative time matching
984 ^\d+.\d+(h|m|s|ms|us)$
986 :param callback: code to be executed for the task. Must be a
987 Python function, and receives args and kwargs
990 :param track: if set to True, the task will be retrievable with
991 the get_task() method
993 :return : The Id of the task
997 timestamp = stabsformat(date)
998 task = Task(timestamp, callback)
999 task = self._scheduler.schedule(task)
1002 self._tasks[task.id] = task
1004 # Notify condition to wake up the processing thread
1010 """ Process scheduled tasks.
1014 Tasks are scheduled by invoking the schedule method with a target
1015 callback and an execution time.
1016 The schedule method creates a new Task object with that callback
1017 and execution time, and pushes it into the '_scheduler' queue.
1018 The execution time and the order of arrival of tasks are used
1019 to order the tasks in the queue.
1021 The _process method is executed in an independent thread held by
1022 the ExperimentController for as long as the experiment is running.
1023 This method takes tasks from the '_scheduler' queue in a loop
1024 and processes them in parallel using multithreading.
1025 The environmental variable NEPI_NTHREADS can be used to control
1026 the number of threads used to process tasks. The default value is
1029 To execute tasks in parallel, a ParallelRunner (PR) object is used.
1030 This object keeps a pool of threads (workers), and a queue of tasks
1031 scheduled for 'immediate' execution.
1033 On each iteration, the '_process' loop will take the next task that
1034 is scheduled for 'future' execution from the '_scheduler' queue,
1035 and if the execution time of that task is >= to the current time,
1036 it will push that task into the PR for 'immediate execution'.
1037 As soon as a worker is free, the PR will assign the next task to
1040 Upon receiving a task to execute, each PR worker (thread) will
1041 invoke the _execute method of the EC, passing the task as
1043 The _execute method will then invoke task.callback inside a
1044 try/except block. If an exception is raised by the tasks.callback,
1045 it will be trapped by the try block, logged to standard error
1046 (usually the console), and the task will be marked as failed.
1050 self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1051 self._runner = ParallelRun(maxthreads = self.nthreads)
1052 self._runner.start()
1054 while not self._stop:
1056 self._cond.acquire()
1058 task = self._scheduler.next()
1061 # No task to execute. Wait for a new task to be scheduled.
1064 # The task timestamp is in the future. Wait for timeout
1065 # or until another task is scheduled.
1067 if now < task.timestamp:
1068 # Calculate timeout in seconds
1069 timeout = tdiffsec(task.timestamp, now)
1071 # Re-schedule task with the same timestamp
1072 self._scheduler.schedule(task)
1076 # Wait timeout or until a new task awakes the condition
1077 self._cond.wait(timeout)
1079 self._cond.release()
1082 # Process tasks in parallel
1083 self._runner.put(self._execute, task)
1086 err = traceback.format_exc()
1087 self.logger.error("Error while processing tasks in the EC: %s" % err)
1089 # Set the EC to FAILED state
1090 self._state = ECState.FAILED
1092 # Set the FailureManager failure level to EC failure
1093 self._fm.set_ec_failure()
1095 self.logger.debug("Exiting the task processing loop ... ")
1098 self._runner.destroy()
1100 def _execute(self, task):
1101 """ Executes a single task.
1103 :param task: Object containing the callback to execute
1109 task.result = task.callback()
1110 task.status = TaskStatus.DONE
1113 err = traceback.format_exc()
1115 task.status = TaskStatus.ERROR
1117 self.logger.error("Error occurred while executing task: %s" % err)
1120 """ Awakes the processing thread if it is blocked waiting
1121 for new tasks to arrive
1124 self._cond.acquire()
1126 self._cond.release()