6ebcfea3f1128bacd746fe8a2719687bb9f92450
[nepi.git] / src / nepi / execution / scheduler.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18
19 import itertools
20 import heapq
21
22 class TaskStatus:
23     """ Execution state of the Task
24     """
25     NEW = 0
26     DONE = 1
27     ERROR = 2
28
29 class Task(object):
30     """ A Task represents an operation to be executed by the 
31     ExperimentController scheduler
32     """
33
34     def __init__(self, timestamp, callback):
35         """
36         :param timestamp: Future execution date of the operation
37         :type timestamp: str
38
39         :param callback: A function to invoke in order to execute the operation
40         :type callback: function
41
42         """ 
43         self.id = None 
44         self.timestamp = timestamp
45         self.callback = callback
46         self.result = None
47         self.status = TaskStatus.NEW
48
49 class HeapScheduler(object):
50     """ Create a Heap Scheduler
51
52     .. note::
53
54         This class is thread safe.
55         All calls to C Extensions are made atomic by the GIL in the CPython implementation.
56         heapq.heappush, heapq.heappop, and list access are therefore thread-safe.
57
58     """
59
60     def __init__(self):
61         super(HeapScheduler, self).__init__()
62         self._queue = list() 
63         self._valid = set()
64         self._idgen = itertools.count(1)
65
66     @property
67     def pending(self):
68         """ Returns the list of pending task ids """
69         return self._valid
70
71     def schedule(self, task):
72         """ Add a task to the queue ordered by task.timestamp and arrival order
73
74         :param task: task to schedule
75         :type task: task
76         """
77         if task.id == None:
78             task.id = self._idgen.next()
79
80         entry = (task.timestamp, task.id, task)
81         self._valid.add(task.id)
82         heapq.heappush(self._queue, entry)
83         return task
84
85     def remove(self, tid):
86         """ Remove a task form the queue
87
88         :param tid: Id of the task to be removed
89         :type tid: int
90
91         """
92         try:
93             self._valid.remove(tid)
94         except:
95             pass
96
97     def next(self):
98         """ Get the next task in the queue by timestamp and arrival order
99         """
100         while self._queue:
101             try:
102                 timestamp, tid, task = heapq.heappop(self._queue)
103                 if tid in self._valid:
104                     self.remove(tid)
105                     return task
106             except IndexError:
107                 # heap empty
108                 pass
109         return None
110