Fixing 'error: can't start new thread' bug in ParallelRun
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
38 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39
40 class ECState(object):
41     """ State of the Experiment Controller
42    
43     """
44     RUNNING = 1
45     FAILED = 2
46     TERMINATED = 3
47
48 class ExperimentController(object):
49     """
50     .. class:: Class Args :
51       
52         :param exp_id: Human readable identifier for the experiment scenario. 
53                        It will be used in the name of the directory 
54                         where experiment related information is stored
55         :type exp_id: str
56
57     .. note::
58
59         An experiment, or scenario, is defined by a concrete use, behavior,
60         configuration and interconnection of resources that describe a single
61         experiment case (We call this the experiment description). 
62         A same experiment (scenario) can be run many times.
63
64         The ExperimentController (EC), is the entity responsible for 
65         managing an experiment instance (run). The same scenario can be 
66         recreated (and re-run) by instantiating an EC and recreating 
67         the same experiment description. 
68
69         In NEPI, an experiment is represented as a graph of interconnected
70         resources. A resource is a generic concept in the sense that any
71         component taking part of an experiment, whether physical of
72         virtual, is considered a resource. A resources could be a host, 
73         a virtual machine, an application, a simulator, a IP address.
74
75         A ResourceManager (RM), is the entity responsible for managing a 
76         single resource. ResourceManagers are specific to a resource
77         type (i.e. An RM to control a Linux application will not be
78         the same as the RM used to control a ns-3 simulation).
79         In order for a new type of resource to be supported in NEPI
80         a new RM must be implemented. NEPI already provides different
81         RMs to control basic resources, and new can be extended from
82         the existing ones.
83
84         Through the EC interface the user can create ResourceManagers (RMs),
85         configure them and interconnect them, in order to describe an experiment.
86         Describing an experiment through the EC does not run the experiment.
87         Only when the 'deploy()' method is invoked on the EC, will the EC take 
88         actions to transform the 'described' experiment into a 'running' experiment.
89
90         While the experiment is running, it is possible to continue to
91         create/configure/connect RMs, and to deploy them to involve new
92         resources in the experiment (this is known as 'interactive' deployment).
93         
94         An experiments in NEPI is identified by a string id, 
95         which is either given by the user, or automatically generated by NEPI.  
96         The purpose of this identifier is to separate files and results that 
97         belong to different experiment scenarios. 
98         However, since a same 'experiment' can be run many times, the experiment
99         id is not enough to identify an experiment instance (run).
100         For this reason, the ExperimentController has two identifier, the 
101         exp_id, which can be re-used by different ExperimentController instances,
102         and the run_id, which unique to a ExperimentController instance, and
103         is automatically generated by NEPI.
104         
105     """
106
107     def __init__(self, exp_id = None): 
108         super(ExperimentController, self).__init__()
109         # Logging
110         self._logger = logging.getLogger("ExperimentController")
111
112         # Run identifier. It identifies a concrete instance (run) of an experiment.
113         # Since a same experiment (same configuration) can be run many times,
114         # this id permits to identify concrete exoeriment run
115         self._run_id = tsformat()
116
117         # Experiment identifier. Usually assigned by the user
118         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
119
120         # generator of globally unique ids
121         self._guid_generator = guid.GuidGenerator()
122         
123         # Resource managers
124         self._resources = dict()
125
126         # Scheduler
127         self._scheduler = HeapScheduler()
128
129         # Tasks
130         self._tasks = dict()
131
132         # RM groups 
133         self._groups = dict()
134
135         # generator of globally unique id for groups
136         self._group_id_generator = guid.GuidGenerator()
137  
138         # Event processing thread
139         self._cond = threading.Condition()
140         self._thread = threading.Thread(target = self._process)
141         self._thread.setDaemon(True)
142         self._thread.start()
143
144         # EC state
145         self._state = ECState.RUNNING
146
147     @property
148     def logger(self):
149         """ Return the logger of the Experiment Controller
150
151         """
152         return self._logger
153
154     @property
155     def ecstate(self):
156         """ Return the state of the Experiment Controller
157
158         """
159         return self._state
160
161     @property
162     def exp_id(self):
163         """ Return the experiment id assigned by the user
164
165         """
166         return self._exp_id
167
168     @property
169     def run_id(self):
170         """ Return the experiment instance (run) identifier  
171
172         """
173         return self._run_id
174
175     @property
176     def finished(self):
177         """ Put the state of the Experiment Controller into a final state :
178             Either TERMINATED or FAILED
179
180         """
181         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
182
183     def wait_finished(self, guids):
184         """ Blocking method that wait until all the RM from the 'guid' list 
185             reached the state FINISHED
186
187         :param guids: List of guids
188         :type guids: list
189         """
190         return self.wait(guids)
191
192     def wait_started(self, guids):
193         """ Blocking method that wait until all the RM from the 'guid' list 
194             reached the state STARTED
195
196         :param guids: List of guids
197         :type guids: list
198         """
199         return self.wait(guids, states = [ResourceState.STARTED,
200             ResourceState.STOPPED,
201             ResourceState.FAILED,
202             ResourceState.FINISHED])
203
204     def wait_released(self, guids):
205         """ Blocking method that wait until all the RM from the 'guid' list 
206             reached the state RELEASED
207
208         :param guids: List of guids
209         :type guids: list
210         """
211         return self.wait(guids, states = [ResourceState.RELEASED,
212             ResourceState.STOPPED,
213             ResourceState.FAILED,
214             ResourceState.FINISHED])
215
216     def wait(self, guids, states = [ResourceState.FINISHED, 
217             ResourceState.FAILED,
218             ResourceState.STOPPED]):
219         """ Blocking method that waits until all the RM from the 'guid' list 
220             reached state 'state' or until a failure occurs
221             
222         :param guids: List of guids
223         :type guids: list
224         """
225         if isinstance(guids, int):
226             guids = [guids]
227
228         # we randomly alter the order of the guids to avoid ordering
229         # dependencies (e.g. LinuxApplication RMs runing on the same
230         # linux host will be synchronized by the LinuxNode SSH lock)
231         random.shuffle(guids)
232
233         while True:
234             # If no more guids to wait for or an error occured, then exit
235             if len(guids) == 0 or self.finished:
236                 break
237
238             # If a guid reached one of the target states, remove it from list
239             guid = guids[0]
240             state = self.state(guid)
241
242             if state in states:
243                 guids.remove(guid)
244             else:
245                 # Debug...
246                 self.logger.debug(" WAITING FOR %g - state %s " % (guid,
247                     self.state(guid, hr = True)))
248
249                 # Take the opportunity to 'refresh' the states of the RMs.
250                 # Query only the first up to N guids (not to overwhelm 
251                 # the local machine)
252                 n = 100
253                 lim = n if len(guids) > n else ( len(guids) -1 )
254                 nguids = guids[0: lim]
255
256                 # schedule state request for all guids (take advantage of
257                 # scheduler multi threading).
258                 for guid in nguids:
259                     callback = functools.partial(self.state, guid)
260                     self.schedule("0s", callback)
261
262                 # If the guid is not in one of the target states, wait and
263                 # continue quering. We keep the sleep big to decrease the
264                 # number of RM state queries
265                 time.sleep(2)
266   
267     def get_task(self, tid):
268         """ Get a specific task
269
270         :param tid: Id of the task
271         :type tid: int
272         :rtype: Task
273         """
274         return self._tasks.get(tid)
275
276     def get_resource(self, guid):
277         """ Get a specific Resource Manager
278
279         :param guid: Id of the task
280         :type guid: int
281         :rtype: ResourceManager
282         """
283         return self._resources.get(guid)
284
285     @property
286     def resources(self):
287         """ Returns the list of all the Resource Manager Id
288
289         :rtype: set
290
291         """
292         return self._resources.keys()
293
294     def register_resource(self, rtype, guid = None):
295         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
296         for the RM of type 'rtype' and add it to the list of Resources.
297
298         :param rtype: Type of the RM
299         :type rtype: str
300         :return: Id of the RM
301         :rtype: int
302         """
303         # Get next available guid
304         guid = self._guid_generator.next(guid)
305         
306         # Instantiate RM
307         rm = ResourceFactory.create(rtype, self, guid)
308
309         # Store RM
310         self._resources[guid] = rm
311
312         return guid
313
314     def get_attributes(self, guid):
315         """ Return all the attibutes of a specific RM
316
317         :param guid: Guid of the RM
318         :type guid: int
319         :return: List of attributes
320         :rtype: list
321         """
322         rm = self.get_resource(guid)
323         return rm.get_attributes()
324
325     def register_connection(self, guid1, guid2):
326         """ Registers a guid1 with a guid2. 
327             The declaration order is not important
328
329             :param guid1: First guid to connect
330             :type guid1: ResourceManager
331
332             :param guid2: Second guid to connect
333             :type guid: ResourceManager
334         """
335         rm1 = self.get_resource(guid1)
336         rm2 = self.get_resource(guid2)
337
338         rm1.register_connection(guid2)
339         rm2.register_connection(guid1)
340
341     def register_condition(self, guids1, action, guids2, state,
342             time = None):
343         """ Registers an action START or STOP for all RM on guids1 to occur 
344             time 'time' after all elements in guids2 reached state 'state'.
345
346             :param guids1: List of guids of RMs subjected to action
347             :type guids1: list
348
349             :param action: Action to register (either START or STOP)
350             :type action: ResourceAction
351
352             :param guids2: List of guids of RMs to we waited for
353             :type guids2: list
354
355             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
356             :type state: ResourceState
357
358             :param time: Time to wait after guids2 has reached status 
359             :type time: string
360
361         """
362         if isinstance(guids1, int):
363             guids1 = [guids1]
364         if isinstance(guids2, int):
365             guids2 = [guids2]
366
367         for guid1 in guids1:
368             rm = self.get_resource(guid1)
369             rm.register_condition(action, guids2, state, time)
370
371     def enable_trace(self, guid, name):
372         """ Enable trace
373
374         :param name: Name of the trace
375         :type name: str
376         """
377         rm = self.get_resource(guid)
378         rm.enable_trace(name)
379
380     def trace_enabled(self, guid, name):
381         """ Returns True if trace is enabled
382
383         :param name: Name of the trace
384         :type name: str
385         """
386         rm = self.get_resource(guid)
387         return rm.trace_enabled(name)
388
389     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
390         """ Get information on collected trace
391
392         :param name: Name of the trace
393         :type name: str
394
395         :param attr: Can be one of:
396                          - TraceAttr.ALL (complete trace content), 
397                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
398                          - TraceAttr.PATH (full path to the trace file),
399                          - TraceAttr.SIZE (size of trace file). 
400         :type attr: str
401
402         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
403         :type name: int
404
405         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
406         :type name: int
407
408         :rtype: str
409         """
410         rm = self.get_resource(guid)
411         return rm.trace(name, attr, block, offset)
412
413     def discover(self, guid):
414         """ Discover a specific RM defined by its 'guid'
415
416             :param guid: Guid of the RM
417             :type guid: int
418
419         """
420         rm = self.get_resource(guid)
421         return rm.discover()
422
423     def provision(self, guid):
424         """ Provision a specific RM defined by its 'guid'
425
426             :param guid: Guid of the RM
427             :type guid: int
428
429         """
430         rm = self.get_resource(guid)
431         return rm.provision()
432
433     def get(self, guid, name):
434         """ Get a specific attribute 'name' from the RM 'guid'
435
436             :param guid: Guid of the RM
437             :type guid: int
438
439             :param name: attribute's name
440             :type name: str
441
442         """
443         rm = self.get_resource(guid)
444         return rm.get(name)
445
446     def set(self, guid, name, value):
447         """ Set a specific attribute 'name' from the RM 'guid' 
448             with the value 'value' 
449
450             :param guid: Guid of the RM
451             :type guid: int
452
453             :param name: attribute's name
454             :type name: str
455
456             :param value: attribute's value
457
458         """
459         rm = self.get_resource(guid)
460         return rm.set(name, value)
461
462     def state(self, guid, hr = False):
463         """ Returns the state of a resource
464
465             :param guid: Resource guid
466             :type guid: integer
467
468             :param hr: Human readable. Forces return of a 
469                 status string instead of a number 
470             :type hr: boolean
471
472         """
473         rm = self.get_resource(guid)
474         state = rm.state
475
476         if hr:
477             return ResourceState2str.get(state)
478
479         return state
480
481     def stop(self, guid):
482         """ Stop a specific RM defined by its 'guid'
483
484             :param guid: Guid of the RM
485             :type guid: int
486
487         """
488         rm = self.get_resource(guid)
489         return rm.stop()
490
491     def start(self, guid):
492         """ Start a specific RM defined by its 'guid'
493
494             :param guid: Guid of the RM
495             :type guid: int
496
497         """
498         rm = self.get_resource(guid)
499         return rm.start()
500
501     def set_with_conditions(self, name, value, guids1, guids2, state,
502             time = None):
503         """ Set value 'value' on attribute with name 'name' on all RMs of
504             guids1 when 'time' has elapsed since all elements in guids2 
505             have reached state 'state'.
506
507             :param name: Name of attribute to set in RM
508             :type name: string
509
510             :param value: Value of attribute to set in RM
511             :type name: string
512
513             :param guids1: List of guids of RMs subjected to action
514             :type guids1: list
515
516             :param action: Action to register (either START or STOP)
517             :type action: ResourceAction
518
519             :param guids2: List of guids of RMs to we waited for
520             :type guids2: list
521
522             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
523             :type state: ResourceState
524
525             :param time: Time to wait after guids2 has reached status 
526             :type time: string
527
528         """
529         if isinstance(guids1, int):
530             guids1 = [guids1]
531         if isinstance(guids2, int):
532             guids2 = [guids2]
533
534         for guid1 in guids1:
535             rm = self.get_resource(guid)
536             rm.set_with_conditions(name, value, guids2, state, time)
537
538     def stop_with_conditions(self, guid):
539         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
540
541             :param guid: Guid of the RM
542             :type guid: int
543
544         """
545         rm = self.get_resource(guid)
546         return rm.stop_with_conditions()
547
548     def start_with_conditions(self, guid):
549         """ Start a specific RM defined by its 'guid' only if all the conditions are true
550
551             :param guid: Guid of the RM
552             :type guid: int
553
554         """
555         rm = self.get_resource(guid)
556         return rm.start_with_conditions()
557
558     def deploy(self, guids = None, wait_all_ready = True, group = None):
559         """ Deploy all resource manager in guids list
560
561         :param guids: List of guids of RMs to deploy
562         :type guids: list
563
564         :param wait_all_ready: Wait until all RMs are ready in
565             order to start the RMs
566         :type guid: int
567
568         :param group: Id of deployment group in which to deploy RMs
569         :type group: int
570
571         """
572         self.logger.debug(" ------- DEPLOY START ------ ")
573
574         if not guids:
575             # If no guids list was indicated, all 'NEW' RMs will be deployed
576             guids = []
577             for guid in self.resources:
578                 if self.state(guid) == ResourceState.NEW:
579                     guids.append(guid)
580                 
581         if isinstance(guids, int):
582             guids = [guids]
583
584         # Create deployment group 
585         if not group:
586             group = self._group_id_generator.next(guid)
587
588         if group not in self._groups:
589             self._groups[group] = []
590
591         self._groups[group].extend(guids)
592
593         # Before starting deployment we disorder the guids list with the
594         # purpose of speeding up the whole deployment process.
595         # It is likely that the user inserted in the 'guids' list closely
596         # resources one after another (e.g. all applications
597         # connected to the same node can likely appear one after another).
598         # This can originate a slow down in the deployment since the N 
599         # threads the parallel runner uses to processes tasks may all
600         # be taken up by the same family of resources waiting for the 
601         # same conditions (e.g. LinuxApplications running on a same 
602         # node share a single lock, so they will tend to be serialized).
603         # If we disorder the guids list, this problem can be mitigated.
604         random.shuffle(guids)
605
606         def wait_all_and_start(group):
607             reschedule = False
608             
609             # Get all guids in group
610             guids = self._groups[group]
611
612             for guid in guids:
613                 if self.state(guid) < ResourceState.READY:
614                     reschedule = True
615                     break
616
617             if reschedule:
618                 callback = functools.partial(wait_all_and_start, group)
619                 self.schedule("1s", callback)
620             else:
621                 # If all resources are read, we schedule the start
622                 for guid in guids:
623                     rm = self.get_resource(guid)
624                     self.schedule("0s", rm.start_with_conditions)
625
626         if wait_all_ready:
627             # Schedule a function to check that all resources are
628             # READY, and only then schedule the start.
629             # This aimes at reducing the number of tasks looping in the 
630             # scheduler. 
631             # Intead of having N start tasks, we will have only one for 
632             # the whole group.
633             callback = functools.partial(wait_all_and_start, group)
634             self.schedule("1s", callback)
635
636         for guid in guids:
637             rm = self.get_resource(guid)
638             rm.deployment_group = group
639             self.schedule("0s", rm.deploy)
640
641             if not wait_all_ready:
642                 self.schedule("1s", rm.start_with_conditions)
643
644             if rm.conditions.get(ResourceAction.STOP):
645                 # Only if the RM has STOP conditions we
646                 # schedule a stop. Otherwise the RM will stop immediately
647                 self.schedule("2s", rm.stop_with_conditions)
648
649     def release(self, guids = None):
650         """ Release al RMs on the guids list or 
651         all the resources if no list is specified
652
653             :param guids: List of RM guids
654             :type guids: list
655
656         """
657         if not guids:
658             guids = self.resources
659
660         for guid in guids:
661             rm = self.get_resource(guid)
662             self.schedule("0s", rm.release)
663
664         self.wait_released(guids)
665         
666     def shutdown(self):
667         """ Shutdown the Experiment Controller. 
668         Releases all the resources and stops task processing thread
669
670         """
671         self.release()
672
673         # Mark the EC state as TERMINATED
674         self._state = ECState.TERMINATED
675
676         # Notify condition to wake up the processing thread
677         self._notify()
678         
679         if self._thread.is_alive():
680            self._thread.join()
681
682     def schedule(self, date, callback, track = False):
683         """ Schedule a callback to be executed at time date.
684
685             :param date: string containing execution time for the task.
686                     It can be expressed as an absolute time, using
687                     timestamp format, or as a relative time matching
688                     ^\d+.\d+(h|m|s|ms|us)$
689
690             :param callback: code to be executed for the task. Must be a
691                         Python function, and receives args and kwargs
692                         as arguments.
693
694             :param track: if set to True, the task will be retrivable with
695                     the get_task() method
696
697             :return : The Id of the task
698         """
699         timestamp = stabsformat(date)
700         task = Task(timestamp, callback)
701         task = self._scheduler.schedule(task)
702
703         if track:
704             self._tasks[task.id] = task
705
706         # Notify condition to wake up the processing thread
707         self._notify()
708
709         return task.id
710      
711     def _process(self):
712         """ Process scheduled tasks.
713
714         .. note::
715
716         The _process method is executed in an independent thread held by the 
717         ExperimentController for as long as the experiment is running.
718         
719         Tasks are scheduled by invoking the schedule method with a target callback. 
720         The schedule method is given a execution time which controls the
721         order in which tasks are processed. 
722
723         Tasks are processed in parallel using multithreading. 
724         The environmental variable NEPI_NTHREADS can be used to control
725         the number of threads used to process tasks. The default value is 50.
726
727         Exception handling:
728
729         To execute tasks in parallel, an ParallelRunner (PR) object, holding
730         a pool of threads (workers), is used.
731         For each available thread in the PR, the next task popped from 
732         the scheduler queue is 'put' in the PR.
733         Upon receiving a task to execute, each PR worker (thread) invokes the 
734         _execute method of the EC, passing the task as argument. 
735         This method, calls task.callback inside a try/except block. If an 
736         exception is raised by the tasks.callback, it will be trapped by the 
737         try block, logged to standard error (usually the console), and the EC 
738         state will be set to ECState.FAILED.
739         The invocation of _notify immediately after, forces the processing
740         loop in the _process method, to wake up if it was blocked waiting for new 
741         tasks to arrived, and to check the EC state.
742         As the EC is in FAILED state, the processing loop exits and the 
743         'finally' block is invoked. In the 'finally' block, the 'sync' method
744         of the PR is invoked, which forces the PR to raise any unchecked errors
745         that might have been raised by the workers.
746
747         """
748         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
749
750         runner = ParallelRun(maxthreads = nthreads)
751         runner.start()
752
753         try:
754             while not self.finished:
755                 self._cond.acquire()
756
757                 task = self._scheduler.next()
758                 
759                 if not task:
760                     # No task to execute. Wait for a new task to be scheduled.
761                     self._cond.wait()
762                 else:
763                     # The task timestamp is in the future. Wait for timeout 
764                     # or until another task is scheduled.
765                     now = tnow()
766                     if now < task.timestamp:
767                         # Calculate timeout in seconds
768                         timeout = tdiffsec(task.timestamp, now)
769
770                         # Re-schedule task with the same timestamp
771                         self._scheduler.schedule(task)
772                         
773                         task = None
774
775                         # Wait timeout or until a new task awakes the condition
776                         self._cond.wait(timeout)
777                
778                 self._cond.release()
779
780                 if task:
781                     # Process tasks in parallel
782                     runner.put(self._execute, task)
783         except: 
784             import traceback
785             err = traceback.format_exc()
786             self.logger.error("Error while processing tasks in the EC: %s" % err)
787
788             self._state = ECState.FAILED
789         finally:   
790             self.logger.debug("Exiting the task processing loop ... ")
791             runner.sync()
792             runner.destroy()
793
794     def _execute(self, task):
795         """ Executes a single task. 
796
797             :param task: Object containing the callback to execute
798             :type task: Task
799
800         .. note::
801
802         If the invokation of the task callback raises an
803         exception, the processing thread of the ExperimentController
804         will be stopped and the experiment will be aborted.
805
806         """
807         # Invoke callback
808         task.status = TaskStatus.DONE
809
810         try:
811             task.result = task.callback()
812         except:
813             import traceback
814             err = traceback.format_exc()
815             task.result = err
816             task.status = TaskStatus.ERROR
817             
818             self.logger.error("Error occurred while executing task: %s" % err)
819
820             # Set the EC to FAILED state (this will force to exit the task
821             # processing thread)
822             self._state = ECState.FAILED
823
824             # Notify condition to wake up the processing thread
825             self._notify()
826
827             # Propage error to the ParallelRunner
828             raise
829
830     def _notify(self):
831         """ Awakes the processing thread in case it is blocked waiting
832         for a new task to be scheduled.
833         """
834         self._cond.acquire()
835         self._cond.notify()
836         self._cond.release()
837