2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Claudio Freire <claudio-daniel.freire@inria.fr>
19 # Alina Quereilhac <alina.quereilhac@inria.fr>
30 class WorkerThread(threading.Thread):
36 task = self.queue.get()
39 self.queue.task_done()
44 callable, args, kwargs = task
45 rv = callable(*args, **kwargs)
47 if self.rvqueue is not None:
50 self.queue.task_done()
52 traceback.print_exc(file = sys.stderr)
53 self.delayed_exceptions.append(sys.exc_info())
55 def attach(self, queue, rvqueue, delayed_exceptions):
57 self.rvqueue = rvqueue
58 self.delayed_exceptions = delayed_exceptions
61 self.queue.put(self.QUIT)
63 class ParallelRun(object):
64 def __init__(self, maxthreads = None, maxqueue = None, results = True):
65 self.maxqueue = maxqueue
66 self.maxthreads = maxthreads
68 self.queue = Queue.Queue(self.maxqueue or 0)
70 self.delayed_exceptions = []
73 self.rvqueue = Queue.Queue()
77 self.initialize_workers()
79 def initialize_workers(self):
82 maxthreads = self.maxthreads
84 # Compute maximum number of threads allowed by the system
85 if maxthreads is None:
88 f = open("/proc/cpuinfo")
90 N_PROCS = sum("processor" in l for l in f)
97 if maxthreads is None:
103 for x in xrange(maxthreads):
104 worker = WorkerThread()
105 worker.attach(self.queue, self.rvqueue, self.delayed_exceptions)
106 worker.setDaemon(True)
108 self.workers.append(worker)
116 self.queue.get(block = False)
117 self.queue.task_done()
126 def put(self, callable, *args, **kwargs):
127 self.queue.put((callable, args, kwargs))
129 def put_nowait(self, callable, *args, **kwargs):
130 self.queue.put_nowait((callable, args, kwargs))
133 for worker in self.workers:
134 if not worker.isAlive():
138 # Wait until all queued tasks have been processed
141 for worker in self.workers:
144 for worker in self.workers:
148 if self.delayed_exceptions:
149 typ,val,loc = self.delayed_exceptions[0]
150 del self.delayed_exceptions[:]
154 if self.rvqueue is not None:
157 yield self.rvqueue.get_nowait()
161 yield self.rvqueue.get_nowait()