Commiting merged branch nepi-3-dev
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
38 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39
40 class ECState(object):
41     """ State of the Experiment Controller
42    
43     """
44     RUNNING = 1
45     FAILED = 2
46     TERMINATED = 3
47
48 class ExperimentController(object):
49     """
50     .. class:: Class Args :
51       
52         :param exp_id: Human readable identifier for the experiment scenario. 
53                        It will be used in the name of the directory 
54                         where experiment related information is stored
55         :type exp_id: str
56
57     .. note::
58
59         An experiment, or scenario, is defined by a concrete use, behavior,
60         configuration and interconnection of resources that describe a single
61         experiment case (We call this the experiment description). 
62         A same experiment (scenario) can be run many times.
63
64         The ExperimentController (EC), is the entity responsible for 
65         managing an experiment instance (run). The same scenario can be 
66         recreated (and re-run) by instantiating an EC and recreating 
67         the same experiment description. 
68
69         In NEPI, an experiment is represented as a graph of interconnected
70         resources. A resource is a generic concept in the sense that any
71         component taking part of an experiment, whether physical of
72         virtual, is considered a resource. A resources could be a host, 
73         a virtual machine, an application, a simulator, a IP address.
74
75         A ResourceManager (RM), is the entity responsible for managing a 
76         single resource. ResourceManagers are specific to a resource
77         type (i.e. An RM to control a Linux application will not be
78         the same as the RM used to control a ns-3 simulation).
79         In order for a new type of resource to be supported in NEPI
80         a new RM must be implemented. NEPI already provides different
81         RMs to control basic resources, and new can be extended from
82         the existing ones.
83
84         Through the EC interface the user can create ResourceManagers (RMs),
85         configure them and interconnect them, in order to describe an experiment.
86         Describing an experiment through the EC does not run the experiment.
87         Only when the 'deploy()' method is invoked on the EC, will the EC take 
88         actions to transform the 'described' experiment into a 'running' experiment.
89
90         While the experiment is running, it is possible to continue to
91         create/configure/connect RMs, and to deploy them to involve new
92         resources in the experiment (this is known as 'interactive' deployment).
93         
94         An experiments in NEPI is identified by a string id, 
95         which is either given by the user, or automatically generated by NEPI.  
96         The purpose of this identifier is to separate files and results that 
97         belong to different experiment scenarios. 
98         However, since a same 'experiment' can be run many times, the experiment
99         id is not enough to identify an experiment instance (run).
100         For this reason, the ExperimentController has two identifier, the 
101         exp_id, which can be re-used by different ExperimentController instances,
102         and the run_id, which unique to a ExperimentController instance, and
103         is automatically generated by NEPI.
104         
105     """
106
107     def __init__(self, exp_id = None): 
108         super(ExperimentController, self).__init__()
109         # root directory to store files
110
111         # Run identifier. It identifies a concrete instance (run) of an experiment.
112         # Since a same experiment (same configuration) can be run many times,
113         # this id permits to identify concrete exoeriment run
114         self._run_id = tsformat()
115
116         # Experiment identifier. Usually assigned by the user
117         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
118
119         # generator of globally unique ids
120         self._guid_generator = guid.GuidGenerator()
121         
122         # Resource managers
123         self._resources = dict()
124
125         # Scheduler
126         self._scheduler = HeapScheduler()
127
128         # Tasks
129         self._tasks = dict()
130
131         # RM groups 
132         self._groups = dict()
133
134         # generator of globally unique id for groups
135         self._group_id_generator = guid.GuidGenerator()
136  
137         # Event processing thread
138         self._cond = threading.Condition()
139         self._thread = threading.Thread(target = self._process)
140         self._thread.setDaemon(True)
141         self._thread.start()
142
143         # EC state
144         self._state = ECState.RUNNING
145
146         # Logging
147         self._logger = logging.getLogger("ExperimentController")
148
149     @property
150     def logger(self):
151         """ Return the logger of the Experiment Controller
152
153         """
154         return self._logger
155
156     @property
157     def ecstate(self):
158         """ Return the state of the Experiment Controller
159
160         """
161         return self._state
162
163     @property
164     def exp_id(self):
165         """ Return the experiment id assigned by the user
166
167         """
168         return self._exp_id
169
170     @property
171     def run_id(self):
172         """ Return the experiment instance (run) identifier  
173
174         """
175         return self._run_id
176
177     @property
178     def finished(self):
179         """ Put the state of the Experiment Controller into a final state :
180             Either TERMINATED or FAILED
181
182         """
183         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
184
185     def wait_finished(self, guids):
186         """ Blocking method that wait until all the RM from the 'guid' list 
187             reached the state FINISHED
188
189         :param guids: List of guids
190         :type guids: list
191         """
192         return self.wait(guids)
193
194     def wait_started(self, guids):
195         """ Blocking method that wait until all the RM from the 'guid' list 
196             reached the state STARTED
197
198         :param guids: List of guids
199         :type guids: list
200         """
201         return self.wait(guids, states = [ResourceState.STARTED,
202             ResourceState.STOPPED,
203             ResourceState.FAILED,
204             ResourceState.FINISHED])
205
206     def wait_released(self, guids):
207         """ Blocking method that wait until all the RM from the 'guid' list 
208             reached the state RELEASED
209
210         :param guids: List of guids
211         :type guids: list
212         """
213         return self.wait(guids, states = [ResourceState.RELEASED,
214             ResourceState.STOPPED,
215             ResourceState.FAILED,
216             ResourceState.FINISHED])
217
218     def wait(self, guids, states = [ResourceState.FINISHED, 
219             ResourceState.FAILED,
220             ResourceState.STOPPED]):
221         """ Blocking method that waits until all the RM from the 'guid' list 
222             reached state 'state' or until a failure occurs
223             
224         :param guids: List of guids
225         :type guids: list
226         """
227         if isinstance(guids, int):
228             guids = [guids]
229
230         # we randomly alter the order of the guids to avoid ordering
231         # dependencies (e.g. LinuxApplication RMs runing on the same
232         # linux host will be synchronized by the LinuxNode SSH lock)
233         random.shuffle(guids)
234
235         while True:
236             # If no more guids to wait for or an error occured, then exit
237             if len(guids) == 0 or self.finished:
238                 break
239
240             # If a guid reached one of the target states, remove it from list
241             guid = guids[0]
242             state = self.state(guid)
243
244             if state in states:
245                 guids.remove(guid)
246             else:
247                 # Debug...
248                 self.logger.debug(" WAITING FOR %g - state %s " % (guid,
249                     self.state(guid, hr = True)))
250
251                 # Take the opportunity to 'refresh' the states of the RMs.
252                 # Query only the first up to N guids (not to overwhelm 
253                 # the local machine)
254                 n = 100
255                 lim = n if len(guids) > n else ( len(guids) -1 )
256                 nguids = guids[0: lim]
257
258                 # schedule state request for all guids (take advantage of
259                 # scheduler multi threading).
260                 for guid in nguids:
261                     callback = functools.partial(self.state, guid)
262                     self.schedule("0s", callback)
263
264                 # If the guid is not in one of the target states, wait and
265                 # continue quering. We keep the sleep big to decrease the
266                 # number of RM state queries
267                 time.sleep(2)
268   
269     def get_task(self, tid):
270         """ Get a specific task
271
272         :param tid: Id of the task
273         :type tid: int
274         :rtype: Task
275         """
276         return self._tasks.get(tid)
277
278     def get_resource(self, guid):
279         """ Get a specific Resource Manager
280
281         :param guid: Id of the task
282         :type guid: int
283         :rtype: ResourceManager
284         """
285         return self._resources.get(guid)
286
287     @property
288     def resources(self):
289         """ Returns the list of all the Resource Manager Id
290
291         :rtype: set
292
293         """
294         return self._resources.keys()
295
296     def register_resource(self, rtype, guid = None):
297         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
298         for the RM of type 'rtype' and add it to the list of Resources.
299
300         :param rtype: Type of the RM
301         :type rtype: str
302         :return: Id of the RM
303         :rtype: int
304         """
305         # Get next available guid
306         guid = self._guid_generator.next(guid)
307         
308         # Instantiate RM
309         rm = ResourceFactory.create(rtype, self, guid)
310
311         # Store RM
312         self._resources[guid] = rm
313
314         return guid
315
316     def get_attributes(self, guid):
317         """ Return all the attibutes of a specific RM
318
319         :param guid: Guid of the RM
320         :type guid: int
321         :return: List of attributes
322         :rtype: list
323         """
324         rm = self.get_resource(guid)
325         return rm.get_attributes()
326
327     def register_connection(self, guid1, guid2):
328         """ Registers a guid1 with a guid2. 
329             The declaration order is not important
330
331             :param guid1: First guid to connect
332             :type guid1: ResourceManager
333
334             :param guid2: Second guid to connect
335             :type guid: ResourceManager
336         """
337         rm1 = self.get_resource(guid1)
338         rm2 = self.get_resource(guid2)
339
340         rm1.register_connection(guid2)
341         rm2.register_connection(guid1)
342
343     def register_condition(self, guids1, action, guids2, state,
344             time = None):
345         """ Registers an action START or STOP for all RM on guids1 to occur 
346             time 'time' after all elements in guids2 reached state 'state'.
347
348             :param guids1: List of guids of RMs subjected to action
349             :type guids1: list
350
351             :param action: Action to register (either START or STOP)
352             :type action: ResourceAction
353
354             :param guids2: List of guids of RMs to we waited for
355             :type guids2: list
356
357             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
358             :type state: ResourceState
359
360             :param time: Time to wait after guids2 has reached status 
361             :type time: string
362
363         """
364         if isinstance(guids1, int):
365             guids1 = [guids1]
366         if isinstance(guids2, int):
367             guids2 = [guids2]
368
369         for guid1 in guids1:
370             rm = self.get_resource(guid1)
371             rm.register_condition(action, guids2, state, time)
372
373     def enable_trace(self, guid, name):
374         """ Enable trace
375
376         :param name: Name of the trace
377         :type name: str
378         """
379         rm = self.get_resource(guid)
380         rm.enable_trace(name)
381
382     def trace_enabled(self, guid, name):
383         """ Returns True if trace is enabled
384
385         :param name: Name of the trace
386         :type name: str
387         """
388         rm = self.get_resource(guid)
389         return rm.trace_enabled(name)
390
391     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
392         """ Get information on collected trace
393
394         :param name: Name of the trace
395         :type name: str
396
397         :param attr: Can be one of:
398                          - TraceAttr.ALL (complete trace content), 
399                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
400                          - TraceAttr.PATH (full path to the trace file),
401                          - TraceAttr.SIZE (size of trace file). 
402         :type attr: str
403
404         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
405         :type name: int
406
407         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
408         :type name: int
409
410         :rtype: str
411         """
412         rm = self.get_resource(guid)
413         return rm.trace(name, attr, block, offset)
414
415     def discover(self, guid):
416         """ Discover a specific RM defined by its 'guid'
417
418             :param guid: Guid of the RM
419             :type guid: int
420
421         """
422         rm = self.get_resource(guid)
423         return rm.discover()
424
425     def provision(self, guid):
426         """ Provision a specific RM defined by its 'guid'
427
428             :param guid: Guid of the RM
429             :type guid: int
430
431         """
432         rm = self.get_resource(guid)
433         return rm.provision()
434
435     def get(self, guid, name):
436         """ Get a specific attribute 'name' from the RM 'guid'
437
438             :param guid: Guid of the RM
439             :type guid: int
440
441             :param name: attribute's name
442             :type name: str
443
444         """
445         rm = self.get_resource(guid)
446         return rm.get(name)
447
448     def set(self, guid, name, value):
449         """ Set a specific attribute 'name' from the RM 'guid' 
450             with the value 'value' 
451
452             :param guid: Guid of the RM
453             :type guid: int
454
455             :param name: attribute's name
456             :type name: str
457
458             :param value: attribute's value
459
460         """
461         rm = self.get_resource(guid)
462         return rm.set(name, value)
463
464     def state(self, guid, hr = False):
465         """ Returns the state of a resource
466
467             :param guid: Resource guid
468             :type guid: integer
469
470             :param hr: Human readable. Forces return of a 
471                 status string instead of a number 
472             :type hr: boolean
473
474         """
475         rm = self.get_resource(guid)
476         state = rm.state
477
478         if hr:
479             return ResourceState2str.get(state)
480
481         return state
482
483     def stop(self, guid):
484         """ Stop a specific RM defined by its 'guid'
485
486             :param guid: Guid of the RM
487             :type guid: int
488
489         """
490         rm = self.get_resource(guid)
491         return rm.stop()
492
493     def start(self, guid):
494         """ Start a specific RM defined by its 'guid'
495
496             :param guid: Guid of the RM
497             :type guid: int
498
499         """
500         rm = self.get_resource(guid)
501         return rm.start()
502
503     def set_with_conditions(self, name, value, guids1, guids2, state,
504             time = None):
505         """ Set value 'value' on attribute with name 'name' on all RMs of
506             guids1 when 'time' has elapsed since all elements in guids2 
507             have reached state 'state'.
508
509             :param name: Name of attribute to set in RM
510             :type name: string
511
512             :param value: Value of attribute to set in RM
513             :type name: string
514
515             :param guids1: List of guids of RMs subjected to action
516             :type guids1: list
517
518             :param action: Action to register (either START or STOP)
519             :type action: ResourceAction
520
521             :param guids2: List of guids of RMs to we waited for
522             :type guids2: list
523
524             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
525             :type state: ResourceState
526
527             :param time: Time to wait after guids2 has reached status 
528             :type time: string
529
530         """
531         if isinstance(guids1, int):
532             guids1 = [guids1]
533         if isinstance(guids2, int):
534             guids2 = [guids2]
535
536         for guid1 in guids1:
537             rm = self.get_resource(guid)
538             rm.set_with_conditions(name, value, guids2, state, time)
539
540     def stop_with_conditions(self, guid):
541         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
542
543             :param guid: Guid of the RM
544             :type guid: int
545
546         """
547         rm = self.get_resource(guid)
548         return rm.stop_with_conditions()
549
550     def start_with_conditions(self, guid):
551         """ Start a specific RM defined by its 'guid' only if all the conditions are true
552
553             :param guid: Guid of the RM
554             :type guid: int
555
556         """
557         rm = self.get_resource(guid)
558         return rm.start_with_conditions()
559
560     def deploy(self, guids = None, wait_all_ready = True, group = None):
561         """ Deploy all resource manager in guids list
562
563         :param guids: List of guids of RMs to deploy
564         :type guids: list
565
566         :param wait_all_ready: Wait until all RMs are ready in
567             order to start the RMs
568         :type guid: int
569
570         :param group: Id of deployment group in which to deploy RMs
571         :type group: int
572
573         """
574         self.logger.debug(" ------- DEPLOY START ------ ")
575
576         if not guids:
577             # If no guids list was indicated, all 'NEW' RMs will be deployed
578             guids = []
579             for guid in self.resources:
580                 if self.state(guid) == ResourceState.NEW:
581                     guids.append(guid)
582                 
583         if isinstance(guids, int):
584             guids = [guids]
585
586         # Create deployment group 
587         if not group:
588             group = self._group_id_generator.next(guid)
589
590         if group not in self._groups:
591             self._groups[group] = []
592
593         self._groups[group].extend(guids)
594
595         # Before starting deployment we disorder the guids list with the
596         # purpose of speeding up the whole deployment process.
597         # It is likely that the user inserted in the 'guids' list closely
598         # resources one after another (e.g. all applications
599         # connected to the same node can likely appear one after another).
600         # This can originate a slow down in the deployment since the N 
601         # threads the parallel runner uses to processes tasks may all
602         # be taken up by the same family of resources waiting for the 
603         # same conditions (e.g. LinuxApplications running on a same 
604         # node share a single lock, so they will tend to be serialized).
605         # If we disorder the guids list, this problem can be mitigated.
606         random.shuffle(guids)
607
608         def wait_all_and_start(group):
609             reschedule = False
610             
611             # Get all guids in group
612             guids = self._groups[group]
613
614             for guid in guids:
615                 if self.state(guid) < ResourceState.READY:
616                     reschedule = True
617                     break
618
619             if reschedule:
620                 callback = functools.partial(wait_all_and_start, group)
621                 self.schedule("1s", callback)
622             else:
623                 # If all resources are read, we schedule the start
624                 for guid in guids:
625                     rm = self.get_resource(guid)
626                     self.schedule("0s", rm.start_with_conditions)
627
628         if wait_all_ready:
629             # Schedule a function to check that all resources are
630             # READY, and only then schedule the start.
631             # This aimes at reducing the number of tasks looping in the 
632             # scheduler. 
633             # Intead of having N start tasks, we will have only one for 
634             # the whole group.
635             callback = functools.partial(wait_all_and_start, group)
636             self.schedule("1s", callback)
637
638         for guid in guids:
639             rm = self.get_resource(guid)
640             rm.deployment_group = group
641             self.schedule("0s", rm.deploy)
642
643             if not wait_all_ready:
644                 self.schedule("1s", rm.start_with_conditions)
645
646             if rm.conditions.get(ResourceAction.STOP):
647                 # Only if the RM has STOP conditions we
648                 # schedule a stop. Otherwise the RM will stop immediately
649                 self.schedule("2s", rm.stop_with_conditions)
650
651     def release(self, guids = None):
652         """ Release al RMs on the guids list or 
653         all the resources if no list is specified
654
655             :param guids: List of RM guids
656             :type guids: list
657
658         """
659         if not guids:
660             guids = self.resources
661
662         for guid in guids:
663             rm = self.get_resource(guid)
664             self.schedule("0s", rm.release)
665
666         self.wait_released(guids)
667         
668     def shutdown(self):
669         """ Shutdown the Experiment Controller. 
670         Releases all the resources and stops task processing thread
671
672         """
673         self.release()
674
675         # Mark the EC state as TERMINATED
676         self._state = ECState.TERMINATED
677
678         # Notify condition to wake up the processing thread
679         self._notify()
680         
681         if self._thread.is_alive():
682            self._thread.join()
683
684     def schedule(self, date, callback, track = False):
685         """ Schedule a callback to be executed at time date.
686
687             :param date: string containing execution time for the task.
688                     It can be expressed as an absolute time, using
689                     timestamp format, or as a relative time matching
690                     ^\d+.\d+(h|m|s|ms|us)$
691
692             :param callback: code to be executed for the task. Must be a
693                         Python function, and receives args and kwargs
694                         as arguments.
695
696             :param track: if set to True, the task will be retrivable with
697                     the get_task() method
698
699             :return : The Id of the task
700         """
701         timestamp = stabsformat(date)
702         task = Task(timestamp, callback)
703         task = self._scheduler.schedule(task)
704
705         if track:
706             self._tasks[task.id] = task
707
708         # Notify condition to wake up the processing thread
709         self._notify()
710
711         return task.id
712      
713     def _process(self):
714         """ Process scheduled tasks.
715
716         .. note::
717
718         The _process method is executed in an independent thread held by the 
719         ExperimentController for as long as the experiment is running.
720         
721         Tasks are scheduled by invoking the schedule method with a target callback. 
722         The schedule method is given a execution time which controls the
723         order in which tasks are processed. 
724
725         Tasks are processed in parallel using multithreading. 
726         The environmental variable NEPI_NTHREADS can be used to control
727         the number of threads used to process tasks. The default value is 50.
728
729         Exception handling:
730
731         To execute tasks in parallel, an ParallelRunner (PR) object, holding
732         a pool of threads (workers), is used.
733         For each available thread in the PR, the next task popped from 
734         the scheduler queue is 'put' in the PR.
735         Upon receiving a task to execute, each PR worker (thread) invokes the 
736         _execute method of the EC, passing the task as argument. 
737         This method, calls task.callback inside a try/except block. If an 
738         exception is raised by the tasks.callback, it will be trapped by the 
739         try block, logged to standard error (usually the console), and the EC 
740         state will be set to ECState.FAILED.
741         The invocation of _notify immediately after, forces the processing
742         loop in the _process method, to wake up if it was blocked waiting for new 
743         tasks to arrived, and to check the EC state.
744         As the EC is in FAILED state, the processing loop exits and the 
745         'finally' block is invoked. In the 'finally' block, the 'sync' method
746         of the PR is invoked, which forces the PR to raise any unchecked errors
747         that might have been raised by the workers.
748
749         """
750         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
751
752         runner = ParallelRun(maxthreads = nthreads)
753         runner.start()
754
755         try:
756             while not self.finished:
757                 self._cond.acquire()
758
759                 task = self._scheduler.next()
760                 
761                 if not task:
762                     # No task to execute. Wait for a new task to be scheduled.
763                     self._cond.wait()
764                 else:
765                     # The task timestamp is in the future. Wait for timeout 
766                     # or until another task is scheduled.
767                     now = tnow()
768                     if now < task.timestamp:
769                         # Calculate timeout in seconds
770                         timeout = tdiffsec(task.timestamp, now)
771
772                         # Re-schedule task with the same timestamp
773                         self._scheduler.schedule(task)
774                         
775                         task = None
776
777                         # Wait timeout or until a new task awakes the condition
778                         self._cond.wait(timeout)
779                
780                 self._cond.release()
781
782                 if task:
783                     # Process tasks in parallel
784                     runner.put(self._execute, task)
785         except: 
786             import traceback
787             err = traceback.format_exc()
788             self.logger.error("Error while processing tasks in the EC: %s" % err)
789
790             self._state = ECState.FAILED
791         finally:   
792             self.logger.debug("Exiting the task processing loop ... ")
793             runner.sync()
794
795     def _execute(self, task):
796         """ Executes a single task. 
797
798             :param task: Object containing the callback to execute
799             :type task: Task
800
801         .. note::
802
803         If the invokation of the task callback raises an
804         exception, the processing thread of the ExperimentController
805         will be stopped and the experiment will be aborted.
806
807         """
808         # Invoke callback
809         task.status = TaskStatus.DONE
810
811         try:
812             task.result = task.callback()
813         except:
814             import traceback
815             err = traceback.format_exc()
816             task.result = err
817             task.status = TaskStatus.ERROR
818             
819             self.logger.error("Error occurred while executing task: %s" % err)
820
821             # Set the EC to FAILED state (this will force to exit the task
822             # processing thread)
823             self._state = ECState.FAILED
824
825             # Notify condition to wake up the processing thread
826             self._notify()
827
828             # Propage error to the ParallelRunner
829             raise
830
831     def _notify(self):
832         """ Awakes the processing thread in case it is blocked waiting
833         for a new task to be scheduled.
834         """
835         self._cond.acquire()
836         self._cond.notify()
837         self._cond.release()
838