Source code for nepi.execution.scheduler

#
#    NEPI, a framework to manage network experiments
#    Copyright (C) 2013 INRIA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License version 2 as
#    published by the Free Software Foundation;
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Alina Quereilhac <alina.quereilhac@inria.fr>

import itertools
import heapq

[docs]class TaskStatus: """ Execution state of the Task """ NEW = 0 DONE = 1 ERROR = 2
[docs]class Task(object): """ A Task represents an operation to be executed by the ExperimentController scheduler """ def __init__(self, timestamp, callback): """ :param timestamp: Future execution date of the operation :type timestamp: str :param callback: A function to invoke in order to execute the operation :type callback: function """ self.id = None self.timestamp = timestamp self.callback = callback self.result = None self.status = TaskStatus.NEW
[docs]class HeapScheduler(object): """ Create a Heap Scheduler .. note:: This class is thread safe. All calls to C Extensions are made atomic by the GIL in the CPython implementation. heapq.heappush, heapq.heappop, and list access are therefore thread-safe. """ def __init__(self): super(HeapScheduler, self).__init__() self._queue = list() self._valid = set() self._idgen = itertools.count(1) @property
[docs] def pending(self): """ Returns the list of pending task ids """ return self._valid
[docs] def schedule(self, task): """ Add a task to the queue ordered by task.timestamp and arrival order :param task: task to schedule :type task: task """ if task.id == None: task.id = self._idgen.next() entry = (task.timestamp, task.id, task) self._valid.add(task.id) heapq.heappush(self._queue, entry) return task
[docs] def remove(self, tid): """ Remove a task form the queue :param tid: Id of the task to be removed :type tid: int """ try: self._valid.remove(tid) except: pass
[docs] def next(self): """ Get the next task in the queue by timestamp and arrival order """ while self._queue: try: timestamp, tid, task = heapq.heappop(self._queue) if tid in self._valid: self.remove(tid) return task except IndexError: # heap empty pass return None