cbd9874f19f550c809a7fa3a23673bacfe030ba0
[nepi.git] / src / nepi / execution / scheduler.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19
20 import itertools
21 import heapq
22
23 class TaskStatus:
24     NEW = 0
25     DONE = 1
26     ERROR = 2
27
28 class Task(object):
29     """ This class is to define a task, that is represented by an id,
30     an execution time 'timestamp' and an action 'callback """
31
32     def __init__(self, timestamp, callback):
33         self.id = None 
34         self.timestamp = timestamp
35         self.callback = callback
36         self.result = None
37         self.status = TaskStatus.NEW
38
39 class HeapScheduler(object):
40     """ Create a Heap Scheduler.
41
42     .. note::
43
44         This class is thread safe.
45         All calls to C Extensions are made atomic by the GIL in the CPython implementation.
46         heapq.heappush, heapq.heappop, and list access are therefore thread-safe.
47
48     """
49
50     def __init__(self):
51         super(HeapScheduler, self).__init__()
52         self._queue = list() 
53         self._valid = set()
54         self._idgen = itertools.count(1)
55
56     @property
57     def pending(self):
58         """ Returns the list of pending task ids """
59         return self._valid
60
61     def schedule(self, task):
62         """ Add the task 'task' in the heap of the scheduler
63
64         :param task: task that need to be schedule
65         :type task: task
66         """
67         if task.id == None:
68             task.id = self._idgen.next()
69
70         entry = (task.timestamp, task.id, task)
71         self._valid.add(task.id)
72         heapq.heappush(self._queue, entry)
73         return task
74
75     def remove(self, tid):
76         """ Remove a task form the heap
77
78         :param tid: Id of the task that need to be removed
79         :type tid: int
80         """
81         try:
82             self._valid.remove(tid)
83         except:
84             pass
85
86     def next(self):
87         """ Get the next task in the scheduler
88
89         """
90         while self._queue:
91             try:
92                 timestamp, tid, task = heapq.heappop(self._queue)
93                 if tid in self._valid:
94                     self.remove(tid)
95                     return task
96             except IndexError:
97                 # heap empty
98                 pass
99         return None
100