ec_shutdown
[nepi.git] / src / nepi / util / parallel.py
index 3ca20cd..6868c4a 100644 (file)
@@ -28,9 +28,6 @@ import os
 
 N_PROCS = None
 
-#THREADCACHE = []
-#THREADCACHEPID = None
-
 class WorkerThread(threading.Thread):
     class QUIT:
         pass
@@ -100,9 +97,8 @@ class WorkerThread(threading.Thread):
 class ParallelMap(object):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
         global N_PROCS
-        #global THREADCACHE
-        #global THREADCACHEPID
-        
+       
+        # Compute maximum number of threads allowed by the system
         if maxthreads is None:
             if N_PROCS is None:
                 try:
@@ -126,25 +122,18 @@ class ParallelMap(object):
             self.rvqueue = Queue.Queue()
         else:
             self.rvqueue = None
-        
-        # Check threadcache
-        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-        #    del THREADCACHE[:]
-        #    THREADCACHEPID = os.getpid()
     
         self.workers = []
+
+        # initialize workers
         for x in xrange(maxthreads):
             t = None
-            #if THREADCACHE:
-            #    try:
-            #        t = THREADCACHE.pop()
-            #    except:
-            #        pass
             if t is None:
                 t = WorkerThread()
                 t.setDaemon(True)
             else:
                 t.waitdone()
+
             t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
             self.workers.append(t)
     
@@ -152,13 +141,6 @@ class ParallelMap(object):
         self.destroy()
     
     def destroy(self):
-        # Check threadcache
-        #global THREADCACHE
-        #global THREADCACHEPID
-        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-        #    del THREADCACHE[:]
-        #    THREADCACHEPID = os.getpid()
-
         for worker in self.workers:
             worker.waitdone()
         for worker in self.workers:
@@ -168,9 +150,6 @@ class ParallelMap(object):
         for worker in self.workers:
             worker.quit()
 
-        # TO FIX:
-        # THREADCACHE.extend(self.workers)
-
         del self.workers[:]
         
     def put(self, callable, *args, **kwargs):
@@ -219,32 +198,6 @@ class ParallelMap(object):
                     except Queue.Empty:
                         raise StopIteration
             
-    
-class ParallelFilter(ParallelMap):
-    class _FILTERED:
-        pass
-    
-    def __filter(self, x):
-        if self.filter_condition(x):
-            return x
-        else:
-            return self._FILTERED
-    
-    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
-        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
-        self.filter_condition = filter_condition
-
-    def put(self, what):
-        super(ParallelFilter, self).put(self.__filter, what)
-    
-    def put_nowait(self, what):
-        super(ParallelFilter, self).put_nowait(self.__filter, what)
-        
-    def __iter__(self):
-        for rv in super(ParallelFilter, self).__iter__():
-            if rv is not self._FILTERED:
-                yield rv
-
 class ParallelRun(ParallelMap):
     def __run(self, x):
         fn, args, kwargs = x
@@ -260,27 +213,3 @@ class ParallelRun(ParallelMap):
         super(ParallelRun, self).put_nowait(self.__filter, (what, args, kwargs))
 
 
-def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
-    mapper = ParallelMap(
-        maxthreads = maxthreads,
-        maxqueue = maxqueue,
-        results = True)
-    mapper.start()
-    for elem in iterable:
-        mapper.put(elem)
-    rv = list(mapper)
-    mapper.join()
-    return rv
-
-def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
-    filtrer = ParallelFilter(
-        condition,
-        maxthreads = maxthreads,
-        maxqueue = maxqueue)
-    filtrer.start()
-    for elem in iterable:
-        filtrer.put(elem)
-    rv = list(filtrer)
-    filtrer.join()
-    return rv
-