2 # -*- coding: utf-8 -*-
11 class ParallelMap(object):
12 def __init__(self, maxthreads = None, maxqueue = None, results = True):
15 if maxthreads is None:
18 f = open("/proc/cpuinfo")
20 N_PROCS = sum("processor" in l for l in f)
27 if maxthreads is None:
30 self.queue = Queue.Queue(maxqueue or 0)
32 self.workers = [ threading.Thread(target = self.worker)
33 for x in xrange(maxthreads) ]
35 self.delayed_exceptions = []
38 self.rvqueue = Queue.Queue()
42 def put(self, callable, *args, **kwargs):
43 self.queue.put((callable, args, kwargs))
45 def put_nowait(self, callable, *args, **kwargs):
46 self.queue.put_nowait((callable, args, kwargs))
49 for thread in self.workers:
53 for thread in self.workers:
54 # That's the shutdown signal
58 for thread in self.workers:
61 if self.delayed_exceptions:
62 typ,val,loc = self.delayed_exceptions[0]
70 task = self.queue.get()
72 self.queue.task_done()
77 callable, args, kwargs = task
78 rv = callable(*args, **kwargs)
80 if self.rvqueue is not None:
83 self.queue.task_done()
85 traceback.print_exc(file = sys.stderr)
86 self.delayed_exceptions.append(sys.exc_info())
89 if self.rvqueue is not None:
92 yield self.rvqueue.get_nowait()
96 yield self.rvqueue.get_nowait()
101 class ParallelFilter(ParallelMap):
105 def __filter(self, x):
106 if self.filter_condition(x):
109 return self._FILTERED
111 def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
112 super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
113 self.filter_condition = filter_condition
116 super(ParallelFilter, self).put(self.__filter, what)
118 def put_nowait(self, what):
119 super(ParallelFilter, self).put_nowait(self.__filter, what)
122 for rv in super(ParallelFilter, self).__iter__():
123 if rv is not self._FILTERED:
126 class ParallelRun(ParallelMap):
129 return fn(*args, **kwargs)
131 def __init__(self, maxthreads = None, maxqueue = None):
132 super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
134 def put(self, what, *args, **kwargs):
135 super(ParallelRun, self).put(self.__run, (what, args, kwargs))
137 def put_nowait(self, what, *args, **kwargs):
138 super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
141 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
142 mapper = ParallelMap(
143 maxthreads = maxthreads,
147 for elem in iterable:
151 def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
152 filtrer = ParallelFilter(
154 maxthreads = maxthreads,
157 for elem in iterable: