2 # -*- coding: utf-8 -*-
15 class WorkerThread(threading.Thread):
23 task = self.queue.get()
26 self.queue.task_done()
28 elif task is self.QUIT:
30 self.queue.task_done()
32 elif task is self.REASSIGNED:
39 callable, args, kwargs = task
40 rv = callable(*args, **kwargs)
42 if self.rvqueue is not None:
45 self.queue.task_done()
47 traceback.print_exc(file = sys.stderr)
48 self.delayed_exceptions.append(sys.exc_info())
51 while not self.queue.empty() and not self.done:
54 def attach(self, queue, rvqueue, delayed_exceptions):
59 self.rvqueue = rvqueue
60 self.delayed_exceptions = delayed_exceptions
62 oldqueue.put(self.REASSIGNED)
67 self.oldqueue = self.queue
68 self.queue = Queue.Queue()
70 self.delayed_exceptions = []
72 def detach_signal(self):
74 self.oldqueue.put(self.REASSIGNED)
78 self.queue.put(self.QUIT)
81 class ParallelMap(object):
82 def __init__(self, maxthreads = None, maxqueue = None, results = True):
87 if maxthreads is None:
90 f = open("/proc/cpuinfo")
92 N_PROCS = sum("processor" in l for l in f)
99 if maxthreads is None:
102 self.queue = Queue.Queue(maxqueue or 0)
104 self.delayed_exceptions = []
107 self.rvqueue = Queue.Queue()
112 if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
114 THREADCACHEPID = os.getpid()
117 for x in xrange(maxthreads):
121 t = THREADCACHE.pop()
129 t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
130 self.workers.append(t)
138 global THREADCACHEPID
139 if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
141 THREADCACHEPID = os.getpid()
143 for worker in self.workers:
145 for worker in self.workers:
147 for worker in self.workers:
148 worker.detach_signal()
149 THREADCACHE.extend(self.workers)
152 def put(self, callable, *args, **kwargs):
153 self.queue.put((callable, args, kwargs))
155 def put_nowait(self, callable, *args, **kwargs):
156 self.queue.put_nowait((callable, args, kwargs))
159 for thread in self.workers:
160 if not thread.isAlive():
164 for thread in self.workers:
165 # That's the sync signal
169 for thread in self.workers:
172 if self.delayed_exceptions:
173 typ,val,loc = self.delayed_exceptions[0]
174 del self.delayed_exceptions[:]
181 if self.delayed_exceptions:
182 typ,val,loc = self.delayed_exceptions[0]
183 del self.delayed_exceptions[:]
187 if self.rvqueue is not None:
190 yield self.rvqueue.get_nowait()
194 yield self.rvqueue.get_nowait()
199 class ParallelFilter(ParallelMap):
203 def __filter(self, x):
204 if self.filter_condition(x):
207 return self._FILTERED
209 def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
210 super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
211 self.filter_condition = filter_condition
214 super(ParallelFilter, self).put(self.__filter, what)
216 def put_nowait(self, what):
217 super(ParallelFilter, self).put_nowait(self.__filter, what)
220 for rv in super(ParallelFilter, self).__iter__():
221 if rv is not self._FILTERED:
224 class ParallelRun(ParallelMap):
227 return fn(*args, **kwargs)
229 def __init__(self, maxthreads = None, maxqueue = None):
230 super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
232 def put(self, what, *args, **kwargs):
233 super(ParallelRun, self).put(self.__run, (what, args, kwargs))
235 def put_nowait(self, what, *args, **kwargs):
236 super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
239 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
240 mapper = ParallelMap(
241 maxthreads = maxthreads,
245 for elem in iterable:
251 def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
252 filtrer = ParallelFilter(
254 maxthreads = maxthreads,
257 for elem in iterable: