2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Claudio Freire <claudio-daniel.freire@inria.fr>
18 # Alina Quereilhac <alina.quereilhac@inria.fr>
29 class WorkerThread(threading.Thread):
35 task = self.queue.get()
38 self.queue.task_done()
43 callable, args, kwargs = task
44 rv = callable(*args, **kwargs)
46 if self.rvqueue is not None:
49 self.queue.task_done()
51 traceback.print_exc(file = sys.stderr)
52 self.delayed_exceptions.append(sys.exc_info())
54 def attach(self, queue, rvqueue, delayed_exceptions):
56 self.rvqueue = rvqueue
57 self.delayed_exceptions = delayed_exceptions
60 self.queue.put(self.QUIT)
62 class ParallelRun(object):
63 def __init__(self, maxthreads = None, maxqueue = None, results = True):
64 self.maxqueue = maxqueue
65 self.maxthreads = maxthreads
67 self.queue = queue.Queue(self.maxqueue or 0)
69 self.delayed_exceptions = []
72 self.rvqueue = queue.Queue()
76 self.initialize_workers()
78 def initialize_workers(self):
81 maxthreads = self.maxthreads
83 # Compute maximum number of threads allowed by the system
84 if maxthreads is None:
87 with open("/proc/cpuinfo") as f:
88 N_PROCS = sum("processor" in l for l in f)
93 if maxthreads is None:
99 for x in range(maxthreads):
100 worker = WorkerThread()
101 worker.attach(self.queue, self.rvqueue, self.delayed_exceptions)
102 worker.setDaemon(True)
104 self.workers.append(worker)
112 self.queue.get(block = False)
113 self.queue.task_done()
122 def put(self, callable, *args, **kwargs):
123 self.queue.put((callable, args, kwargs))
125 def put_nowait(self, callable, *args, **kwargs):
126 self.queue.put_nowait((callable, args, kwargs))
129 for worker in self.workers:
130 if not worker.isAlive():
134 # Wait until all queued tasks have been processed
137 for worker in self.workers:
140 for worker in self.workers:
144 if self.delayed_exceptions:
145 typ,val,loc = self.delayed_exceptions[0]
146 del self.delayed_exceptions[:]
147 raise typ(val).with_traceback(loc)
150 if self.rvqueue is not None:
153 yield self.rvqueue.get_nowait()
157 yield self.rvqueue.get_nowait()