de013d1c2de16a9ab10ef5c7f6abc29559274305
[nepi.git] / src / nepi / execution / ec.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import functools
21 import logging
22 import os
23 import random
24 import sys
25 import time
26 import threading
27
28 from nepi.util import guid
29 from nepi.util.parallel import ParallelRun
30 from nepi.util.timefuncs import tnow, tdiffsec, stabsformat, tsformat 
31 from nepi.execution.resource import ResourceFactory, ResourceAction, \
32         ResourceState, ResourceState2str
33 from nepi.execution.scheduler import HeapScheduler, Task, TaskStatus
34 from nepi.execution.trace import TraceAttr
35
36 # TODO: use multiprocessing instead of threading
37 # TODO: When a failure occurs during deployment, scp and ssh processes are left running behind!!
38 # TODO: Allow to reconnect to a running experiment instance! (reconnect mode vs deploy mode)
39
40 class ECState(object):
41     """ State of the Experiment Controller
42    
43     """
44     RUNNING = 1
45     FAILED = 2
46     TERMINATED = 3
47
48 class ExperimentController(object):
49     """
50     .. class:: Class Args :
51       
52         :param exp_id: Human readable identifier for the experiment scenario. 
53                        It will be used in the name of the directory 
54                         where experiment related information is stored
55         :type exp_id: str
56
57     .. note::
58
59         An experiment, or scenario, is defined by a concrete use, behavior,
60         configuration and interconnection of resources that describe a single
61         experiment case (We call this the experiment description). 
62         A same experiment (scenario) can be run many times.
63
64         The ExperimentController (EC), is the entity responsible for 
65         managing an experiment instance (run). The same scenario can be 
66         recreated (and re-run) by instantiating an EC and recreating 
67         the same experiment description. 
68
69         In NEPI, an experiment is represented as a graph of interconnected
70         resources. A resource is a generic concept in the sense that any
71         component taking part of an experiment, whether physical of
72         virtual, is considered a resource. A resources could be a host, 
73         a virtual machine, an application, a simulator, a IP address.
74
75         A ResourceManager (RM), is the entity responsible for managing a 
76         single resource. ResourceManagers are specific to a resource
77         type (i.e. An RM to control a Linux application will not be
78         the same as the RM used to control a ns-3 simulation).
79         In order for a new type of resource to be supported in NEPI
80         a new RM must be implemented. NEPI already provides different
81         RMs to control basic resources, and new can be extended from
82         the existing ones.
83
84         Through the EC interface the user can create ResourceManagers (RMs),
85         configure them and interconnect them, in order to describe an experiment.
86         Describing an experiment through the EC does not run the experiment.
87         Only when the 'deploy()' method is invoked on the EC, will the EC take 
88         actions to transform the 'described' experiment into a 'running' experiment.
89
90         While the experiment is running, it is possible to continue to
91         create/configure/connect RMs, and to deploy them to involve new
92         resources in the experiment (this is known as 'interactive' deployment).
93         
94         An experiments in NEPI is identified by a string id, 
95         which is either given by the user, or automatically generated by NEPI.  
96         The purpose of this identifier is to separate files and results that 
97         belong to different experiment scenarios. 
98         However, since a same 'experiment' can be run many times, the experiment
99         id is not enough to identify an experiment instance (run).
100         For this reason, the ExperimentController has two identifier, the 
101         exp_id, which can be re-used by different ExperimentController instances,
102         and the run_id, which unique to a ExperimentController instance, and
103         is automatically generated by NEPI.
104         
105     """
106
107     def __init__(self, exp_id = None): 
108         super(ExperimentController, self).__init__()
109         # Logging
110         self._logger = logging.getLogger("ExperimentController")
111
112         # Run identifier. It identifies a concrete instance (run) of an experiment.
113         # Since a same experiment (same configuration) can be run many times,
114         # this id permits to identify concrete exoeriment run
115         self._run_id = tsformat()
116
117         # Experiment identifier. Usually assigned by the user
118         self._exp_id = exp_id or "exp-%s" % os.urandom(8).encode('hex')
119
120         # generator of globally unique ids
121         self._guid_generator = guid.GuidGenerator()
122         
123         # Resource managers
124         self._resources = dict()
125
126         # Scheduler
127         self._scheduler = HeapScheduler()
128
129         # Tasks
130         self._tasks = dict()
131
132         # RM groups 
133         self._groups = dict()
134
135         # generator of globally unique id for groups
136         self._group_id_generator = guid.GuidGenerator()
137  
138         # Event processing thread
139         self._cond = threading.Condition()
140         self._thread = threading.Thread(target = self._process)
141         self._thread.setDaemon(True)
142         self._thread.start()
143
144         # EC state
145         self._state = ECState.RUNNING
146
147     @property
148     def logger(self):
149         """ Return the logger of the Experiment Controller
150
151         """
152         return self._logger
153
154     @property
155     def ecstate(self):
156         """ Return the state of the Experiment Controller
157
158         """
159         return self._state
160
161     @property
162     def exp_id(self):
163         """ Return the experiment id assigned by the user
164
165         """
166         return self._exp_id
167
168     @property
169     def run_id(self):
170         """ Return the experiment instance (run) identifier  
171
172         """
173         return self._run_id
174
175     @property
176     def finished(self):
177         """ Put the state of the Experiment Controller into a final state :
178             Either TERMINATED or FAILED
179
180         """
181         return self.ecstate in [ECState.FAILED, ECState.TERMINATED]
182
183     def wait_finished(self, guids):
184         """ Blocking method that wait until all the RM from the 'guid' list 
185             reached the state FINISHED
186
187         :param guids: List of guids
188         :type guids: list
189         """
190         return self.wait(guids)
191
192     def wait_started(self, guids):
193         """ Blocking method that wait until all the RM from the 'guid' list 
194             reached the state STARTED
195
196         :param guids: List of guids
197         :type guids: list
198         """
199         return self.wait(guids, states = [ResourceState.STARTED,
200             ResourceState.STOPPED,
201             ResourceState.FAILED,
202             ResourceState.FINISHED])
203
204     def wait_released(self, guids):
205         """ Blocking method that wait until all the RM from the 'guid' list 
206             reached the state RELEASED
207
208         :param guids: List of guids
209         :type guids: list
210         """
211         return self.wait(guids, states = [ResourceState.RELEASED,
212             ResourceState.STOPPED,
213             ResourceState.FAILED,
214             ResourceState.FINISHED])
215
216     def wait(self, guids, states = [ResourceState.FINISHED, 
217             ResourceState.FAILED,
218             ResourceState.STOPPED]):
219         """ Blocking method that waits until all the RM from the 'guid' list 
220             reached state 'state' or until a failure occurs
221             
222         :param guids: List of guids
223         :type guids: list
224         """
225         if isinstance(guids, int):
226             guids = [guids]
227
228         # we randomly alter the order of the guids to avoid ordering
229         # dependencies (e.g. LinuxApplication RMs runing on the same
230         # linux host will be synchronized by the LinuxNode SSH lock)
231         random.shuffle(guids)
232
233         while True:
234             # If no more guids to wait for or an error occured, then exit
235             if len(guids) == 0 or self.finished:
236                 break
237
238             # If a guid reached one of the target states, remove it from list
239             guid = guids[0]
240             state = self.state(guid)
241
242             if state in states:
243                 guids.remove(guid)
244             else:
245                 # Debug...
246                 self.logger.debug(" WAITING FOR %g - state %s " % (guid,
247                     self.state(guid, hr = True)))
248
249                 # Take the opportunity to 'refresh' the states of the RMs.
250                 # Query only the first up to N guids (not to overwhelm 
251                 # the local machine)
252                 n = 100
253                 lim = n if len(guids) > n else ( len(guids) -1 )
254                 nguids = guids[0: lim]
255
256                 # schedule state request for all guids (take advantage of
257                 # scheduler multi threading).
258                 for guid in nguids:
259                     callback = functools.partial(self.state, guid)
260                     self.schedule("0s", callback)
261
262                 # If the guid is not in one of the target states, wait and
263                 # continue quering. We keep the sleep big to decrease the
264                 # number of RM state queries
265                 time.sleep(2)
266   
267     def get_task(self, tid):
268         """ Get a specific task
269
270         :param tid: Id of the task
271         :type tid: int
272         :rtype: Task
273         """
274         return self._tasks.get(tid)
275
276     def get_resource(self, guid):
277         """ Get a specific Resource Manager
278
279         :param guid: Id of the task
280         :type guid: int
281         :rtype: ResourceManager
282         """
283         return self._resources.get(guid)
284
285     @property
286     def resources(self):
287         """ Returns the list of all the Resource Manager Id
288
289         :rtype: set
290
291         """
292         return self._resources.keys()
293
294     def register_resource(self, rtype, guid = None):
295         """ Register a Resource Manager. It creates a new 'guid', if it is not specified, 
296         for the RM of type 'rtype' and add it to the list of Resources.
297
298         :param rtype: Type of the RM
299         :type rtype: str
300         :return: Id of the RM
301         :rtype: int
302         """
303         # Get next available guid
304         guid = self._guid_generator.next(guid)
305         
306         # Instantiate RM
307         rm = ResourceFactory.create(rtype, self, guid)
308
309         # Store RM
310         self._resources[guid] = rm
311
312         return guid
313
314     def get_attributes(self, guid):
315         """ Return all the attibutes of a specific RM
316
317         :param guid: Guid of the RM
318         :type guid: int
319         :return: List of attributes
320         :rtype: list
321         """
322         rm = self.get_resource(guid)
323         return rm.get_attributes()
324
325     def register_connection(self, guid1, guid2):
326         """ Registers a guid1 with a guid2. 
327             The declaration order is not important
328
329             :param guid1: First guid to connect
330             :type guid1: ResourceManager
331
332             :param guid2: Second guid to connect
333             :type guid: ResourceManager
334         """
335         rm1 = self.get_resource(guid1)
336         rm2 = self.get_resource(guid2)
337
338         rm1.register_connection(guid2)
339         rm2.register_connection(guid1)
340
341     def register_condition(self, guids1, action, guids2, state,
342             time = None):
343         """ Registers an action START or STOP for all RM on guids1 to occur 
344             time 'time' after all elements in guids2 reached state 'state'.
345
346             :param guids1: List of guids of RMs subjected to action
347             :type guids1: list
348
349             :param action: Action to register (either START or STOP)
350             :type action: ResourceAction
351
352             :param guids2: List of guids of RMs to we waited for
353             :type guids2: list
354
355             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
356             :type state: ResourceState
357
358             :param time: Time to wait after guids2 has reached status 
359             :type time: string
360
361         """
362         if isinstance(guids1, int):
363             guids1 = [guids1]
364         if isinstance(guids2, int):
365             guids2 = [guids2]
366
367         for guid1 in guids1:
368             rm = self.get_resource(guid1)
369             rm.register_condition(action, guids2, state, time)
370
371     def enable_trace(self, guid, name):
372         """ Enable trace
373
374         :param name: Name of the trace
375         :type name: str
376         """
377         rm = self.get_resource(guid)
378         rm.enable_trace(name)
379
380     def trace_enabled(self, guid, name):
381         """ Returns True if trace is enabled
382
383         :param name: Name of the trace
384         :type name: str
385         """
386         rm = self.get_resource(guid)
387         return rm.trace_enabled(name)
388
389     def trace(self, guid, name, attr = TraceAttr.ALL, block = 512, offset = 0):
390         """ Get information on collected trace
391
392         :param name: Name of the trace
393         :type name: str
394
395         :param attr: Can be one of:
396                          - TraceAttr.ALL (complete trace content), 
397                          - TraceAttr.STREAM (block in bytes to read starting at offset), 
398                          - TraceAttr.PATH (full path to the trace file),
399                          - TraceAttr.SIZE (size of trace file). 
400         :type attr: str
401
402         :param block: Number of bytes to retrieve from trace, when attr is TraceAttr.STREAM 
403         :type name: int
404
405         :param offset: Number of 'blocks' to skip, when attr is TraceAttr.STREAM 
406         :type name: int
407
408         :rtype: str
409         """
410         rm = self.get_resource(guid)
411         return rm.trace(name, attr, block, offset)
412
413     def discover(self, guid):
414         """ Discover a specific RM defined by its 'guid'
415
416             :param guid: Guid of the RM
417             :type guid: int
418
419         """
420         rm = self.get_resource(guid)
421         return rm.discover()
422
423     def provision(self, guid):
424         """ Provision a specific RM defined by its 'guid'
425
426             :param guid: Guid of the RM
427             :type guid: int
428
429         """
430         rm = self.get_resource(guid)
431         return rm.provision()
432
433     def get(self, guid, name):
434         """ Get a specific attribute 'name' from the RM 'guid'
435
436             :param guid: Guid of the RM
437             :type guid: int
438
439             :param name: attribute's name
440             :type name: str
441
442         """
443         rm = self.get_resource(guid)
444         return rm.get(name)
445
446     def set(self, guid, name, value):
447         """ Set a specific attribute 'name' from the RM 'guid' 
448             with the value 'value' 
449
450             :param guid: Guid of the RM
451             :type guid: int
452
453             :param name: attribute's name
454             :type name: str
455
456             :param value: attribute's value
457
458         """
459         rm = self.get_resource(guid)
460         return rm.set(name, value)
461
462     def state(self, guid, hr = False):
463         """ Returns the state of a resource
464
465             :param guid: Resource guid
466             :type guid: integer
467
468             :param hr: Human readable. Forces return of a 
469                 status string instead of a number 
470             :type hr: boolean
471
472         """
473         rm = self.get_resource(guid)
474         state = rm.state
475
476         if hr:
477             return ResourceState2str.get(state)
478
479         return state
480
481     def stop(self, guid):
482         """ Stop a specific RM defined by its 'guid'
483
484             :param guid: Guid of the RM
485             :type guid: int
486
487         """
488         rm = self.get_resource(guid)
489         return rm.stop()
490
491     def start(self, guid):
492         """ Start a specific RM defined by its 'guid'
493
494             :param guid: Guid of the RM
495             :type guid: int
496
497         """
498         rm = self.get_resource(guid)
499         return rm.start()
500
501     def set_with_conditions(self, name, value, guids1, guids2, state,
502             time = None):
503         """ Set value 'value' on attribute with name 'name' on all RMs of
504             guids1 when 'time' has elapsed since all elements in guids2 
505             have reached state 'state'.
506
507             :param name: Name of attribute to set in RM
508             :type name: string
509
510             :param value: Value of attribute to set in RM
511             :type name: string
512
513             :param guids1: List of guids of RMs subjected to action
514             :type guids1: list
515
516             :param action: Action to register (either START or STOP)
517             :type action: ResourceAction
518
519             :param guids2: List of guids of RMs to we waited for
520             :type guids2: list
521
522             :param state: State to wait for on RMs (STARTED, STOPPED, etc)
523             :type state: ResourceState
524
525             :param time: Time to wait after guids2 has reached status 
526             :type time: string
527
528         """
529         if isinstance(guids1, int):
530             guids1 = [guids1]
531         if isinstance(guids2, int):
532             guids2 = [guids2]
533
534         for guid1 in guids1:
535             rm = self.get_resource(guid)
536             rm.set_with_conditions(name, value, guids2, state, time)
537
538     def stop_with_conditions(self, guid):
539         """ Stop a specific RM defined by its 'guid' only if all the conditions are true
540
541             :param guid: Guid of the RM
542             :type guid: int
543
544         """
545         rm = self.get_resource(guid)
546         return rm.stop_with_conditions()
547
548     def start_with_conditions(self, guid):
549         """ Start a specific RM defined by its 'guid' only if all the conditions are true
550
551             :param guid: Guid of the RM
552             :type guid: int
553
554         """
555         rm = self.get_resource(guid)
556         return rm.start_with_conditions()
557
558     def deploy(self, guids = None, wait_all_ready = True, group = None):
559         """ Deploy all resource manager in guids list
560
561         :param guids: List of guids of RMs to deploy
562         :type guids: list
563
564         :param wait_all_ready: Wait until all RMs are ready in
565             order to start the RMs
566         :type guid: int
567
568         :param group: Id of deployment group in which to deploy RMs
569         :type group: int
570
571         """
572         self.logger.debug(" ------- DEPLOY START ------ ")
573
574         if not guids:
575             # If no guids list was indicated, all 'NEW' RMs will be deployed
576             guids = []
577             for guid in self.resources:
578                 if self.state(guid) == ResourceState.NEW:
579                     guids.append(guid)
580                 
581         if isinstance(guids, int):
582             guids = [guids]
583
584         # Create deployment group
585         new_group = False
586         if not group:
587             new_group = True
588             group = self._group_id_generator.next(guid)
589
590         if group not in self._groups:
591             self._groups[group] = []
592
593         self._groups[group].extend(guids)
594
595         # Before starting deployment we disorder the guids list with the
596         # purpose of speeding up the whole deployment process.
597         # It is likely that the user inserted in the 'guids' list closely
598         # resources one after another (e.g. all applications
599         # connected to the same node can likely appear one after another).
600         # This can originate a slow down in the deployment since the N 
601         # threads the parallel runner uses to processes tasks may all
602         # be taken up by the same family of resources waiting for the 
603         # same conditions (e.g. LinuxApplications running on a same 
604         # node share a single lock, so they will tend to be serialized).
605         # If we disorder the guids list, this problem can be mitigated.
606         random.shuffle(guids)
607
608         def wait_all_and_start(group):
609             reschedule = False
610             
611             # Get all guids in group
612             guids = self._groups[group]
613
614             for guid in guids:
615                 if self.state(guid) < ResourceState.READY:
616                     reschedule = True
617                     break
618
619             if reschedule:
620                 callback = functools.partial(wait_all_and_start, group)
621                 self.schedule("1s", callback)
622             else:
623                 # If all resources are read, we schedule the start
624                 for guid in guids:
625                     rm = self.get_resource(guid)
626                     self.schedule("0s", rm.start_with_conditions)
627
628         if wait_all_ready and new_group:
629             # Schedule a function to check that all resources are
630             # READY, and only then schedule the start.
631             # This aimes at reducing the number of tasks looping in the 
632             # scheduler. 
633             # Intead of having N start tasks, we will have only one for 
634             # the whole group.
635             callback = functools.partial(wait_all_and_start, group)
636             self.schedule("1s", callback)
637
638         for guid in guids:
639             rm = self.get_resource(guid)
640             rm.deployment_group = group
641             self.schedule("0s", rm.deploy)
642
643             if not wait_all_ready:
644                 self.schedule("1s", rm.start_with_conditions)
645
646             if rm.conditions.get(ResourceAction.STOP):
647                 # Only if the RM has STOP conditions we
648                 # schedule a stop. Otherwise the RM will stop immediately
649                 self.schedule("2s", rm.stop_with_conditions)
650
651     def release(self, guids = None):
652         """ Release al RMs on the guids list or 
653         all the resources if no list is specified
654
655             :param guids: List of RM guids
656             :type guids: list
657
658         """
659         if not guids:
660             guids = self.resources
661
662         for guid in guids:
663             rm = self.get_resource(guid)
664             self.schedule("0s", rm.release)
665
666         self.wait_released(guids)
667         
668     def shutdown(self):
669         """ Shutdown the Experiment Controller. 
670         Releases all the resources and stops task processing thread
671
672         """
673         self.release()
674
675         # Mark the EC state as TERMINATED
676         self._state = ECState.TERMINATED
677
678         # Notify condition to wake up the processing thread
679         self._notify()
680         
681         if self._thread.is_alive():
682            self._thread.join()
683
684     def schedule(self, date, callback, track = False):
685         """ Schedule a callback to be executed at time date.
686
687             :param date: string containing execution time for the task.
688                     It can be expressed as an absolute time, using
689                     timestamp format, or as a relative time matching
690                     ^\d+.\d+(h|m|s|ms|us)$
691
692             :param callback: code to be executed for the task. Must be a
693                         Python function, and receives args and kwargs
694                         as arguments.
695
696             :param track: if set to True, the task will be retrivable with
697                     the get_task() method
698
699             :return : The Id of the task
700         """
701         timestamp = stabsformat(date)
702         task = Task(timestamp, callback)
703         task = self._scheduler.schedule(task)
704
705         if track:
706             self._tasks[task.id] = task
707
708         # Notify condition to wake up the processing thread
709         self._notify()
710
711         return task.id
712      
713     def _process(self):
714         """ Process scheduled tasks.
715
716         .. note::
717
718         The _process method is executed in an independent thread held by the 
719         ExperimentController for as long as the experiment is running.
720         
721         Tasks are scheduled by invoking the schedule method with a target callback. 
722         The schedule method is given a execution time which controls the
723         order in which tasks are processed. 
724
725         Tasks are processed in parallel using multithreading. 
726         The environmental variable NEPI_NTHREADS can be used to control
727         the number of threads used to process tasks. The default value is 50.
728
729         Exception handling:
730
731         To execute tasks in parallel, an ParallelRunner (PR) object, holding
732         a pool of threads (workers), is used.
733         For each available thread in the PR, the next task popped from 
734         the scheduler queue is 'put' in the PR.
735         Upon receiving a task to execute, each PR worker (thread) invokes the 
736         _execute method of the EC, passing the task as argument. 
737         This method, calls task.callback inside a try/except block. If an 
738         exception is raised by the tasks.callback, it will be trapped by the 
739         try block, logged to standard error (usually the console), and the EC 
740         state will be set to ECState.FAILED.
741         The invocation of _notify immediately after, forces the processing
742         loop in the _process method, to wake up if it was blocked waiting for new 
743         tasks to arrived, and to check the EC state.
744         As the EC is in FAILED state, the processing loop exits and the 
745         'finally' block is invoked. In the 'finally' block, the 'sync' method
746         of the PR is invoked, which forces the PR to raise any unchecked errors
747         that might have been raised by the workers.
748
749         """
750         nthreads = int(os.environ.get("NEPI_NTHREADS", "50"))
751
752         runner = ParallelRun(maxthreads = nthreads)
753         runner.start()
754
755         try:
756             while not self.finished:
757                 self._cond.acquire()
758
759                 task = self._scheduler.next()
760                 
761                 if not task:
762                     # No task to execute. Wait for a new task to be scheduled.
763                     self._cond.wait()
764                 else:
765                     # The task timestamp is in the future. Wait for timeout 
766                     # or until another task is scheduled.
767                     now = tnow()
768                     if now < task.timestamp:
769                         # Calculate timeout in seconds
770                         timeout = tdiffsec(task.timestamp, now)
771
772                         # Re-schedule task with the same timestamp
773                         self._scheduler.schedule(task)
774                         
775                         task = None
776
777                         # Wait timeout or until a new task awakes the condition
778                         self._cond.wait(timeout)
779                
780                 self._cond.release()
781
782                 if task:
783                     # Process tasks in parallel
784                     runner.put(self._execute, task)
785         except: 
786             import traceback
787             err = traceback.format_exc()
788             self.logger.error("Error while processing tasks in the EC: %s" % err)
789
790             self._state = ECState.FAILED
791         finally:   
792             self.logger.debug("Exiting the task processing loop ... ")
793             runner.sync()
794             runner.destroy()
795
796     def _execute(self, task):
797         """ Executes a single task. 
798
799             :param task: Object containing the callback to execute
800             :type task: Task
801
802         .. note::
803
804         If the invokation of the task callback raises an
805         exception, the processing thread of the ExperimentController
806         will be stopped and the experiment will be aborted.
807
808         """
809         # Invoke callback
810         task.status = TaskStatus.DONE
811
812         try:
813             task.result = task.callback()
814         except:
815             import traceback
816             err = traceback.format_exc()
817             task.result = err
818             task.status = TaskStatus.ERROR
819             
820             self.logger.error("Error occurred while executing task: %s" % err)
821
822             # Set the EC to FAILED state (this will force to exit the task
823             # processing thread)
824             self._state = ECState.FAILED
825
826             # Notify condition to wake up the processing thread
827             self._notify()
828
829             # Propage error to the ParallelRunner
830             raise
831
832     def _notify(self):
833         """ Awakes the processing thread in case it is blocked waiting
834         for a new task to be scheduled.
835         """
836         self._cond.acquire()
837         self._cond.notify()
838         self._cond.release()
839