From fc94cf7c20a165cd9d053324b201c622fd6baa37 Mon Sep 17 00:00:00 2001 From: Alina Quereilhac Date: Wed, 24 Jul 2013 14:04:30 -0700 Subject: [PATCH] Fixing 'error: can't start new thread' bug in ParallelRun --- src/nepi/execution/ec.py | 7 +++---- src/nepi/util/parallel.py | 43 +++++++++++++++++++++++---------------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index e1d6cb01..e81854ba 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -106,7 +106,8 @@ class ExperimentController(object): def __init__(self, exp_id = None): super(ExperimentController, self).__init__() - # root directory to store files + # Logging + self._logger = logging.getLogger("ExperimentController") # Run identifier. It identifies a concrete instance (run) of an experiment. # Since a same experiment (same configuration) can be run many times, @@ -143,9 +144,6 @@ class ExperimentController(object): # EC state self._state = ECState.RUNNING - # Logging - self._logger = logging.getLogger("ExperimentController") - @property def logger(self): """ Return the logger of the Experiment Controller @@ -791,6 +789,7 @@ class ExperimentController(object): finally: self.logger.debug("Exiting the task processing loop ... ") runner.sync() + runner.destroy() def _execute(self, task): """ Executes a single task. diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index fffdea5f..3ca20cd5 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -18,6 +18,8 @@ # Author: Claudio Freire # +# A.Q. TODO: BUG FIX THREADCACHE. Not needed!! remove it completely! + import threading import Queue import traceback @@ -26,8 +28,8 @@ import os N_PROCS = None -THREADCACHE = [] -THREADCACHEPID = None +#THREADCACHE = [] +#THREADCACHEPID = None class WorkerThread(threading.Thread): class QUIT: @@ -98,8 +100,8 @@ class WorkerThread(threading.Thread): class ParallelMap(object): def __init__(self, maxthreads = None, maxqueue = None, results = True): global N_PROCS - global THREADCACHE - global THREADCACHEPID + #global THREADCACHE + #global THREADCACHEPID if maxthreads is None: if N_PROCS is None: @@ -126,18 +128,18 @@ class ParallelMap(object): self.rvqueue = None # Check threadcache - if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - del THREADCACHE[:] - THREADCACHEPID = os.getpid() + #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): + # del THREADCACHE[:] + # THREADCACHEPID = os.getpid() self.workers = [] for x in xrange(maxthreads): t = None - if THREADCACHE: - try: - t = THREADCACHE.pop() - except: - pass + #if THREADCACHE: + # try: + # t = THREADCACHE.pop() + # except: + # pass if t is None: t = WorkerThread() t.setDaemon(True) @@ -151,11 +153,11 @@ class ParallelMap(object): def destroy(self): # Check threadcache - global THREADCACHE - global THREADCACHEPID - if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - del THREADCACHE[:] - THREADCACHEPID = os.getpid() + #global THREADCACHE + #global THREADCACHEPID + #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): + # del THREADCACHE[:] + # THREADCACHEPID = os.getpid() for worker in self.workers: worker.waitdone() @@ -163,7 +165,12 @@ class ParallelMap(object): worker.detach() for worker in self.workers: worker.detach_signal() - THREADCACHE.extend(self.workers) + for worker in self.workers: + worker.quit() + + # TO FIX: + # THREADCACHE.extend(self.workers) + del self.workers[:] def put(self, callable, *args, **kwargs): -- 2.47.0