2 # -*- coding: utf-8 -*-
11 class ParallelMap(object):
12 def __init__(self, maxthreads = None, maxqueue = None, results = True):
15 if maxthreads is None:
18 f = open("/proc/cpuinfo")
20 N_PROCS = sum("processor" in l for l in f)
27 if maxthreads is None:
30 self.queue = Queue.Queue(maxqueue or 0)
32 self.workers = [ threading.Thread(target = self.worker)
33 for x in xrange(maxthreads) ]
36 self.rvqueue = Queue.Queue()
40 def put(self, callable, *args, **kwargs):
41 self.queue.put((callable, args, kwargs))
43 def put_nowait(self, callable, *args, **kwargs):
44 self.queue.put_nowait((callable, args, kwargs))
47 for thread in self.workers:
51 for thread in self.workers:
52 # That's the shutdown signal
56 for thread in self.workers:
61 task = self.queue.get()
63 self.queue.task_done()
68 callable, args, kwargs = task
69 rv = callable(*args, **kwargs)
71 if self.rvqueue is not None:
74 self.queue.task_done()
76 traceback.print_exc(file = sys.stderr)
79 if self.rvqueue is not None:
82 yield self.rvqueue.get_nowait()
86 yield self.rvqueue.get_nowait()