202b711ae1a9bd38793405ba9eefba037db7b1fc
[nepi.git] / src / neco / execution / scheduler.py
1 import itertools
2 import heapq
3
class HeapScheduler(object):
    """ Priority queue of tasks ordered by timestamp, then by task id.

    Tasks only need two attributes: ``timestamp`` (used as the ordering
    key) and ``id`` (``None`` until the scheduler assigns one).

    Removal is lazy: ``remove`` only forgets the task id, and the stale heap
    entry is dropped later when ``next`` pops it.

    This class is meant to be used from several threads without explicit
    locking. All calls to C Extensions are made atomic by the GIL in the
    CPython implementation; heapq.heappush, heapq.heappop, set and list
    access are therefore thread-safe. """

    def __init__(self):
        super(HeapScheduler, self).__init__()
        # Heap of (timestamp, task id, sequence number, task) tuples.
        self._queue = list()
        # Ids of the tasks that are scheduled and have not been removed.
        self._valid = set()
        # Source of task ids; ids start at 1.
        self._idgen = itertools.count(1)
        # Monotonic tie-breaker so that two heap entries never compare equal
        # on their leading fields and fall through to comparing task objects,
        # which may not be orderable.
        self._seqgen = itertools.count()

    def schedule(self, task):
        """ Add ``task`` to the queue and return it.

        A task whose ``id`` is None gets a fresh id assigned; a task that
        already has an id keeps it.
        """
        if task.id is None:
            # Builtin next() works on both Python 2.6+ and Python 3.
            task.id = next(self._idgen)
        entry = (task.timestamp, task.id, next(self._seqgen), task)
        self._valid.add(task.id)
        heapq.heappush(self._queue, entry)
        return task

    def remove(self, tid):
        """ Cancel the task with id ``tid``.

        Unknown ids are silently ignored. The heap entry itself is left in
        place and is skipped by ``next`` when it is reached.
        """
        try:
            self._valid.discard(tid)
        except TypeError:
            # An unhashable id can never have been added by schedule(),
            # so there is nothing to cancel.
            pass

    def next(self):
        """ Pop and return the pending task with the smallest timestamp.

        Entries whose id was removed are discarded along the way.
        Returns None when no valid task is left.
        """
        while self._queue:
            try:
                entry = heapq.heappop(self._queue)
            except IndexError:
                # heap emptied between the check above and the pop
                continue
            tid, task = entry[1], entry[3]
            if tid in self._valid:
                self.remove(tid)
                return task
        return None
40