From: Alina Quereilhac Date: Wed, 24 Jul 2013 21:04:30 +0000 (-0700) Subject: Fixing 'error: can't start new thread' bug in ParallelRun X-Git-Tag: nepi-3.0.0~59 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=fc94cf7c20a165cd9d053324b201c622fd6baa37;p=nepi.git Fixing 'error: can't start new thread' bug in ParallelRun --- diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index e1d6cb01..e81854ba 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -106,7 +106,8 @@ class ExperimentController(object): def __init__(self, exp_id = None): super(ExperimentController, self).__init__() - # root directory to store files + # Logging + self._logger = logging.getLogger("ExperimentController") # Run identifier. It identifies a concrete instance (run) of an experiment. # Since a same experiment (same configuration) can be run many times, @@ -143,9 +144,6 @@ class ExperimentController(object): # EC state self._state = ECState.RUNNING - # Logging - self._logger = logging.getLogger("ExperimentController") - @property def logger(self): """ Return the logger of the Experiment Controller @@ -791,6 +789,7 @@ class ExperimentController(object): finally: self.logger.debug("Exiting the task processing loop ... ") runner.sync() + runner.destroy() def _execute(self, task): """ Executes a single task. diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py index fffdea5f..3ca20cd5 100644 --- a/src/nepi/util/parallel.py +++ b/src/nepi/util/parallel.py @@ -18,6 +18,8 @@ # Author: Claudio Freire # +# A.Q. TODO: BUG FIX THREADCACHE. Not needed!! remove it completely! + import threading import Queue import traceback @@ -26,8 +28,8 @@ import os N_PROCS = None -THREADCACHE = [] -THREADCACHEPID = None +#THREADCACHE = [] +#THREADCACHEPID = None class WorkerThread(threading.Thread): class QUIT: @@ -98,8 +100,8 @@ class WorkerThread(threading.Thread): class ParallelMap(object): def __init__(self, maxthreads = None, maxqueue = None, results = True): global N_PROCS - global THREADCACHE - global THREADCACHEPID + #global THREADCACHE + #global THREADCACHEPID if maxthreads is None: if N_PROCS is None: @@ -126,18 +128,18 @@ class ParallelMap(object): self.rvqueue = None # Check threadcache - if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - del THREADCACHE[:] - THREADCACHEPID = os.getpid() + #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): + # del THREADCACHE[:] + # THREADCACHEPID = os.getpid() self.workers = [] for x in xrange(maxthreads): t = None - if THREADCACHE: - try: - t = THREADCACHE.pop() - except: - pass + #if THREADCACHE: + # try: + # t = THREADCACHE.pop() + # except: + # pass if t is None: t = WorkerThread() t.setDaemon(True) @@ -151,11 +153,11 @@ class ParallelMap(object): def destroy(self): # Check threadcache - global THREADCACHE - global THREADCACHEPID - if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): - del THREADCACHE[:] - THREADCACHEPID = os.getpid() + #global THREADCACHE + #global THREADCACHEPID + #if THREADCACHEPID is None or THREADCACHEPID != os.getpid(): + # del THREADCACHE[:] + # THREADCACHEPID = os.getpid() for worker in self.workers: worker.waitdone() @@ -163,7 +165,12 @@ class ParallelMap(object): worker.detach() for worker in self.workers: worker.detach_signal() - THREADCACHE.extend(self.workers) + for worker in self.workers: + worker.quit() + + # TO FIX: + # THREADCACHE.extend(self.workers) + del self.workers[:] def put(self, callable, *args, **kwargs):