README moves to markdown
[nepi.git] / src / nepi / execution / scheduler.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 from six import next
20
21 import itertools
22 import heapq
23
24 class TaskStatus:
25     """ Execution state of the Task
26     """
27     NEW = 0
28     DONE = 1
29     ERROR = 2
30
31 class Task(object):
32     """ A Task represents an operation to be executed by the 
33     ExperimentController scheduler
34     """
35
36     def __init__(self, timestamp, callback):
37         """
38         :param timestamp: Future execution date of the operation
39         :type timestamp: str
40
41         :param callback: A function to invoke in order to execute the operation
42         :type callback: function
43
44         """ 
45         self.id = None 
46         self.timestamp = timestamp
47         self.callback = callback
48         self.result = None
49         self.status = TaskStatus.NEW
50
51 class HeapScheduler(object):
52     """ Create a Heap Scheduler
53
54     .. note::
55
56         This class is thread safe.
57         All calls to C Extensions are made atomic by the GIL in the CPython implementation.
58         heapq.heappush, heapq.heappop, and list access are therefore thread-safe.
59
60     """
61
62     def __init__(self):
63         super(HeapScheduler, self).__init__()
64         self._queue = list() 
65         self._valid = set()
66         self._idgen = itertools.count(1)
67
68     @property
69     def pending(self):
70         """ Returns the list of pending task ids """
71         return self._valid
72
73     def schedule(self, task):
74         """ Add a task to the queue ordered by task.timestamp and arrival order
75
76         :param task: task to schedule
77         :type task: task
78         """
79         if task.id == None:
80             task.id = next(self._idgen)
81
82         entry = (task.timestamp, task.id, task)
83         self._valid.add(task.id)
84         heapq.heappush(self._queue, entry)
85         return task
86
87     def remove(self, tid):
88         """ Remove a task form the queue
89
90         :param tid: Id of the task to be removed
91         :type tid: int
92
93         """
94         try:
95             self._valid.remove(tid)
96         except:
97             pass
98
99     # py3 compat
100     def __next__(self):
101         return self.next()
102
103     def next(self):
104         """ Get the next task in the queue by timestamp and arrival order
105         """
106         while self._queue:
107             try:
108                 timestamp, tid, task = heapq.heappop(self._queue)
109                 if tid in self._valid:
110                     self.remove(tid)
111                     return task
112             except IndexError:
113                 # heap empty
114                 pass
115         return None
116