N_PROCS = None
-#THREADCACHE = []
-#THREADCACHEPID = None
-
class WorkerThread(threading.Thread):
class QUIT:
pass
class ParallelMap(object):
def __init__(self, maxthreads = None, maxqueue = None, results = True):
global N_PROCS
- #global THREADCACHE
- #global THREADCACHEPID
-
+
+ # Compute maximum number of threads allowed by the system
if maxthreads is None:
if N_PROCS is None:
try:
self.rvqueue = Queue.Queue()
else:
self.rvqueue = None
-
- # Check threadcache
- #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
- # del THREADCACHE[:]
- # THREADCACHEPID = os.getpid()
self.workers = []
+
+ # initialize workers
for x in xrange(maxthreads):
t = None
- #if THREADCACHE:
- # try:
- # t = THREADCACHE.pop()
- # except:
- # pass
if t is None:
t = WorkerThread()
t.setDaemon(True)
else:
t.waitdone()
+
t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
self.workers.append(t)
self.destroy()
def destroy(self):
- # Check threadcache
- #global THREADCACHE
- #global THREADCACHEPID
- #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
- # del THREADCACHE[:]
- # THREADCACHEPID = os.getpid()
-
for worker in self.workers:
worker.waitdone()
for worker in self.workers:
for worker in self.workers:
worker.quit()
- # TO FIX:
- # THREADCACHE.extend(self.workers)
-
del self.workers[:]
def put(self, callable, *args, **kwargs):
except Queue.Empty:
raise StopIteration
-
-class ParallelFilter(ParallelMap):
- class _FILTERED:
- pass
-
- def __filter(self, x):
- if self.filter_condition(x):
- return x
- else:
- return self._FILTERED
-
- def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
- super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
- self.filter_condition = filter_condition
-
- def put(self, what):
- super(ParallelFilter, self).put(self.__filter, what)
-
- def put_nowait(self, what):
- super(ParallelFilter, self).put_nowait(self.__filter, what)
-
- def __iter__(self):
- for rv in super(ParallelFilter, self).__iter__():
- if rv is not self._FILTERED:
- yield rv
-
class ParallelRun(ParallelMap):
def __run(self, x):
fn, args, kwargs = x
super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
-def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
- mapper = ParallelMap(
- maxthreads = maxthreads,
- maxqueue = maxqueue,
- results = True)
- mapper.start()
- for elem in iterable:
- mapper.put(elem)
- rv = list(mapper)
- mapper.join()
- return rv
-
-def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
- filtrer = ParallelFilter(
- condition,
- maxthreads = maxthreads,
- maxqueue = maxqueue)
- filtrer.start()
- for elem in iterable:
- filtrer.put(elem)
- rv = list(filtrer)
- filtrer.join()
- return rv
-