12 class WorkerThread(threading.Thread):
20 task = self.queue.get()
23 self.queue.task_done()
25 elif task is self.QUIT:
27 self.queue.task_done()
29 elif task is self.REASSIGNED:
36 callable, args, kwargs = task
37 rv = callable(*args, **kwargs)
39 if self.rvqueue is not None:
42 self.queue.task_done()
44 traceback.print_exc(file = sys.stderr)
45 self.delayed_exceptions.append(sys.exc_info())
48 while not self.queue.empty() and not self.done:
51 def attach(self, queue, rvqueue, delayed_exceptions):
56 self.rvqueue = rvqueue
57 self.delayed_exceptions = delayed_exceptions
59 oldqueue.put(self.REASSIGNED)
64 self.oldqueue = self.queue
65 self.queue = Queue.Queue()
67 self.delayed_exceptions = []
69 def detach_signal(self):
71 self.oldqueue.put(self.REASSIGNED)
75 self.queue.put(self.QUIT)
78 class ParallelMap(object):
79 def __init__(self, maxthreads = None, maxqueue = None, results = True):
84 if maxthreads is None:
87 f = open("/proc/cpuinfo")
89 N_PROCS = sum("processor" in l for l in f)
96 if maxthreads is None:
99 self.queue = Queue.Queue(maxqueue or 0)
101 self.delayed_exceptions = []
104 self.rvqueue = Queue.Queue()
109 if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
111 THREADCACHEPID = os.getpid()
114 for x in xrange(maxthreads):
118 t = THREADCACHE.pop()
126 t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
127 self.workers.append(t)
135 global THREADCACHEPID
136 if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
138 THREADCACHEPID = os.getpid()
140 for worker in self.workers:
142 for worker in self.workers:
144 for worker in self.workers:
145 worker.detach_signal()
146 THREADCACHE.extend(self.workers)
def put(self, callable, *args, **kwargs):
    """Queue one call for the worker threads.

    The call is stored as a ``(callable, args, kwargs)`` tuple, which is
    the shape the workers unpack before invoking
    ``callable(*args, **kwargs)``.  Delegates to ``self.queue.put``, so
    on a bounded queue this waits for a free slot.
    """
    task = (callable, args, kwargs)
    self.queue.put(task)
def put_nowait(self, callable, *args, **kwargs):
    """Queue one call for the worker threads without waiting.

    Same task layout as ``put`` -- a ``(callable, args, kwargs)`` tuple --
    but handed to ``self.queue.put_nowait``, so a full bounded queue
    raises the queue's ``Full`` exception instead of blocking.
    """
    task = (callable, args, kwargs)
    self.queue.put_nowait(task)
156 for thread in self.workers:
157 if not thread.isAlive():
161 for thread in self.workers:
162 # That's the sync signal
166 for thread in self.workers:
169 if self.delayed_exceptions:
170 typ,val,loc = self.delayed_exceptions[0]
171 del self.delayed_exceptions[:]
178 if self.delayed_exceptions:
179 typ,val,loc = self.delayed_exceptions[0]
180 del self.delayed_exceptions[:]
184 if self.rvqueue is not None:
187 yield self.rvqueue.get_nowait()
191 yield self.rvqueue.get_nowait()
196 class ParallelFilter(ParallelMap):
200 def __filter(self, x):
201 if self.filter_condition(x):
204 return self._FILTERED
def __init__(self, filter_condition, maxthreads=None, maxqueue=None):
    """Set up a parallel filter around ``filter_condition``.

    ``maxthreads`` and ``maxqueue`` are forwarded unchanged to
    ``ParallelMap.__init__``; the base class's ``results`` argument is
    always passed as ``True``.  ``filter_condition`` is the predicate
    the private ``__filter`` task evaluates for each submitted item.
    """
    want_results = True
    base = super(ParallelFilter, self)
    base.__init__(maxthreads, maxqueue, want_results)
    self.filter_condition = filter_condition
211 super(ParallelFilter, self).put(self.__filter, what)
def put_nowait(self, what):
    """Submit ``what`` for filtering without waiting for queue space.

    The item is wrapped in this class's private ``__filter`` task and
    handed to ``ParallelMap.put_nowait``, which enqueues it through the
    queue's non-blocking ``put_nowait``.
    """
    base = super(ParallelFilter, self)
    base.put_nowait(self.__filter, what)
217 for rv in super(ParallelFilter, self).__iter__():
218 if rv is not self._FILTERED:
221 class ParallelRun(ParallelMap):
224 return fn(*args, **kwargs)
def __init__(self, maxthreads=None, maxqueue=None):
    """Set up a parallel runner.

    ``maxthreads`` and ``maxqueue`` are forwarded unchanged to
    ``ParallelMap.__init__``; the base class's ``results`` argument is
    always passed as ``True``.
    """
    want_results = True
    base = super(ParallelRun, self)
    base.__init__(maxthreads, maxqueue, want_results)
def put(self, what, *args, **kwargs):
    """Submit the call ``what(*args, **kwargs)`` to the workers.

    The callable and its arguments are bundled into one
    ``(what, args, kwargs)`` tuple and passed as the single argument of
    this class's private ``__run`` task via ``ParallelMap.put``.
    """
    bundle = (what, args, kwargs)
    base = super(ParallelRun, self)
    base.put(self.__run, bundle)
def put_nowait(self, what, *args, **kwargs):
    """Submit the call ``what(*args, **kwargs)`` without waiting.

    Non-blocking counterpart of ``put``: the callable and its arguments
    are bundled into one ``(what, args, kwargs)`` tuple and passed as the
    single argument of this class's private ``__run`` task via
    ``ParallelMap.put_nowait``.

    Raises:
        Queue.Full: propagated from the underlying queue's
            ``put_nowait`` when a bounded task queue has no free slot.
    """
    # The task wrapper must be __run, exactly as in put().  The previous
    # code passed self.__filter, which inside this class is name-mangled
    # to self._ParallelRun__filter and therefore failed with
    # AttributeError before anything was queued.
    bundle = (what, args, kwargs)
    super(ParallelRun, self).put_nowait(self.__run, bundle)
236 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
237 mapper = ParallelMap(
238 maxthreads = maxthreads,
242 for elem in iterable:
248 def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
249 filtrer = ParallelFilter(
251 maxthreads = maxthreads,
254 for elem in iterable: