2 # -*- coding: utf-8 -*-
# ParallelMap: runs submitted callables on a pool of worker threads that are
# fed from a shared task queue. Uses the Python 2 names Queue and xrange.
# NOTE(review): this excerpt omits some statements of the class (the embedded
# line numbers skip), so the comments below describe only what is shown.
11 class ParallelMap(object):
# maxthreads: number of worker threads to create.
# maxqueue: bound for the task queue (None or 0 means no bound).
# results: presumably controls whether return values are collected; its use
# is not visible in this excerpt -- TODO confirm.
12 def __init__(self, maxthreads = None, maxqueue = None, results = True):
# No explicit thread count given: inspect /proc/cpuinfo (Linux-specific path).
15 if maxthreads is None:
18 f = open("/proc/cpuinfo")
# Counts the lines of the file that contain the substring "processor".
20 N_PROCS = sum("processor" in l for l in f)
# Second None check: presumably a fallback default for when the detection
# above did not produce a value -- TODO confirm.
27 if maxthreads is None:
# "maxqueue or 0" maps None to 0; Queue.Queue treats maxsize <= 0 as unbounded.
30 self.queue = Queue.Queue(maxqueue or 0)
# One thread per requested worker, each running self.worker. The threads are
# only constructed here, not started.
32 self.workers = [ threading.Thread(target = self.worker)
33 for x in xrange(maxthreads) ]
# Unbounded queue holding results. Other code checks "self.rvqueue is not
# None", so it is presumably set to None when results are not wanted.
36 self.rvqueue = Queue.Queue()
# Enqueue a call as a (callable, args, kwargs) tuple. Queue.put blocks while
# a bounded queue is full.
40 def put(self, callable, *args, **kwargs):
41 self.queue.put((callable, args, kwargs))
# Non-blocking variant of put: Queue.put_nowait raises Queue.Full instead of
# waiting when a bounded queue is full.
43 def put_nowait(self, callable, *args, **kwargs):
44 self.queue.put_nowait((callable, args, kwargs))
# The three loops below each walk the worker list; their bodies and the
# enclosing method headers are not shown. Presumably they start the threads,
# enqueue one shutdown marker per thread, and join the threads -- TODO confirm.
47 for thread in self.workers:
51 for thread in self.workers:
52 # That's the shutdown signal
56 for thread in self.workers:
# Worker side: blocks until a task is available on the queue.
61 task = self.queue.get()
# NOTE(review): this task_done() presumably belongs to the shutdown-signal
# branch, whose condition is not shown -- TODO confirm.
63 self.queue.task_done()
# A task is the (callable, args, kwargs) tuple built by put()/put_nowait().
68 callable, args, kwargs = task
69 rv = callable(*args, **kwargs)
# rvqueue may be None; the statement guarded by this check is not shown
# (presumably it stores rv on rvqueue).
71 if self.rvqueue is not None:
# Marks the task as finished; task_done() is what Queue.join() waits on.
74 self.queue.task_done()
# Prints the traceback of the exception being handled to stderr.
76 traceback.print_exc(file = sys.stderr)
# Result iteration: yields queued results without blocking. get_nowait()
# raises Queue.Empty when nothing is queued; the handling of that case is
# not shown.
79 if self.rvqueue is not None:
82 yield self.rvqueue.get_nowait()
86 yield self.rvqueue.get_nowait()
# ParallelFilter: ParallelMap specialisation that evaluates a predicate on
# each submitted item in the worker threads.
# NOTE(review): this excerpt omits some statements of the class (the embedded
# line numbers skip), so the comments below describe only what is shown.
91 class ParallelFilter(ParallelMap):
# Task function submitted to the workers; applies the predicate to x. The
# double underscore makes the name private to this class via name mangling.
# The return statements are not shown; presumably it returns x when the
# predicate holds and the _FILTERED marker otherwise -- TODO confirm.
95 def __filter(self, x):
96 if self.filter_condition(x):
# filter_condition: callable invoked as filter_condition(item) for each item.
# maxthreads / maxqueue are forwarded to ParallelMap.__init__, with the
# results argument fixed to True.
101 def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
102 super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
103 self.filter_condition = filter_condition
# Submits the item by queueing a call to self.__filter(what) (the method
# header for this statement is not shown).
106 super(ParallelFilter, self).put(self.__filter, what)
# Non-blocking submission; forwards to ParallelMap.put_nowait.
108 def put_nowait(self, what):
109 super(ParallelFilter, self).put_nowait(self.__filter, what)
# Iterates over the parent's results and checks each by identity against the
# _FILTERED marker (whose definition is not shown); presumably only values
# that are not the marker are yielded -- TODO confirm.
112 for rv in super(ParallelFilter, self).__iter__():
113 if rv is not self._FILTERED:
# pmap: builds a ParallelMap configured with the given maxthreads and walks
# the iterable.
# NOTE(review): the rest of the constructor call, the loop body and the return
# value are not shown in this excerpt; presumably each element is submitted
# as mapping(elem) and the results are returned -- TODO confirm.
117 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
118 mapper = ParallelMap(
119 maxthreads = maxthreads,
123 for elem in iterable:
127 def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
128 filtrer = ParallelFilter(
130 maxthreads = maxthreads,
133 for elem in iterable: