Fixing 'error: can't start new thread' bug in ParallelRun
authorAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 24 Jul 2013 21:04:30 +0000 (14:04 -0700)
committerAlina Quereilhac <alina.quereilhac@inria.fr>
Wed, 24 Jul 2013 21:04:30 +0000 (14:04 -0700)
src/nepi/execution/ec.py
src/nepi/util/parallel.py

index e1d6cb0..e81854b 100644 (file)
@@ -106,7 +106,8 @@ class ExperimentController(object):
 
     def __init__(self, exp_id = None): 
         super(ExperimentController, self).__init__()
-        # root directory to store files
+        # Logging
+        self._logger = logging.getLogger("ExperimentController")
 
         # Run identifier. It identifies a concrete instance (run) of an experiment.
         # Since a same experiment (same configuration) can be run many times,
@@ -143,9 +144,6 @@ class ExperimentController(object):
         # EC state
         self._state = ECState.RUNNING
 
-        # Logging
-        self._logger = logging.getLogger("ExperimentController")
-
     @property
     def logger(self):
         """ Return the logger of the Experiment Controller
@@ -791,6 +789,7 @@ class ExperimentController(object):
         finally:   
             self.logger.debug("Exiting the task processing loop ... ")
             runner.sync()
+            runner.destroy()
 
     def _execute(self, task):
         """ Executes a single task. 
index fffdea5..3ca20cd 100644 (file)
@@ -18,6 +18,8 @@
 # Author: Claudio Freire <claudio-daniel.freire@inria.fr>
 #
 
+# A.Q. TODO: BUG FIX THREADCACHE. Not needed!! remove it completely!
+
 import threading
 import Queue
 import traceback
@@ -26,8 +28,8 @@ import os
 
 N_PROCS = None
 
-THREADCACHE = []
-THREADCACHEPID = None
+#THREADCACHE = []
+#THREADCACHEPID = None
 
 class WorkerThread(threading.Thread):
     class QUIT:
@@ -98,8 +100,8 @@ class WorkerThread(threading.Thread):
 class ParallelMap(object):
     def __init__(self, maxthreads = None, maxqueue = None, results = True):
         global N_PROCS
-        global THREADCACHE
-        global THREADCACHEPID
+        #global THREADCACHE
+        #global THREADCACHEPID
         
         if maxthreads is None:
             if N_PROCS is None:
@@ -126,18 +128,18 @@ class ParallelMap(object):
             self.rvqueue = None
         
         # Check threadcache
-        if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-            del THREADCACHE[:]
-            THREADCACHEPID = os.getpid()
+        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
+        #    del THREADCACHE[:]
+        #    THREADCACHEPID = os.getpid()
     
         self.workers = []
         for x in xrange(maxthreads):
             t = None
-            if THREADCACHE:
-                try:
-                    t = THREADCACHE.pop()
-                except:
-                    pass
+            #if THREADCACHE:
+            #    try:
+            #        t = THREADCACHE.pop()
+            #    except:
+            #        pass
             if t is None:
                 t = WorkerThread()
                 t.setDaemon(True)
@@ -151,11 +153,11 @@ class ParallelMap(object):
     
     def destroy(self):
         # Check threadcache
-        global THREADCACHE
-        global THREADCACHEPID
-        if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
-            del THREADCACHE[:]
-            THREADCACHEPID = os.getpid()
+        #global THREADCACHE
+        #global THREADCACHEPID
+        #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
+        #    del THREADCACHE[:]
+        #    THREADCACHEPID = os.getpid()
 
         for worker in self.workers:
             worker.waitdone()
@@ -163,7 +165,12 @@ class ParallelMap(object):
             worker.detach()
         for worker in self.workers:
             worker.detach_signal()
-        THREADCACHE.extend(self.workers)
+        for worker in self.workers:
+            worker.quit()
+
+        # TO FIX:
+        # THREADCACHE.extend(self.workers)
+
         del self.workers[:]
         
     def put(self, callable, *args, **kwargs):