# # NEPI, a framework to manage network experiments # Copyright (C) 2013 INRIA # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Author: Claudio Freire # # A.Q. TODO: BUG FIX THREADCACHE. Not needed!! remove it completely! import threading import Queue import traceback import sys import os N_PROCS = None class WorkerThread(threading.Thread): class QUIT: pass class REASSIGNED: pass def run(self): while True: task = self.queue.get() if task is None: self.done = True self.queue.task_done() continue elif task is self.QUIT: self.done = True self.queue.task_done() break elif task is self.REASSIGNED: continue else: self.done = False try: try: callable, args, kwargs = task rv = callable(*args, **kwargs) if self.rvqueue is not None: self.rvqueue.put(rv) finally: self.queue.task_done() except: traceback.print_exc(file = sys.stderr) self.delayed_exceptions.append(sys.exc_info()) def waitdone(self): while not self.queue.empty() and not self.done: self.queue.join() def attach(self, queue, rvqueue, delayed_exceptions): if self.isAlive(): self.waitdone() oldqueue = self.queue self.queue = queue self.rvqueue = rvqueue self.delayed_exceptions = delayed_exceptions if self.isAlive(): oldqueue.put(self.REASSIGNED) def detach(self): if self.isAlive(): self.waitdone() self.oldqueue = self.queue self.queue = Queue.Queue() self.rvqueue = None self.delayed_exceptions = [] def detach_signal(self): if self.isAlive(): self.oldqueue.put(self.REASSIGNED) del self.oldqueue def quit(self): self.queue.put(self.QUIT) self.join() class ParallelMap(object): def __init__(self, maxthreads = None, maxqueue = None, results = True): global N_PROCS # Compute maximum number of threads allowed by the system if maxthreads is None: if N_PROCS is None: try: f = open("/proc/cpuinfo") try: N_PROCS = sum("processor" in l for l in f) finally: f.close() except: pass maxthreads = N_PROCS if maxthreads is None: maxthreads = 4 self.queue = Queue.Queue(maxqueue or 0) self.delayed_exceptions = [] if results: self.rvqueue = Queue.Queue() else: self.rvqueue = None self.workers = [] # initialize workers for x in xrange(maxthreads): t = None if t is None: t = WorkerThread() t.setDaemon(True) else: t.waitdone() t.attach(self.queue, self.rvqueue, self.delayed_exceptions) self.workers.append(t) def __del__(self): self.destroy() def destroy(self): for worker in self.workers: worker.waitdone() for worker in self.workers: worker.detach() for worker in self.workers: worker.detach_signal() for worker in self.workers: worker.quit() del self.workers[:] def put(self, callable, *args, **kwargs): self.queue.put((callable, args, kwargs)) def put_nowait(self, callable, *args, **kwargs): self.queue.put_nowait((callable, args, kwargs)) def start(self): for thread in self.workers: if not thread.isAlive(): thread.start() def join(self): for thread in self.workers: # That's the sync signal self.queue.put(None) self.queue.join() for thread in self.workers: thread.waitdone() if self.delayed_exceptions: typ,val,loc = self.delayed_exceptions[0] del self.delayed_exceptions[:] raise typ,val,loc self.destroy() def sync(self): self.queue.join() if self.delayed_exceptions: typ,val,loc = self.delayed_exceptions[0] del self.delayed_exceptions[:] raise typ,val,loc def __iter__(self): if self.rvqueue is not None: while True: try: yield self.rvqueue.get_nowait() except Queue.Empty: self.queue.join() try: yield self.rvqueue.get_nowait() except Queue.Empty: raise StopIteration class ParallelRun(ParallelMap): def __run(self, x): fn, args, kwargs = x return fn(*args, **kwargs) def __init__(self, maxthreads = None, maxqueue = None): super(ParallelRun, self).__init__(maxthreads, maxqueue, True) def put(self, what, *args, **kwargs): super(ParallelRun, self).put(self.__run, (what, args, kwargs)) def put_nowait(self, what, *args, **kwargs): super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))