Ticket #45: spanning tree deployment
[nepi.git] / src / nepi / util / parallel.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import threading
5 import Queue
6 import traceback
7 import sys
8
9 N_PROCS = None
10
11 class ParallelMap(object):
12     def __init__(self, maxthreads = None, maxqueue = None, results = True):
13         global N_PROCS
14         
15         if maxthreads is None:
16             if N_PROCS is None:
17                 try:
18                     f = open("/proc/cpuinfo")
19                     try:
20                         N_PROCS = sum("processor" in l for l in f)
21                     finally:
22                         f.close()
23                 except:
24                     pass
25             maxthreads = N_PROCS
26         
27         if maxthreads is None:
28             maxthreads = 4
29
30         self.queue = Queue.Queue(maxqueue or 0)
31     
32         self.workers = [ threading.Thread(target = self.worker) 
33                          for x in xrange(maxthreads) ]
34         
35         if results:
36             self.rvqueue = Queue.Queue()
37         else:
38             self.rvqueue = None
39         
40     def put(self, callable, *args, **kwargs):
41         self.queue.put((callable, args, kwargs))
42     
43     def put_nowait(self, callable, *args, **kwargs):
44         self.queue.put_nowait((callable, args, kwargs))
45
46     def start(self):
47         for thread in self.workers:
48             thread.start()
49     
50     def join(self):
51         for thread in self.workers:
52             # That's the shutdown signal
53             self.queue.put(None)
54             
55         self.queue.join()
56         for thread in self.workers:
57             thread.join()
58         
59     def worker(self):
60         while True:
61             task = self.queue.get()
62             if task is None:
63                 self.queue.task_done()
64                 break
65             
66             try:
67                 try:
68                     callable, args, kwargs = task
69                     rv = callable(*args, **kwargs)
70                     
71                     if self.rvqueue is not None:
72                         self.rvqueue.put(rv)
73                 finally:
74                     self.queue.task_done()
75             except:
76                 traceback.print_exc(file = sys.stderr)
77
78     def __iter__(self):
79         if self.rvqueue is not None:
80             while True:
81                 try:
82                     yield self.rvqueue.get_nowait()
83                 except Queue.Empty:
84                     self.queue.join()
85                     try:
86                         yield self.rvqueue.get_nowait()
87                     except Queue.Empty:
88                         raise StopIteration
89             
90     
91