-
-class ParallelFilter(ParallelMap):
- class _FILTERED:
- pass
-
- def __filter(self, x):
- if self.filter_condition(x):
- return x
- else:
- return self._FILTERED
-
- def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
- super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
- self.filter_condition = filter_condition
-
- def put(self, what):
- super(ParallelFilter, self).put(self.__filter, what)
-
- def put_nowait(self, what):
- super(ParallelFilter, self).put_nowait(self.__filter, what)
-
- def __iter__(self):
- for rv in super(ParallelFilter, self).__iter__():
- if rv is not self._FILTERED:
- yield rv
-
-class ParallelRun(ParallelMap):
- def __run(self, x):
- fn, args, kwargs = x
- return fn(*args, **kwargs)
-
- def __init__(self, maxthreads = None, maxqueue = None):
- super(ParallelRun, self).__init__(maxthreads, maxqueue, True)
-
- def put(self, what, *args, **kwargs):
- super(ParallelRun, self).put(self.__run, (what, args, kwargs))
-
- def put_nowait(self, what, *args, **kwargs):
- super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
-
-
-def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
- mapper = ParallelMap(
- maxthreads = maxthreads,
- maxqueue = maxqueue,
- results = True)
- mapper.start()
- for elem in iterable:
- mapper.put(elem)
- rv = list(mapper)
- mapper.join()
- return rv
-
-def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
- filtrer = ParallelFilter(
- condition,
- maxthreads = maxthreads,
- maxqueue = maxqueue)
- filtrer.start()
- for elem in iterable:
- filtrer.put(elem)
- rv = list(filtrer)
- filtrer.join()
- return rv
-