2 # -*- coding: utf-8 -*-
11 class ParallelMap(object):
12 def __init__(self, maxthreads = None, maxqueue = None, results = True):
15 if maxthreads is None:
18 f = open("/proc/cpuinfo")
20 N_PROCS = sum("processor" in l for l in f)
27 if maxthreads is None:
30 self.queue = Queue.Queue(maxqueue or 0)
32 self.workers = [ threading.Thread(target = self.worker)
33 for x in xrange(maxthreads) ]
35 self.delayed_exceptions = []
38 self.rvqueue = Queue.Queue()
42 def put(self, callable, *args, **kwargs):
43 self.queue.put((callable, args, kwargs))
45 def put_nowait(self, callable, *args, **kwargs):
46 self.queue.put_nowait((callable, args, kwargs))
49 for thread in self.workers:
53 for thread in self.workers:
54 # That's the shutdown signal
58 for thread in self.workers:
61 if self.delayed_exceptions:
62 typ,val,loc = self.delayed_exceptions[0]
67 task = self.queue.get()
69 self.queue.task_done()
74 callable, args, kwargs = task
75 rv = callable(*args, **kwargs)
77 if self.rvqueue is not None:
80 self.queue.task_done()
82 traceback.print_exc(file = sys.stderr)
83 self.delayed_exceptions.append(sys.exc_info())
86 if self.rvqueue is not None:
89 yield self.rvqueue.get_nowait()
93 yield self.rvqueue.get_nowait()
98 class ParallelFilter(ParallelMap):
102 def __filter(self, x):
103 if self.filter_condition(x):
106 return self._FILTERED
108 def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
109 super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
110 self.filter_condition = filter_condition
113 super(ParallelFilter, self).put(self.__filter, what)
115 def put_nowait(self, what):
116 super(ParallelFilter, self).put_nowait(self.__filter, what)
119 for rv in super(ParallelFilter, self).__iter__():
120 if rv is not self._FILTERED:
123 class ParallelRun(ParallelMap):
126 return fn(*args, **kwargs)
128 def __init__(self, maxthreads = None, maxqueue = None):
129 super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
131 def put(self, what, *args, **kwargs):
132 super(ParallelRun, self).put(self.__run, (what, args, kwargs))
134 def put_nowait(self, what, *args, **kwargs):
135 super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
138 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
139 mapper = ParallelMap(
140 maxthreads = maxthreads,
144 for elem in iterable:
148 def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
149 filtrer = ParallelFilter(
151 maxthreads = maxthreads,
154 for elem in iterable: