Small edit in order to test the git repo. NT.
[sfa.git] / sfa / util / threadmanager.py
1 import threading
2 import traceback
3 import time
4 from Queue import Queue
5
6 def ThreadedMethod(callable, results, errors):
7     """
8     A function decorator that returns a running thread. The thread
9     runs the specified callable and stores the result in the specified
10     results queue
11     """
12     def wrapper(args, kwds):
13         class ThreadInstance(threading.Thread): 
14             def run(self):
15                 try:
16                     results.put(callable(*args, **kwds))
17                 except Exception, e:
18                     errors.put(traceback.format_exc())
19                     
20         thread = ThreadInstance()
21         thread.start()
22         return thread
23     return wrapper
24
25  
26
27 class ThreadManager:
28     """
29     ThreadManager executes a callable in a thread and stores the result
30     in a thread safe queue. 
31     """
32     results = Queue()
33     errors = Queue()
34     threads = []
35
36     def run (self, method, *args, **kwds):
37         """
38         Execute a callable in a separate thread.    
39         """
40         method = ThreadedMethod(method, self.results, self.errors)
41         thread = method(args, kwds)
42         self.threads.append(thread)
43
44     start = run
45
46     def join(self):
47         """
48         Wait for all threads to complete  
49         """
50         for thread in self.threads:
51             thread.join()
52
53     def get_results(self, lenient=True):
54         """
55         Return a list of all the results so far. Blocks until 
56         all threads are finished. 
57         If lienent is set to false the error queue will be checked before 
58         the response is returned. If there are errors in the queue an SFA Fault will 
59         be raised.   
60         """
61         self.join()
62         results = []
63         if not lenient:
64             errors = self.get_errors()
65             if errors: 
66                 raise Exception(errors[0])
67
68         while not self.results.empty():
69             results.append(self.results.get())  
70         return results
71
72     def get_errors(self):
73         """
74         Return a list of all errors. Blocks untill all threads are finished
75         """
76         self.join()
77         errors = []
78         while not self.errors.empty():
79             errors.append(self.errors.get())
80         return errors
81
82     def get_return_value(self):
83         """
84         Get the value that should be returuned to the client. If there are errors then the
85         first error is returned. If there are no errors, then the first result is returned  
86         """
87     
88            
89 if __name__ == '__main__':
90
91     def f(name, n, sleep=1):
92         nums = []
93         for i in range(n, n+5):
94             print "%s: %s" % (name, i)
95             nums.append(i)
96             time.sleep(sleep)
97         return nums
98     def e(name, n, sleep=1):
99         nums = []
100         for i in range(n, n+3) + ['n', 'b']:
101             print "%s: 1 + %s:" % (name, i)
102             nums.append(i + 1)
103             time.sleep(sleep)
104         return nums      
105
106     threads = ThreadManager()
107     threads.run(f, "Thread1", 10, 2)
108     threads.run(f, "Thread2", -10, 1)
109     threads.run(e, "Thread3", 19, 1)
110
111     #results = threads.get_results()
112     #errors = threads.get_errors()
113     #print "Results:", results
114     #print "Errors:", errors
115     results_xlenient = threads.get_results(lenient=False)
116