Code cleanup. Setting resource state through specific functions
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 from nepi.util import guid
21 from nepi.util.parallel import ParallelRun
22 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
23 from nepi.execution.resource import ResourceFactory, ResourceAction, \
24         ResourceState, ResourceState2str
25 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
26 from nepi.execution.trace import TraceAttr
27
28 # TODO: use multiprocessing instead of threading
29 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
30
31 import functools
32 import logging
33 import os
34 import random
35 import sys
36 import time
37 import threading
38
39 class ECState(object):
40     """ State of the Experiment Controller
41    
42     """
43     RUNNING = 1
44     FAILED = 2
45     TERMINATED = 3
46
47 class ExperimentController(object):
48     """
49     .. class:: Class Args :
50       
51         :param exp_id: Human readable identifier for the experiment scenario. 
52                        It will be used in the name of the directory 
53                         where experiment related information is stored
54         :type exp_id: str
55
56     .. note::
57
58         An experiment, or scenario, is defined by a concrete use, behavior,
59         configuration and interconnection of resources that describe a single
60         experiment case (We call this the experiment description). 
61         A same experiment (scenario) can be run many times.
62
63         The ExperimentController (EC), is the entity responsible for 
64         managing an experiment instance (run). The same scenario can be 
65         recreated (and re-run) by instantiating an EC and recreating 
66         the same experiment description. 
67
68         In NEPI, an experiment is represented as a graph of interconnected
69         resources. A resource is a generic concept in the sense that any
70         component taking part of an experiment, whether physical of
71         virtual, is considered a resource. A resources could be a host, 
72         a virtual machine, an application, a simulator, a IP address.
73
74         A ResourceManager (RM), is the entity responsible for managing a 
75         single resource. ResourceManagers are specific to a resource
76         type (i.e. An RM to control a Linux application will not be
77         the same as the RM used to control a ns-3 simulation).
78         In order for a new type of resource to be supported in NEPI
79         a new RM must be implemented. NEPI already provides different
80         RMs to control basic resources, and new can be extended from
81         the existing ones.
82
83         Through the EC interface the user can create ResourceManagers (RMs),
84         configure them and interconnect them, in order to describe an experiment.
85         Describing an experiment through the EC does not run the experiment.
86         Only when the 'deploy()' method is invoked on the EC, will the EC take 
87         actions to transform the 'described' experiment into a 'running' experiment.
88
89         While the experiment is running, it is possible to continue to
90         create/configure/connect RMs, and to deploy them to involve new
91         resources in the experiment (this is known as 'interactive' deployment).
92         
93         An experiments in NEPI is identified by a string id, 
94         which is either given by the user, or automatically generated by NEPI.  
95         The purpose of this identifier is to separate files and results that 
96         belong to different experiment scenarios. 
97         However, since a same 'experiment' can be run many times, the experiment
98         id is not enough to identify an experiment instance (run).
99         For this reason, the ExperimentController has two identifier, the 
100         exp_id, which can be re-used by different ExperimentController instances,
101         and the run_id, which unique to a ExperimentController instance, and
102         is automatically generated by NEPI.
103         
104     """
105
106     def __init__(self, exp_id = None): 
107         super(ExperimentController, self).__init__()
108         # Logging
109         self._logger = logging.getLogger("ExperimentController")
110
111         # Run identifier. It identifies a concrete instance (run) of an experiment.
112         # Since a same experiment (same configuration) can be run many times,
113         # this id permits to identify concrete exoeriment run
114         self._run_id = tsformat()
115
116         # Experiment identifier. Usually assigned by the user
117         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
118
119         # generator of globally unique ids
120         self._guid_generator = guid.GuidGenerator()
121         
122         # Resource managers
123         self._resources = dict()
124
125         # Scheduler
126         self._scheduler = HeapScheduler()
127
128         # Tasks
129         self._tasks = dict()
130
131         # RM groups 
132         self._groups = dict()
133
134         # generator of globally unique id for groups
135         self._group_id_generator = guid.GuidGenerator()
136  
137         # Event processing thread
138         self._cond = threading.Condition()
139         self._thread = threading.Thread(target = self._process)
140         self._thread.setDaemon(True)
141         self._thread.start()
142
143         # EC state
144         self._state = ECState.RUNNING
145
146     @property
147     def logger(self):
148         """ Return the logger of the Experiment Controller
149
150         """
151         return self._logger
152
153     @property
154     def ecstate(self):
155         """ Return the state of the Experiment Controller
156
157         """
158         return self._state
159
160     @property
161     def exp_id(self):
162         """ Return the experiment id assigned by the user
163
164         """
165         return self._exp_id
166
167     @property
168     def run_id(self):
169         """ Return the experiment instance (run) identifier  
170
171         """
172         return self._run_id
173
174     @property
175     def finished(self):
176         """ Put the state of the Experiment Controller into a final state :
177             Either TERMINATED or FAILED
178
179         """
180         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
181
182     def wait_finished(self, guids):
183         """ Blocking method that wait until all the RM from the 'guid' list 
184             reached the state FINISHED ( or STOPPED, FAILED or RELEASED )
185
186         :param guids: List of guids
187         :type guids: list
188         """
189         return self.wait(guids)
190
191     def wait_started(self, guids):
192         """ Blocking method that wait until all the RM from the 'guid' list 
193             reached the state STARTED ( or STOPPED, FINISHED, FAILED, RELEASED)
194
195         :param guids: List of guids
196         :type guids: list
197         """
198         return self.wait(guids, state = ResourceState.STARTED)
199
200     def wait_released(self, guids):
201         """ Blocking method that wait until all the RM from the 'guid' list 
202             reached the state RELEASED (or FAILED)
203
204         :param guids: List of guids
205         :type guids: list
206         """
207         # TODO: solve state concurrency BUG and !!!!
208         # correct waited release state to state = ResourceState.FAILED)
209         return self.wait(guids, state = ResourceState.FINISHED)
210
211     def wait_deployed(self, guids):
212         """ Blocking method that wait until all the RM from the 'guid' list 
213             reached the state READY (or any higher state)
214
215         :param guids: List of guids
216         :type guids: list
217         """
218         return self.wait(guids, state = ResourceState.READY)
219
220     def wait(self, guids, state = ResourceState.STOPPED):
221         """ Blocking method that waits until all the RM from the 'guid' list 
222             reached state 'state' or until a failure occurs
223             
224         :param guids: List of guids
225         :type guids: list
226         """
227         if isinstance(guids, int):
228             guids = [guids]
229
230         # we randomly alter the order of the guids to avoid ordering
231         # dependencies (e.g. LinuxApplication RMs runing on the same
232         # linux host will be synchronized by the LinuxNode SSH lock)
233         random.shuffle(guids)
234
235         while True:
236             # If no more guids to wait for or an error occured, then exit
237             if len(guids) == 0 or self.finished:
238                 break
239
240             # If a guid reached one of the target states, remove it from list
241             guid = guids[0]
242             rstate = self.state(guid)
243
244             if rstate >= state:
245                 guids.remove(guid)
246             else:
247                 # Debug...
248                 self.logger.debug(" WAITING FOR guid %d - state is %s, required is >= %s " % (guid,
249                     self.state(guid, hr = True), state))
250
251                 # Take the opportunity to 'refresh' the states of the RMs.
252                 # Query only the first up to N guids (not to overwhelm 
253                 # the local machine)
254                 n = 100
255                 lim = n if len(guids) > n else ( len(guids) -1 )
256                 nguids = guids[0: lim]
257
258                 # schedule state request for all guids (take advantage of
259                 # scheduler multi threading).
260                 for guid in nguids:
261                     callback = functools.partial(self.state, guid)
262                     self.schedule("0s", callback)
263
264                 # If the guid is not in one of the target states, wait and
265                 # continue quering. We keep the sleep big to decrease the
266                 # number of RM state queries
267                 time.sleep(4)
268   
269     def get_task(self, tid):
270         """ Get a specific task
271
272         :param tid: Id of the task
273         :type tid: int
274         :rtype: Task
275         """
276         return self._tasks.get(tid)
277
278     def get_resource(self, guid):
279         """ Get a specific Resource Manager
280
281         :param guid: Id of the task
282         :type guid: int
283         :rtype: ResourceManager
284         """
285         return self._resources.get(guid)
286
287     @property
288     def resources(self):
289         """ Returns the list of all the Resource Manager Id
290
291         :rtype: set
292
293         """
294         return self._resources.keys()
295
296     def register_resource(self, rtype, guid = None):
297         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
298         for the RM of type 'rtype' and add it to the list of Resources.
299
300         :param rtype: Type of the RM
301         :type rtype: str
302         :return: Id of the RM
303         :rtype: int
304         """
305         # Get next available guid
306         guid = self._guid_generator.next(guid)
307         
308         # Instantiate RM
309         rm = ResourceFactory.create(rtype, self, guid)
310
311         # Store RM
312         self._resources[guid] = rm
313
314         return guid
315
316     def get_attributes(self, guid):
317         """ Return all the attibutes of a specific RM
318
319         :param guid: Guid of the RM
320         :type guid: int
321         :return: List of attributes
322         :rtype: list
323         """
324         rm = self.get_resource(guid)
325         return rm.get_attributes()
326
327     def register_connection(self, guid1, guid2):
328         """ Registers a guid1 with a guid2. 
329             The declaration order is not important
330
331             :param guid1: First guid to connect
332             :type guid1: ResourceManager
333
334             :param guid2: Second guid to connect
335             :type guid: ResourceManager
336         """
337         rm1 = self.get_resource(guid1)
338         rm2 = self.get_resource(guid2)
339
340         rm1.register_connection(guid2)
341         rm2.register_connection(guid1)
342
343     def register_condition(self, guids1, action, guids2, state,
344             time = None):
345         """ Registers an action START or STOP for all RM on guids1 to occur 
346             time 'time' after all elements in guids2 reached state 'state'.
347
348             :param guids1: List of guids of RMs subjected to action
349             :type guids1: list
350
351             :param action: Action to register (either START or STOP)
352             :type action: ResourceAction
353
354             :param guids2: List of guids of RMs to we waited for
355             :type guids2: list
356
357             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
358             :type state: ResourceState
359
360             :param time: Time to wait after guids2 has reached status 
361             :type time: string
362
363         """
364         if isinstance(guids1, int):
365             guids1 = [guids1]
366         if isinstance(guids2, int):
367             guids2 = [guids2]
368
369         for guid1 in guids1:
370             rm = self.get_resource(guid1)
371             rm.register_condition(action, guids2, state, time)
372
373     def enable_trace(self, guid, name):
374         """ Enable trace
375
376         :param name: Name of the trace
377         :type name: str
378         """
379         rm = self.get_resource(guid)
380         rm.enable_trace(name)
381
382     def trace_enabled(self, guid, name):
383         """ Returns True if trace is enabled
384
385         :param name: Name of the trace
386         :type name: str
387         """
388         rm = self.get_resource(guid)
389         return rm.trace_enabled(name)
390
391     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
392         """ Get information on collected trace
393
394         :param name: Name of the trace
395         :type name: str
396
397         :param attr: Can be one of:
398                          - TraceAttr.ALL (complete trace content), 
399                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
400                          - TraceAttr.PATH (full path to the trace file),
401                          - TraceAttr.SIZE (size of trace file). 
402         :type attr: str
403
404         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
405         :type name: int
406
407         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
408         :type name: int
409
410         :rtype: str
411         """
412         rm = self.get_resource(guid)
413         return rm.trace(name, attr, block, offset)
414
415     def discover(self, guid):
416         """ Discover a specific RM defined by its 'guid'
417
418             :param guid: Guid of the RM
419             :type guid: int
420
421         """
422         rm = self.get_resource(guid)
423         return rm.discover()
424
425     def provision(self, guid):
426         """ Provision a specific RM defined by its 'guid'
427
428             :param guid: Guid of the RM
429             :type guid: int
430
431         """
432         rm = self.get_resource(guid)
433         return rm.provision()
434
435     def get(self, guid, name):
436         """ Get a specific attribute 'name' from the RM 'guid'
437
438             :param guid: Guid of the RM
439             :type guid: int
440
441             :param name: attribute's name
442             :type name: str
443
444         """
445         rm = self.get_resource(guid)
446         return rm.get(name)
447
448     def set(self, guid, name, value):
449         """ Set a specific attribute 'name' from the RM 'guid' 
450             with the value 'value' 
451
452             :param guid: Guid of the RM
453             :type guid: int
454
455             :param name: attribute's name
456             :type name: str
457
458             :param value: attribute's value
459
460         """
461         rm = self.get_resource(guid)
462         return rm.set(name, value)
463
464     def state(self, guid, hr = False):
465         """ Returns the state of a resource
466
467             :param guid: Resource guid
468             :type guid: integer
469
470             :param hr: Human readable. Forces return of a 
471                 status string instead of a number 
472             :type hr: boolean
473
474         """
475         rm = self.get_resource(guid)
476         state = rm.state
477
478         if hr:
479             return ResourceState2str.get(state)
480
481         return state
482
483     def stop(self, guid):
484         """ Stop a specific RM defined by its 'guid'
485
486             :param guid: Guid of the RM
487             :type guid: int
488
489         """
490         rm = self.get_resource(guid)
491         return rm.stop()
492
493     def start(self, guid):
494         """ Start a specific RM defined by its 'guid'
495
496             :param guid: Guid of the RM
497             :type guid: int
498
499         """
500         rm = self.get_resource(guid)
501         return rm.start()
502
503     def set_with_conditions(self, name, value, guids1, guids2, state,
504             time = None):
505         """ Set value 'value' on attribute with name 'name' on all RMs of
506             guids1 when 'time' has elapsed since all elements in guids2 
507             have reached state 'state'.
508
509             :param name: Name of attribute to set in RM
510             :type name: string
511
512             :param value: Value of attribute to set in RM
513             :type name: string
514
515             :param guids1: List of guids of RMs subjected to action
516             :type guids1: list
517
518             :param action: Action to register (either START or STOP)
519             :type action: ResourceAction
520
521             :param guids2: List of guids of RMs to we waited for
522             :type guids2: list
523
524             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
525             :type state: ResourceState
526
527             :param time: Time to wait after guids2 has reached status 
528             :type time: string
529
530         """
531         if isinstance(guids1, int):
532             guids1 = [guids1]
533         if isinstance(guids2, int):
534             guids2 = [guids2]
535
536         for guid1 in guids1:
537             rm = self.get_resource(guid)
538             rm.set_with_conditions(name, value, guids2, state, time)
539
540     def stop_with_conditions(self, guid):
541         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
542
543             :param guid: Guid of the RM
544             :type guid: int
545
546         """
547         rm = self.get_resource(guid)
548         return rm.stop_with_conditions()
549
550     def start_with_conditions(self, guid):
551         """ Start a specific RM defined by its 'guid' only if all the conditions are true
552
553             :param guid: Guid of the RM
554             :type guid: int
555
556         """
557         rm = self.get_resource(guid)
558         return rm.start_with_conditions()
559
560     def deploy(self, guids = None, wait_all_ready = True, group = None):
561         """ Deploy all resource manager in guids list
562
563         :param guids: List of guids of RMs to deploy
564         :type guids: list
565
566         :param wait_all_ready: Wait until all RMs are ready in
567             order to start the RMs
568         :type guid: int
569
570         :param group: Id of deployment group in which to deploy RMs
571         :type group: int
572
573         """
574         self.logger.debug(" ------- DEPLOY START ------ ")
575
576         if not guids:
577             # If no guids list was indicated, all 'NEW' RMs will be deployed
578             guids = []
579             for guid in self.resources:
580                 if self.state(guid) == ResourceState.NEW:
581                     guids.append(guid)
582                 
583         if isinstance(guids, int):
584             guids = [guids]
585
586         # Create deployment group
587         new_group = False
588         if not group:
589             new_group = True
590             group = self._group_id_generator.next(guid)
591
592         if group not in self._groups:
593             self._groups[group] = []
594
595         self._groups[group].extend(guids)
596
597         # Before starting deployment we disorder the guids list with the
598         # purpose of speeding up the whole deployment process.
599         # It is likely that the user inserted in the 'guids' list closely
600         # resources one after another (e.g. all applications
601         # connected to the same node can likely appear one after another).
602         # This can originate a slow down in the deployment since the N 
603         # threads the parallel runner uses to processes tasks may all
604         # be taken up by the same family of resources waiting for the 
605         # same conditions (e.g. LinuxApplications running on a same 
606         # node share a single lock, so they will tend to be serialized).
607         # If we disorder the guids list, this problem can be mitigated.
608         random.shuffle(guids)
609
610         def wait_all_and_start(group):
611             reschedule = False
612             
613             # Get all guids in group
614             guids = self._groups[group]
615
616             for guid in guids:
617                 if self.state(guid) < ResourceState.READY:
618                     reschedule = True
619                     break
620
621             if reschedule:
622                 callback = functools.partial(wait_all_and_start, group)
623                 self.schedule("1s", callback)
624             else:
625                 # If all resources are read, we schedule the start
626                 for guid in guids:
627                     rm = self.get_resource(guid)
628                     self.schedule("0s", rm.start_with_conditions)
629
630         if wait_all_ready and new_group:
631             # Schedule a function to check that all resources are
632             # READY, and only then schedule the start.
633             # This aimes at reducing the number of tasks looping in the 
634             # scheduler. 
635             # Intead of having N start tasks, we will have only one for 
636             # the whole group.
637             callback = functools.partial(wait_all_and_start, group)
638             self.schedule("1s", callback)
639
640         for guid in guids:
641             rm = self.get_resource(guid)
642             rm.deployment_group = group
643             self.schedule("0s", rm.deploy)
644
645             if not wait_all_ready:
646                 self.schedule("1s", rm.start_with_conditions)
647
648             if rm.conditions.get(ResourceAction.STOP):
649                 # Only if the RM has STOP conditions we
650                 # schedule a stop. Otherwise the RM will stop immediately
651                 self.schedule("2s", rm.stop_with_conditions)
652
653     def release(self, guids = None):
654         """ Release al RMs on the guids list or 
655         all the resources if no list is specified
656
657             :param guids: List of RM guids
658             :type guids: list
659
660         """
661         if not guids:
662             guids = self.resources
663
664         for guid in guids:
665             rm = self.get_resource(guid)
666             self.schedule("0s", rm.release)
667
668         self.wait_released(guids)
669         
670     def shutdown(self):
671         """ Shutdown the Experiment Controller. 
672         Releases all the resources and stops task processing thread
673
674         """
675         self.release()
676
677         # Mark the EC state as TERMINATED
678         self._state = ECState.TERMINATED
679
680         # Notify condition to wake up the processing thread
681         self._notify()
682         
683         if self._thread.is_alive():
684            self._thread.join()
685
686     def schedule(self, date, callback, track = False):
687         """ Schedule a callback to be executed at time date.
688
689             :param date: string containing execution time for the task.
690                     It can be expressed as an absolute time, using
691                     timestamp format, or as a relative time matching
692                     ^\d+.\d+(h|m|s|ms|us)$
693
694             :param callback: code to be executed for the task. Must be a
695                         Python function, and receives args and kwargs
696                         as arguments.
697
698             :param track: if set to True, the task will be retrivable with
699                     the get_task() method
700
701             :return : The Id of the task
702         """
703         timestamp = stabsformat(date)
704         task = Task(timestamp, callback)
705         task = self._scheduler.schedule(task)
706
707         if track:
708             self._tasks[task.id] = task
709
710         # Notify condition to wake up the processing thread
711         self._notify()
712
713         return task.id
714      
715     def _process(self):
716         """ Process scheduled tasks.
717
718         .. note::
719
720         The _process method is executed in an independent thread held by the 
721         ExperimentController for as long as the experiment is running.
722         
723         Tasks are scheduled by invoking the schedule method with a target callback. 
724         The schedule method is given a execution time which controls the
725         order in which tasks are processed. 
726
727         Tasks are processed in parallel using multithreading. 
728         The environmental variable NEPI_NTHREADS can be used to control
729         the number of threads used to process tasks. The default value is 50.
730
731         Exception handling:
732
733         To execute tasks in parallel, an ParallelRunner (PR) object, holding
734         a pool of threads (workers), is used.
735         For each available thread in the PR, the next task popped from 
736         the scheduler queue is 'put' in the PR.
737         Upon receiving a task to execute, each PR worker (thread) invokes the 
738         _execute method of the EC, passing the task as argument. 
739         This method, calls task.callback inside a try/except block. If an 
740         exception is raised by the tasks.callback, it will be trapped by the 
741         try block, logged to standard error (usually the console), and the EC 
742         state will be set to ECState.FAILED.
743         The invocation of _notify immediately after, forces the processing
744         loop in the _process method, to wake up if it was blocked waiting for new 
745         tasks to arrived, and to check the EC state.
746         As the EC is in FAILED state, the processing loop exits and the 
747         'finally' block is invoked. In the 'finally' block, the 'sync' method
748         of the PR is invoked, which forces the PR to raise any unchecked errors
749         that might have been raised by the workers.
750
751         """
752         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
753
754         runner = ParallelRun(maxthreads = nthreads)
755         runner.start()
756
757         try:
758             while not self.finished:
759                 self._cond.acquire()
760
761                 task = self._scheduler.next()
762                 
763                 if not task:
764                     # No task to execute. Wait for a new task to be scheduled.
765                     self._cond.wait()
766                 else:
767                     # The task timestamp is in the future. Wait for timeout 
768                     # or until another task is scheduled.
769                     now = tnow()
770                     if now < task.timestamp:
771                         # Calculate timeout in seconds
772                         timeout = tdiffsec(task.timestamp, now)
773
774                         # Re-schedule task with the same timestamp
775                         self._scheduler.schedule(task)
776                         
777                         task = None
778
779                         # Wait timeout or until a new task awakes the condition
780                         self._cond.wait(timeout)
781                
782                 self._cond.release()
783
784                 if task:
785                     # Process tasks in parallel
786                     runner.put(self._execute, task)
787         except: 
788             import traceback
789             err = traceback.format_exc()
790             self.logger.error("Error while processing tasks in the EC: %s" % err)
791
792             self._state = ECState.FAILED
793         finally:   
794             self.logger.debug("Exiting the task processing loop ... ")
795             runner.sync()
796             runner.destroy()
797
798     def _execute(self, task):
799         """ Executes a single task. 
800
801             :param task: Object containing the callback to execute
802             :type task: Task
803
804         .. note::
805
806         If the invokation of the task callback raises an
807         exception, the processing thread of the ExperimentController
808         will be stopped and the experiment will be aborted.
809
810         """
811         # Invoke callback
812         task.status = TaskStatus.DONE
813
814         try:
815             task.result = task.callback()
816         except:
817             import traceback
818             err = traceback.format_exc()
819             task.result = err
820             task.status = TaskStatus.ERROR
821             
822             self.logger.error("Error occurred while executing task: %s" % err)
823
824             # Set the EC to FAILED state (this will force to exit the task
825             # processing thread)
826             self._state = ECState.FAILED
827
828             # Notify condition to wake up the processing thread
829             self._notify()
830
831             # Propage error to the ParallelRunner
832             raise
833
834     def _notify(self):
835         """ Awakes the processing thread in case it is blocked waiting
836         for a new task to be scheduled.
837         """
838         self._cond.acquire()
839         self._cond.notify()
840         self._cond.release()
841