2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24 ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27 from nepi.util.serializer import ECSerializer, SFormats
29 # TODO: use multiprocessing instead of threading
30 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
40 class FailureLevel(object):
41 """ Describes the system failure state """
46 class FailureManager(object):
47 """ The FailureManager is responsible for handling errors
48 and deciding whether an experiment should be aborted or not
52 def __init__(self, ec):
53 self._ec = weakref.ref(ec)
54 self._failure_level = FailureLevel.OK
59 """ Returns the ExperimentController associated to this FailureManager
69 def eval_failure(self, guid):
70 if self._failure_level == FailureLevel.OK:
71 rm = self.ec.get_resource(guid)
73 critical = rm.get("critical")
75 if state == ResourceState.FAILED and critical:
76 self._failure_level = FailureLevel.RM_FAILURE
78 self.ec.logger.debug("RM critical failure occurred on guid %d." \
79 " Setting EC FAILURE LEVEL to RM_FAILURE" % guid)
81 def set_ec_failure(self):
82 self._failure_level = FailureLevel.EC_FAILURE
84 class ECState(object):
85 """ Possible states for an ExperimentController
92 class ExperimentController(object):
94 .. class:: Class Args :
96 :param exp_id: Human readable identifier for the experiment scenario.
101 An experiment, or scenario, is defined by a concrete set of resources,
102 behavior, configuration and interconnection of those resources.
103 The Experiment Description (ED) is a detailed representation of a
104 single experiment. It contains all the necessary information to
105 allow repeating the experiment. NEPI allows to describe
106 experiments by registering components (resources), configuring them
107 and interconnecting them.
109 A same experiment (scenario) can be executed many times, generating
110 different results. We call an experiment execution (instance) a 'run'.
112 The ExperimentController (EC), is the entity responsible of
113 managing an experiment run. The same scenario can be
114 recreated (and re-run) by instantiating an EC and recreating
115 the same experiment description.
117 In NEPI, an experiment is represented as a graph of interconnected
118 resources. A resource is a generic concept in the sense that any
119 component taking part of an experiment, whether physical of
120 virtual, is considered a resource. A resources could be a host,
121 a virtual machine, an application, a simulator, a IP address.
123 A ResourceManager (RM), is the entity responsible for managing a
124 single resource. ResourceManagers are specific to a resource
125 type (i.e. An RM to control a Linux application will not be
126 the same as the RM used to control a ns-3 simulation).
127 To support a new type of resource in NEPI, a new RM must be
128 implemented. NEPI already provides a variety of
129 RMs to control basic resources, and new can be extended from
132 Through the EC interface the user can create ResourceManagers (RMs),
133 configure them and interconnect them, to describe an experiment.
134 Describing an experiment through the EC does not run the experiment.
135 Only when the 'deploy()' method is invoked on the EC, the EC will take
136 actions to transform the 'described' experiment into a 'running' experiment.
138 While the experiment is running, it is possible to continue to
139 create/configure/connect RMs, and to deploy them to involve new
140 resources in the experiment (this is known as 'interactive' deployment).
142 An experiments in NEPI is identified by a string id,
143 which is either given by the user, or automatically generated by NEPI.
144 The purpose of this identifier is to separate files and results that
145 belong to different experiment scenarios.
146 However, since a same 'experiment' can be run many times, the experiment
147 id is not enough to identify an experiment instance (run).
148 For this reason, the ExperimentController has two identifier, the
149 exp_id, which can be re-used in different ExperimentController,
150 and the run_id, which is unique to one ExperimentController instance, and
151 is automatically generated by NEPI.
156 def load(cls, path, format = SFormats.XML):
157 serializer = ECSerializer()
158 ec = serializer.load(path)
161 def __init__(self, exp_id = None):
162 super(ExperimentController, self).__init__()
165 self._logger = logging.getLogger("ExperimentController")
167 # Run identifier. It identifies a concrete execution instance (run)
169 # Since a same experiment (same configuration) can be executed many
170 # times, this run_id permits to separate result files generated on
171 # different experiment executions
172 self._run_id = tsformat()
174 # Experiment identifier. Usually assigned by the user
175 # Identifies the experiment scenario (i.e. configuration,
176 # resources used, etc)
177 self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
179 # generator of globally unique ids
180 self._guid_generator = guid.GuidGenerator()
183 self._resources = dict()
185 # Scheduler. It a queue that holds tasks scheduled for
186 # execution, and yields the next task to be executed
187 # ordered by execution and arrival time
188 self._scheduler = HeapScheduler()
193 # RM groups (for deployment)
194 self._groups = dict()
196 # generator of globally unique id for groups
197 self._group_id_generator = guid.GuidGenerator()
199 # Flag to stop processing thread
202 # Entity in charge of managing system failures
203 self._fm = FailureManager(self)
206 self._state = ECState.RUNNING
208 # The runner is a pool of threads used to parallelize
213 # Event processing thread
214 self._cond = threading.Condition()
215 self._thread = threading.Thread(target = self._process)
216 self._thread.setDaemon(True)
221 """ Returns the logger instance of the Experiment Controller
227 def failure_level(self):
228 """ Returns the level of FAILURE of th experiment
232 return self._fm._failure_level
236 """ Returns the state of the Experiment Controller
243 """ Returns the experiment id assigned by the user
250 """ Returns the experiment instance (run) identifier (automatically
258 """ Returns the number of processing nthreads used
261 return self._nthreads
266 """ Returns True if the experiment has failed and should be interrupted,
270 return self._fm.abort
272 def inform_failure(self, guid):
273 """ Reports a failure in a RM to the EC for evaluation
275 :param guid: Resource id
280 return self._fm.eval_failure(guid)
282 def wait_finished(self, guids):
283 """ Blocking method that waits until all RMs in the 'guids' list
284 have reached a state >= STOPPED (i.e. STOPPED, FAILED or
285 RELEASED ), or until a failure in the experiment occurs
288 :param guids: List of guids
296 return self.wait(guids, state = ResourceState.STOPPED,
299 def wait_started(self, guids):
300 """ Blocking method that waits until all RMs in the 'guids' list
301 have reached a state >= STARTED, or until a failure in the
302 experiment occurs (i.e. abort == True)
304 :param guids: List of guids
312 return self.wait(guids, state = ResourceState.STARTED,
315 def wait_released(self, guids):
316 """ Blocking method that waits until all RMs in the 'guids' list
317 have reached a state == RELEASED, or until the EC fails
319 :param guids: List of guids
325 return self._state == ECState.FAILED
327 return self.wait(guids, state = ResourceState.RELEASED,
330 def wait_deployed(self, guids):
331 """ Blocking method that waits until all RMs in the 'guids' list
332 have reached a state >= READY, or until a failure in the
333 experiment occurs (i.e. abort == True)
335 :param guids: List of guids
343 return self.wait(guids, state = ResourceState.READY,
346 def wait(self, guids, state, quit):
347 """ Blocking method that waits until all RMs in the 'guids' list
348 have reached a state >= 'state', or until the 'quit' callback
351 :param guids: List of guids
355 if isinstance(guids, int):
358 # Make a copy to avoid modifying the original guids list
362 # If there are no more guids to wait for
363 # or the quit function returns True, exit the loop
364 if len(guids) == 0 or quit():
367 # If a guid reached one of the target states, remove it from list
369 rm = self.get_resource(guid)
373 self.logger.debug(" %s guid %d DONE - state is %s, required is >= %s " % (
374 rm.get_rtype(), guid, rstate, state))
377 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (
378 guid, rstate, state))
384 def serialize(self, format = SFormats.XML):
385 serializer = ECSerializer()
386 sec = serializer.load(self, format = format)
389 def save(self, path, format = SFormats.XML):
390 serializer = ECSerializer()
391 path = serializer.save(self, path, format = format)
394 def get_task(self, tid):
395 """ Returns a task by its id
397 :param tid: Id of the task
403 return self._tasks.get(tid)
405 def get_resource(self, guid):
406 """ Returns a registered ResourceManager by its guid
408 :param guid: Id of the resource
411 :rtype: ResourceManager
414 rm = self._resources.get(guid)
417 def get_resources_by_type(self, rtype):
418 """ Returns a registered ResourceManager by its guid
420 :param rtype: Resource type
423 :rtype: list of ResourceManagers
427 for guid, rm in self._resources.iteritems():
428 if rm.get_rtype() == type:
432 def remove_resource(self, guid):
433 del self._resources[guid]
437 """ Returns the set() of guids of all the ResourceManager
439 :return: Set of all RM guids
443 keys = self._resources.keys()
447 def register_resource(self, rtype, guid = None):
448 """ Registers a new ResourceManager of type 'rtype' in the experiment
450 This method will assign a new 'guid' for the RM, if no guid
453 :param rtype: Type of the RM
456 :return: Guid of the RM
460 # Get next available guid
461 guid = self._guid_generator.next(guid)
464 rm = ResourceFactory.create(rtype, self, guid)
467 self._resources[guid] = rm
471 def get_attributes(self, guid):
472 """ Returns all the attributes of the RM with guid 'guid'
474 :param guid: Guid of the RM
477 :return: List of attributes
481 rm = self.get_resource(guid)
482 return rm.get_attributes()
484 def get_attribute(self, guid, name):
485 """ Returns the attribute 'name' of the RM with guid 'guid'
487 :param guid: Guid of the RM
490 :param name: Name of the attribute
493 :return: The attribute with name 'name'
497 rm = self.get_resource(guid)
498 return rm.get_attribute(name)
500 def register_connection(self, guid1, guid2):
501 """ Registers a connection between a RM with guid 'guid1'
502 and another RM with guid 'guid2'.
504 The order of the in which the two guids are provided is not
505 important, since the connection relationship is symmetric.
507 :param guid1: First guid to connect
508 :type guid1: ResourceManager
510 :param guid2: Second guid to connect
511 :type guid: ResourceManager
514 rm1 = self.get_resource(guid1)
515 rm2 = self.get_resource(guid2)
517 rm1.register_connection(guid2)
518 rm2.register_connection(guid1)
520 def register_condition(self, guids1, action, guids2, state,
522 """ Registers an action START, STOP or DEPLOY for all RM on list
523 guids1 to occur at time 'time' after all elements in list guids2
524 have reached state 'state'.
526 :param guids1: List of guids of RMs subjected to action
529 :param action: Action to perform (either START, STOP or DEPLOY)
530 :type action: ResourceAction
532 :param guids2: List of guids of RMs to we waited for
535 :param state: State to wait for on RMs of list guids2 (STARTED,
537 :type state: ResourceState
539 :param time: Time to wait after guids2 has reached status
543 if isinstance(guids1, int):
545 if isinstance(guids2, int):
549 rm = self.get_resource(guid1)
550 rm.register_condition(action, guids2, state, time)
552 def enable_trace(self, guid, name):
553 """ Enables a trace to be collected during the experiment run
555 :param name: Name of the trace
559 rm = self.get_resource(guid)
560 rm.enable_trace(name)
562 def trace_enabled(self, guid, name):
563 """ Returns True if the trace of name 'name' is enabled
565 :param name: Name of the trace
569 rm = self.get_resource(guid)
570 return rm.trace_enabled(name)
572 def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
573 """ Returns information on a collected trace, the trace stream or
574 blocks (chunks) of the trace stream
576 :param name: Name of the trace
579 :param attr: Can be one of:
580 - TraceAttr.ALL (complete trace content),
581 - TraceAttr.STREAM (block in bytes to read starting
583 - TraceAttr.PATH (full path to the trace file),
584 - TraceAttr.SIZE (size of trace file).
587 :param block: Number of bytes to retrieve from trace, when attr is
591 :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM
597 rm = self.get_resource(guid)
598 return rm.trace(name, attr, block, offset)
600 def get_traces(self, guid):
601 """ Returns the list of the trace names of the RM with guid 'guid'
603 :param guid: Guid of the RM
606 :return: List of trace names
610 rm = self.get_resource(guid)
611 return rm.get_traces()
614 def discover(self, guid):
615 """ Discovers an available resource matching the criteria defined
616 by the RM with guid 'guid', and associates that resource to the RM
618 Not all RM types require (or are capable of) performing resource
619 discovery. For the RM types which are not capable of doing so,
620 invoking this method does not have any consequences.
622 :param guid: Guid of the RM
626 rm = self.get_resource(guid)
629 def provision(self, guid):
630 """ Provisions the resource associated to the RM with guid 'guid'.
632 Provisioning means making a resource 'accessible' to the user.
633 Not all RM types require (or are capable of) performing resource
634 provisioning. For the RM types which are not capable of doing so,
635 invoking this method does not have any consequences.
637 :param guid: Guid of the RM
641 rm = self.get_resource(guid)
642 return rm.provision()
644 def get(self, guid, name):
645 """ Returns the value of the attribute with name 'name' on the
648 :param guid: Guid of the RM
651 :param name: Name of the attribute
654 :return: The value of the attribute with name 'name'
657 rm = self.get_resource(guid)
660 def set(self, guid, name, value):
661 """ Modifies the value of the attribute with name 'name' on the
664 :param guid: Guid of the RM
667 :param name: Name of the attribute
670 :param value: Value of the attribute
673 rm = self.get_resource(guid)
676 def get_global(self, rtype, name):
677 """ Returns the value of the global attribute with name 'name' on the
678 RMs of rtype 'rtype'.
680 :param guid: Guid of the RM
683 :param name: Name of the attribute
686 :return: The value of the attribute with name 'name'
689 rclass = ResourceFactory.get_resource_type(rtype)
690 return rclass.get_global(name)
692 def set_global(self, rtype, name, value):
693 """ Modifies the value of the global attribute with name 'name' on the
694 RMs of with rtype 'rtype'.
696 :param guid: Guid of the RM
699 :param name: Name of the attribute
702 :param value: Value of the attribute
705 rclass = ResourceFactory.get_resource_type(rtype)
706 return rclass.set_global(name, value)
708 def state(self, guid, hr = False):
709 """ Returns the state of a resource
711 :param guid: Resource guid
714 :param hr: Human readable. Forces return of a
715 status string instead of a number
719 rm = self.get_resource(guid)
723 return ResourceState2str.get(state)
727 def stop(self, guid):
728 """ Stops the RM with guid 'guid'
730 Stopping a RM means that the resource it controls will
731 no longer take part of the experiment.
733 :param guid: Guid of the RM
737 rm = self.get_resource(guid)
740 def start(self, guid):
741 """ Starts the RM with guid 'guid'
743 Starting a RM means that the resource it controls will
744 begin taking part of the experiment.
746 :param guid: Guid of the RM
750 rm = self.get_resource(guid)
753 def get_start_time(self, guid):
754 """ Returns the start time of the RM as a timestamp """
755 rm = self.get_resource(guid)
758 def get_stop_time(self, guid):
759 """ Returns the stop time of the RM as a timestamp """
760 rm = self.get_resource(guid)
763 def get_discover_time(self, guid):
764 """ Returns the discover time of the RM as a timestamp """
765 rm = self.get_resource(guid)
766 return rm.discover_time
768 def get_provision_time(self, guid):
769 """ Returns the provision time of the RM as a timestamp """
770 rm = self.get_resource(guid)
771 return rm.provision_time
773 def get_ready_time(self, guid):
774 """ Returns the deployment time of the RM as a timestamp """
775 rm = self.get_resource(guid)
778 def get_release_time(self, guid):
779 """ Returns the release time of the RM as a timestamp """
780 rm = self.get_resource(guid)
781 return rm.release_time
783 def get_failed_time(self, guid):
784 """ Returns the time failure occured for the RM as a timestamp """
785 rm = self.get_resource(guid)
786 return rm.failed_time
788 def set_with_conditions(self, name, value, guids1, guids2, state,
790 """ Modifies the value of attribute with name 'name' on all RMs
791 on the guids1 list when time 'time' has elapsed since all
792 elements in guids2 list have reached state 'state'.
794 :param name: Name of attribute to set in RM
797 :param value: Value of attribute to set in RM
800 :param guids1: List of guids of RMs subjected to action
803 :param action: Action to register (either START or STOP)
804 :type action: ResourceAction
806 :param guids2: List of guids of RMs to we waited for
809 :param state: State to wait for on RMs (STARTED, STOPPED, etc)
810 :type state: ResourceState
812 :param time: Time to wait after guids2 has reached status
816 if isinstance(guids1, int):
818 if isinstance(guids2, int):
822 rm = self.get_resource(guid)
823 rm.set_with_conditions(name, value, guids2, state, time)
825 def deploy(self, guids = None, wait_all_ready = True, group = None):
826 """ Deploys all ResourceManagers in the guids list.
828 If the argument 'guids' is not given, all RMs with state NEW
831 :param guids: List of guids of RMs to deploy
834 :param wait_all_ready: Wait until all RMs are ready in
835 order to start the RMs
838 :param group: Id of deployment group in which to deploy RMs
842 self.logger.debug(" ------- DEPLOY START ------ ")
845 # If no guids list was passed, all 'NEW' RMs will be deployed
847 for guid, rm in self._resources.iteritems():
848 if rm.state == ResourceState.NEW:
851 if isinstance(guids, int):
854 # Create deployment group
855 # New guids can be added to a same deployment group later on
859 group = self._group_id_generator.next()
861 if group not in self._groups:
862 self._groups[group] = []
864 self._groups[group].extend(guids)
866 def wait_all_and_start(group):
867 # Function that checks if all resources are READY
868 # before scheduling a start_with_conditions for each RM
871 # Get all guids in group
872 guids = self._groups[group]
875 if self.state(guid) < ResourceState.READY:
880 callback = functools.partial(wait_all_and_start, group)
881 self.schedule("1s", callback)
883 # If all resources are ready, we schedule the start
885 rm = self.get_resource(guid)
886 self.schedule("0s", rm.start_with_conditions)
888 if rm.conditions.get(ResourceAction.STOP):
889 # Only if the RM has STOP conditions we
890 # schedule a stop. Otherwise the RM will stop immediately
891 self.schedule("0s", rm.stop_with_conditions)
893 if wait_all_ready and new_group:
894 # Schedule a function to check that all resources are
895 # READY, and only then schedule the start.
896 # This aims at reducing the number of tasks looping in the
898 # Instead of having many start tasks, we will have only one for
900 callback = functools.partial(wait_all_and_start, group)
901 self.schedule("0s", callback)
904 rm = self.get_resource(guid)
905 rm.deployment_group = group
906 self.schedule("0s", rm.deploy_with_conditions)
908 if not wait_all_ready:
909 self.schedule("0s", rm.start_with_conditions)
911 if rm.conditions.get(ResourceAction.STOP):
912 # Only if the RM has STOP conditions we
913 # schedule a stop. Otherwise the RM will stop immediately
914 self.schedule("0s", rm.stop_with_conditions)
916 def release(self, guids = None):
917 """ Releases all ResourceManagers in the guids list.
919 If the argument 'guids' is not given, all RMs registered
920 in the experiment are released.
922 :param guids: List of RM guids
926 if isinstance(guids, int):
930 guids = self.resources
933 rm = self.get_resource(guid)
934 self.schedule("0s", rm.release)
936 self.wait_released(guids)
939 if self.get(guid, "hardRelease"):
940 self.remove_resource(guid)
943 """ Releases all resources and stops the ExperimentController
946 # If there was a major failure we can't exit gracefully
947 if self._state == ECState.FAILED:
948 raise RuntimeError("EC failure. Can not exit gracefully")
950 # Remove all pending tasks from the scheduler queue
951 for tid in list(self._scheduler.pending):
952 self._scheduler.remove(tid)
954 # Remove pending tasks from the workers queue
959 # Mark the EC state as TERMINATED
960 self._state = ECState.TERMINATED
962 # Stop processing thread
965 # Notify condition to wake up the processing thread
968 if self._thread.is_alive():
971 def schedule(self, date, callback, track = False):
972 """ Schedules a callback to be executed at time 'date'.
974 :param date: string containing execution time for the task.
975 It can be expressed as an absolute time, using
976 timestamp format, or as a relative time matching
977 ^\d+.\d+(h|m|s|ms|us)$
979 :param callback: code to be executed for the task. Must be a
980 Python function, and receives args and kwargs
983 :param track: if set to True, the task will be retrievable with
984 the get_task() method
986 :return : The Id of the task
990 timestamp = stabsformat(date)
991 task = Task(timestamp, callback)
992 task = self._scheduler.schedule(task)
995 self._tasks[task.id] = task
997 # Notify condition to wake up the processing thread
1003 """ Process scheduled tasks.
1007 Tasks are scheduled by invoking the schedule method with a target
1008 callback and an execution time.
1009 The schedule method creates a new Task object with that callback
1010 and execution time, and pushes it into the '_scheduler' queue.
1011 The execution time and the order of arrival of tasks are used
1012 to order the tasks in the queue.
1014 The _process method is executed in an independent thread held by
1015 the ExperimentController for as long as the experiment is running.
1016 This method takes tasks from the '_scheduler' queue in a loop
1017 and processes them in parallel using multithreading.
1018 The environmental variable NEPI_NTHREADS can be used to control
1019 the number of threads used to process tasks. The default value is
1022 To execute tasks in parallel, a ParallelRunner (PR) object is used.
1023 This object keeps a pool of threads (workers), and a queue of tasks
1024 scheduled for 'immediate' execution.
1026 On each iteration, the '_process' loop will take the next task that
1027 is scheduled for 'future' execution from the '_scheduler' queue,
1028 and if the execution time of that task is >= to the current time,
1029 it will push that task into the PR for 'immediate execution'.
1030 As soon as a worker is free, the PR will assign the next task to
1033 Upon receiving a task to execute, each PR worker (thread) will
1034 invoke the _execute method of the EC, passing the task as
1036 The _execute method will then invoke task.callback inside a
1037 try/except block. If an exception is raised by the tasks.callback,
1038 it will be trapped by the try block, logged to standard error
1039 (usually the console), and the task will be marked as failed.
1043 self._nthreads = int(os.environ.get("NEPI_NTHREADS", str(self._nthreads)))
1044 self._runner = ParallelRun(maxthreads = self.nthreads)
1045 self._runner.start()
1047 while not self._stop:
1049 self._cond.acquire()
1051 task = self._scheduler.next()
1054 # No task to execute. Wait for a new task to be scheduled.
1057 # The task timestamp is in the future. Wait for timeout
1058 # or until another task is scheduled.
1060 if now < task.timestamp:
1061 # Calculate timeout in seconds
1062 timeout = tdiffsec(task.timestamp, now)
1064 # Re-schedule task with the same timestamp
1065 self._scheduler.schedule(task)
1069 # Wait timeout or until a new task awakes the condition
1070 self._cond.wait(timeout)
1072 self._cond.release()
1075 # Process tasks in parallel
1076 self._runner.put(self._execute, task)
1079 err = traceback.format_exc()
1080 self.logger.error("Error while processing tasks in the EC: %s" % err)
1082 # Set the EC to FAILED state
1083 self._state = ECState.FAILED
1085 # Set the FailureManager failure level to EC failure
1086 self._fm.set_ec_failure()
1088 self.logger.debug("Exiting the task processing loop ... ")
1091 self._runner.destroy()
1093 def _execute(self, task):
1094 """ Executes a single task.
1096 :param task: Object containing the callback to execute
1102 task.result = task.callback()
1103 task.status = TaskStatus.DONE
1106 err = traceback.format_exc()
1108 task.status = TaskStatus.ERROR
1110 self.logger.error("Error occurred while executing task: %s" % err)
1113 """ Awakes the processing thread if it is blocked waiting
1114 for new tasks to arrive
1117 self._cond.acquire()
1119 self._cond.release()