33d936e53d4a74c0f155b9c33dd08d0a0119e16a
[nepi.git] / src / nepi / execution / scheduler.py
1 """
2     NEPI, a framework to manage network experiments
3     Copyright (C) 2013 INRIA
4
5     This program is free software: you can redistribute it and/or modify
6     it under the terms of the GNU General Public License as published by
7     the Free Software Foundation, either version 3 of the License, or
8     (at your option) any later version.
9
10     This program is distributed in the hope that it will be useful,
11     but WITHOUT ANY WARRANTY; without even the implied warranty of
12     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 """
19
20 import itertools
21 import heapq
22
23 class TaskStatus:
24     NEW = 0
25     DONE = 1
26     ERROR = 2
27
28
29 class Task(object):
30     def __init__(self, timestamp, callback):
31         self.id = None 
32         self.timestamp = timestamp
33         self.callback = callback
34         self.result = None
35         self.status = TaskStatus.NEW
36
37 class HeapScheduler(object):
38     """ This class is thread safe.
39     All calls to C Extensions are made atomic by the GIL in the CPython implementation.
40     heapq.heappush, heapq.heappop, and list access are therefore thread-safe """
41
42     def __init__(self):
43         super(HeapScheduler, self).__init__()
44         self._queue = list() 
45         self._valid = set()
46         self._idgen = itertools.count(1)
47
48     def schedule(self, task):
49         if task.id == None:
50             task.id = self._idgen.next()
51         entry = (task.timestamp, task.id, task)
52         self._valid.add(task.id)
53         heapq.heappush(self._queue, entry)
54         return task
55
56     def remove(self, tid):
57         try:
58             self._valid.remove(tid)
59         except:
60             pass
61
62     def next(self):
63         while self._queue:
64             try:
65                 timestamp, tid, task = heapq.heappop(self._queue)
66                 if tid in self._valid:
67                     self.remove(tid)
68                     return task
69             except IndexError:
70                 # heap empty
71                 pass
72         return None
73