2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Claudio Freire <claudio-daniel.freire@inria.fr>
# A.Q. TODO: Bug fix -- THREADCACHE is not needed; remove it completely.
32 #THREADCACHEPID = None
34 class WorkerThread(threading.Thread):
42 task = self.queue.get()
45 self.queue.task_done()
47 elif task is self.QUIT:
49 self.queue.task_done()
51 elif task is self.REASSIGNED:
58 callable, args, kwargs = task
59 rv = callable(*args, **kwargs)
61 if self.rvqueue is not None:
64 self.queue.task_done()
66 traceback.print_exc(file = sys.stderr)
67 self.delayed_exceptions.append(sys.exc_info())
70 while not self.queue.empty() and not self.done:
73 def attach(self, queue, rvqueue, delayed_exceptions):
78 self.rvqueue = rvqueue
79 self.delayed_exceptions = delayed_exceptions
81 oldqueue.put(self.REASSIGNED)
86 self.oldqueue = self.queue
87 self.queue = Queue.Queue()
89 self.delayed_exceptions = []
91 def detach_signal(self):
93 self.oldqueue.put(self.REASSIGNED)
97 self.queue.put(self.QUIT)
100 class ParallelMap(object):
101 def __init__(self, maxthreads = None, maxqueue = None, results = True):
104 #global THREADCACHEPID
106 if maxthreads is None:
109 f = open("/proc/cpuinfo")
111 N_PROCS = sum("processor" in l for l in f)
118 if maxthreads is None:
121 self.queue = Queue.Queue(maxqueue or 0)
123 self.delayed_exceptions = []
126 self.rvqueue = Queue.Queue()
131 #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
133 # THREADCACHEPID = os.getpid()
136 for x in xrange(maxthreads):
140 # t = THREADCACHE.pop()
148 t.attach(self.queue, self.rvqueue, self.delayed_exceptions)
149 self.workers.append(t)
157 #global THREADCACHEPID
158 #if THREADCACHEPID is None or THREADCACHEPID != os.getpid():
160 # THREADCACHEPID = os.getpid()
162 for worker in self.workers:
164 for worker in self.workers:
166 for worker in self.workers:
167 worker.detach_signal()
168 for worker in self.workers:
172 # THREADCACHE.extend(self.workers)
def put(self, callable, *args, **kwargs):
    """
    Enqueue one unit of work on self.queue.

    The work item is stored as a (callable, args, kwargs) triple and
    handed to the queue's put() method.
    """
    work_item = (callable, args, kwargs)
    self.queue.put(work_item)
def put_nowait(self, callable, *args, **kwargs):
    """
    Enqueue one unit of work on self.queue without waiting.

    The work item is stored as a (callable, args, kwargs) triple and
    handed to the queue's put_nowait() method; whatever that method
    raises is propagated to the caller unchanged.
    """
    work_item = (callable, args, kwargs)
    self.queue.put_nowait(work_item)
183 for thread in self.workers:
184 if not thread.isAlive():
188 for thread in self.workers:
189 # That's the sync signal
193 for thread in self.workers:
196 if self.delayed_exceptions:
197 typ,val,loc = self.delayed_exceptions[0]
198 del self.delayed_exceptions[:]
205 if self.delayed_exceptions:
206 typ,val,loc = self.delayed_exceptions[0]
207 del self.delayed_exceptions[:]
211 if self.rvqueue is not None:
214 yield self.rvqueue.get_nowait()
218 yield self.rvqueue.get_nowait()
223 class ParallelFilter(ParallelMap):
227 def __filter(self, x):
228 if self.filter_condition(x):
231 return self._FILTERED
def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
    """
    Initialise the parent ParallelMap and remember the predicate.

    maxthreads and maxqueue are forwarded to the parent constructor
    untouched; True is passed as its third (results) argument.
    filter_condition is stored on the instance for later use.
    """
    parent = super(ParallelFilter, self)
    parent.__init__(maxthreads, maxqueue, True)
    self.filter_condition = filter_condition
238 super(ParallelFilter, self).put(self.__filter, what)
def put_nowait(self, what):
    """
    Submit `what` through the parent's put_nowait().

    The task callable is this class's private __filter method and
    `what` is its single positional argument.
    """
    enqueue = super(ParallelFilter, self).put_nowait
    enqueue(self.__filter, what)
244 for rv in super(ParallelFilter, self).__iter__():
245 if rv is not self._FILTERED:
248 class ParallelRun(ParallelMap):
251 return fn(*args, **kwargs)
def __init__(self, maxthreads = None, maxqueue = None):
    """
    Initialise the parent ParallelMap.

    maxthreads and maxqueue are forwarded untouched; True is passed as
    the parent's third (results) argument.
    """
    parent = super(ParallelRun, self)
    parent.__init__(maxthreads, maxqueue, True)
def put(self, what, *args, **kwargs):
    """
    Submit the call what(*args, **kwargs) through the parent's put().

    The target and its arguments are bundled into one
    (what, args, kwargs) triple, which is passed as the single
    positional argument of this class's private __run method.
    """
    bundle = (what, args, kwargs)
    super(ParallelRun, self).put(self.__run, bundle)
def put_nowait(self, what, *args, **kwargs):
    """
    Submit the call what(*args, **kwargs) through the parent's
    put_nowait().

    The target and its arguments are bundled into one
    (what, args, kwargs) triple, which is passed as the single
    positional argument of this class's private __run method, exactly
    as put() does.
    """
    # Fix: this used to pass self.__filter.  Inside ParallelRun that name
    # is mangled to _ParallelRun__filter, which this class never defines,
    # so every call raised AttributeError.  The intended task wrapper is
    # __run, the same one put() submits.
    bundle = (what, args, kwargs)
    super(ParallelRun, self).put_nowait(self.__run, bundle)
263 def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
264 mapper = ParallelMap(
265 maxthreads = maxthreads,
269 for elem in iterable:
275 def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
276 filtrer = ParallelFilter(
278 maxthreads = maxthreads,
281 for elem in iterable: