X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Futil%2Fparallel.py;h=6868c4aab0086929e95c23600e8368ee7a6628f8;hb=386498468dfb01f71b0efbbe0c208819f18f82ec;hp=3ca20cd5326abbae3fd0bde0ffe810e3a767942c;hpb=4b0e922489532434f0968647886021542b77cece;p=nepi.git diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index 3ca20cd5..6868c4aa 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -28,9 +28,6 @@ import os N_PROCS = None -#THREADCACHE = [] -#THREADCACHEPID = None - class WorkerThread(threading.Thread): class QUIT: pass @@ -100,9 +97,8 @@ class WorkerThread(threading.Thread): class ParallelMap(object): def __init__(self, maxthreads = None, maxqueue = None, results = True): global N_PROCS - #global THREADCACHE - #global THREADCACHEPID - + + # Compute maximum number of threads allowed by the system if maxthreads is None: if N_PROCS is None: try: @@ -126,25 +122,18 @@ class ParallelMap(object): self.rvqueue = Queue.Queue() else: self.rvqueue = None - - # Check threadcache - #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - # del THREADCACHE[:] - # THREADCACHEPID = os.getpid() self.workers = [] + + # initialize workers for x in xrange(maxthreads): t = None - #if THREADCACHE: - # try: - # t = THREADCACHE.pop() - # except: - # pass if t is None: t = WorkerThread() t.setDaemon(True) else: t.waitdone() + t.attach(self.queue, self.rvqueue, self.delayed_exceptions) self.workers.append(t) @@ -152,13 +141,6 @@ class ParallelMap(object): self.destroy() def destroy(self): - # Check threadcache - #global THREADCACHE - #global THREADCACHEPID - #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - # del THREADCACHE[:] - # THREADCACHEPID = os.getpid() - for worker in self.workers: worker.waitdone() for worker in self.workers: @@ -168,9 +150,6 @@ class ParallelMap(object): for worker in self.workers: worker.quit() - # TO FIX: - # THREADCACHE.extend(self.workers) - del self.workers[:] def put(self, callable, *args, **kwargs): @@ -219,32 +198,6 @@ class ParallelMap(object): except Queue.Empty: raise StopIteration - -class ParallelFilter(ParallelMap): - class _FILTERED: - pass - - def __filter(self, x): - if self.filter_condition(x): - return x - else: - return self._FILTERED - - def __init__(self, filter_condition, maxthreads = None, maxqueue = None): - super(ParallelFilter, self).__init__(maxthreads, maxqueue, True) - self.filter_condition = filter_condition - - def put(self, what): - super(ParallelFilter, self).put(self.__filter, what) - - def put_nowait(self, what): - super(ParallelFilter, self).put_nowait(self.__filter, what) - - def __iter__(self): - for rv in super(ParallelFilter, self).__iter__(): - if rv is not self._FILTERED: - yield rv - class ParallelRun(ParallelMap): def __run(self, x): fn, args, kwargs = x @@ -260,27 +213,3 @@ class ParallelRun(ParallelMap): super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs)) -def pmap(mapping, iterable, maxthreads = None, maxqueue = None): - mapper = ParallelMap( - maxthreads = maxthreads, - maxqueue = maxqueue, - results = True) - mapper.start() - for elem in iterable: - mapper.put(elem) - rv = list(mapper) - mapper.join() - return rv - -def pfilter(condition, iterable, maxthreads = None, maxqueue = None): - filtrer = ParallelFilter( - condition, - maxthreads = maxthreads, - maxqueue = maxqueue) - filtrer.start() - for elem in iterable: - filtrer.put(elem) - rv = list(filtrer) - filtrer.join() - return rv -