Minor typos
[nepi.git] / src / nepi / execution / scheduler.py
1 import itertools
2 import heapq
3
4 class TaskStatus:
5     NEW = 0
6     DONE = 1
7     ERROR = 2
8
9
10 class Task(object):
11     def __init__(self, timestamp, callback):
12         self.id = None 
13         self.timestamp = timestamp
14         self.callback = callback
15         self.result = None
16         self.status = TaskStatus.NEW
17
18 class HeapScheduler(object):
19     """ This class is thread safe.
20     All calls to C Extensions are made atomic by the GIL in the CPython implementation.
21     heapq.heappush, heapq.heappop, and list access are therefore thread-safe """
22
23     def __init__(self):
24         super(HeapScheduler, self).__init__()
25         self._queue = list() 
26         self._valid = set()
27         self._idgen = itertools.count(1)
28
29     def schedule(self, task):
30         if task.id == None:
31             task.id = self._idgen.next()
32         entry = (task.timestamp, task.id, task)
33         self._valid.add(task.id)
34         heapq.heappush(self._queue, entry)
35         return task
36
37     def remove(self, tid):
38         try:
39             self._valid.remove(tid)
40         except:
41             pass
42
43     def next(self):
44         while self._queue:
45             try:
46                 timestamp, tid, task = heapq.heappop(self._queue)
47                 if tid in self._valid:
48                     self.remove(tid)
49                     return task
50             except IndexError:
51                 # heap empty
52                 pass
53         return None
54