1 # -*- coding: utf-8 -*-
14 class WorkerThread(threading.Thread):
22 task = self.queue.get()
25 self.queue.task_done()
27 elif task is self.QUIT:
29 self.queue.task_done()
31 elif task is self.REASSIGNED:
38 callable, args, kwargs = task
39 rv = callable(*args, **kwargs)
41 if self.rvqueue is not None:
44 self.queue.task_done()
46 traceback.print_exc(file = sys.stderr)
47 self.delayed_exceptions.append(sys.exc_info())
50 while not self.queue.empty() and not self.done:
53 def attach(self, queue, rvqueue, delayed_exceptions):
58 self.rvqueue = rvqueue
59 self.delayed_exceptions = delayed_exceptions
61 oldqueue.put(self.REASSIGNED)
66 self.oldqueue = self.queue
67 self.queue = Queue.Queue()
69 self.delayed_exceptions = []
71 def detach_signal(self):
73 self.oldqueue.put(self.REASSIGNED)
77 self.queue.put(self.QUIT)
80 class ParallelMap(object):
81 def __init__(self, maxthreads = None, maxqueue = None, results = True):
86 if maxthreads is None:
89 f = open("/proc/cpuinfo")
91 N_PROCS = sum("processor" in l for l in f)
98 if maxthreads is None:
101 self.queue = Queue.Queue(maxqueue or 0)
103 self.delayed_exceptions = []
106 self.rvqueue = Queue.Queue()
111 if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
113 THREADCACHEPID = os.getpid()
116 for x in xrange(maxthreads):
120 t = THREADCACHE.pop()
128 t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
129 self.workers.append(t)
137 global THREADCACHEPID
138 if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
140 THREADCACHEPID = os.getpid()
142 for worker in self.workers:
144 for worker in self.workers:
146 for worker in self.workers:
147 worker.detach_signal()
148 THREADCACHE.extend(self.workers)
def put(self, callable, *args, **kwargs):
    """Schedule ``callable(*args, **kwargs)`` by enqueueing it on ``self.queue``.

    The task is stored as a ``(callable, args, kwargs)`` triple, where
    ``args`` is the tuple of extra positional arguments and ``kwargs`` the
    dict of keyword arguments given to this call.

    Uses the queue's plain ``put``; for a standard-library queue created
    with a size bound this waits until a slot is free.
    """
    task = (callable, args, kwargs)
    self.queue.put(task)
def put_nowait(self, callable, *args, **kwargs):
    """Enqueue ``callable(*args, **kwargs)`` on ``self.queue`` without waiting.

    The task is stored as a ``(callable, args, kwargs)`` triple, exactly as
    ``put`` does, but it is handed to the queue's ``put_nowait``. For a
    standard-library queue that means the queue's "full" exception is
    raised immediately instead of blocking when no slot is available.
    """
    task = (callable, args, kwargs)
    self.queue.put_nowait(task)
158 for thread in self.workers:
159 if not thread.isAlive():
163 for thread in self.workers:
164 # That's the sync signal
168 for thread in self.workers:
171 if self.delayed_exceptions:
172 typ,val,loc = self.delayed_exceptions[0]
173 del self.delayed_exceptions[:]
180 if self.delayed_exceptions:
181 typ,val,loc = self.delayed_exceptions[0]
182 del self.delayed_exceptions[:]
186 if self.rvqueue is not None:
189 yield self.rvqueue.get_nowait()
193 yield self.rvqueue.get_nowait()
198 class ParallelFilter(ParallelMap):
202 def __filter(self, x):
203 if self.filter_condition(x):
206 return self._FILTERED
def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
    """Set up a parallel filter driven by ``filter_condition``.

    ``maxthreads`` and ``maxqueue`` are forwarded unchanged to
    ``ParallelMap.__init__``; result collection is always enabled
    (``results=True``).

    ``filter_condition`` is the predicate that ``__filter`` calls on each
    submitted item.
    """
    super(ParallelFilter, self).__init__(
        maxthreads = maxthreads,
        maxqueue = maxqueue,
        results = True,
    )
    self.filter_condition = filter_condition
213 super(ParallelFilter, self).put(self.__filter, what)
def put_nowait(self, what):
    """Submit ``what`` for filtering without waiting for queue space.

    The item is queued through ``ParallelMap.put_nowait`` with the private
    ``__filter`` method as the task callable and ``what`` as its single
    positional argument.
    """
    enqueue_nowait = super(ParallelFilter, self).put_nowait
    enqueue_nowait(self.__filter, what)
219 for rv in super(ParallelFilter, self).__iter__():
220 if rv is not self._FILTERED:
223 class ParallelRun(ParallelMap):
226 return fn(*args, **kwargs)
def __init__(self, maxthreads = None, maxqueue = None):
    """Set up a parallel runner.

    ``maxthreads`` and ``maxqueue`` are forwarded unchanged to
    ``ParallelMap.__init__``; result collection is always enabled
    (``results=True``).
    """
    super(ParallelRun, self).__init__(
        maxthreads = maxthreads,
        maxqueue = maxqueue,
        results = True,
    )
def put(self, what, *args, **kwargs):
    """Queue ``what(*args, **kwargs)`` for execution.

    The call is packed into one ``(what, args, kwargs)`` triple and passed
    as the single positional argument of the private ``__run`` method,
    which is the task callable handed to ``ParallelMap.put``.
    """
    call_spec = (what, args, kwargs)
    enqueue = super(ParallelRun, self).put
    enqueue(self.__run, call_spec)
def put_nowait(self, what, *args, **kwargs):
    """Queue ``what(*args, **kwargs)`` for execution without waiting.

    Non-blocking counterpart of ``put``: the call is packed into one
    ``(what, args, kwargs)`` triple and passed as the single positional
    argument of the private ``__run`` method, which is the task callable
    handed to ``ParallelMap.put_nowait``.

    Any exception raised by ``ParallelMap.put_nowait`` (for a bounded
    standard-library queue, its "full" exception) propagates to the caller.
    """
    # The task callable must be self.__run, the same trampoline put() uses.
    # This used to pass self.__filter, which inside this class mangles to
    # _ParallelRun__filter -- not an attribute ParallelRun is seen to define
    # (the filter helper belongs to ParallelFilter) -- so the call failed
    # with AttributeError before anything was queued.
    super(ParallelRun, self).put_nowait(self.__run, (what, args, kwargs))
238 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
239 mapper = ParallelMap(
240 maxthreads = maxthreads,
244 for elem in iterable:
250 def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
251 filtrer = ParallelFilter(
253 maxthreads = maxthreads,
256 for elem in iterable: