Added scheduler and task processing thread to ec. Completed deploy and release methods.
[nepi.git] / src / neco / execution / scheduler.py
1 import itertools
2 import heapq
3
4 class Task(object):
5     def __init__(self, timestamp, callback):
6         self.id = None 
7         self.timestamp = timestamp
8         self.callback = callback
9
10 class HeapScheduler(object):
11     """ This class is thread safe.
12     All calls to C Extensions are made atomic by the GIL in the CPython implementation.
13     heapq.heappush, heapq.heappop, and list access are therefore thread-safe """
14
15     def __init__(self):
16         super(HeapScheduler, self).__init__()
17         self._queue = list() 
18         self._valid = set()
19         self._idgen = itertools.count(1)
20
21     def schedule(self, task):
22         if task.id == None:
23             task.id = self._idgen.next()
24         entry = (task.timestamp, task.id, task)
25         self._valid.add(task.id)
26         heapq.heappush(self._queue, entry)
27         return task
28
29     def remove(self, tid):
30         try:
31             self._valid.remove(tid)
32         except:
33             pass
34
35     def next(self):
36         while self._queue:
37             try:
38                 timestamp, tid, task = heapq.heappop(self._queue)
39                 if tid in self._valid:
40                     self.remove(tid)
41                     return task
42             except IndexError:
43                 # heap empty
44                 pass
45         return None
46