4 from Queue import Queue
5 from sfa.util.sfa.logging import logger
7 def ThreadedMethod(callable, results, errors):
9 A function decorator that returns a running thread. The thread
10 runs the specified callable and stores the result in the specified
13 def wrapper(args, kwds):
14 class ThreadInstance(threading.Thread):
17 results.put(callable(*args, **kwds))
19 logger.log_exc('ThreadManager: Error in thread: ')
20 errors.put(traceback.format_exc())
22 thread = ThreadInstance()
31 ThreadManager executes a callable in a thread and stores the result
32 in a thread safe queue.
38 def run (self, method, *args, **kwds):
40 Execute a callable in a separate thread.
42 method = ThreadedMethod(method, self.results, self.errors)
43 thread = method(args, kwds)
44 self.threads.append(thread)
50 Wait for all threads to complete
52 for thread in self.threads:
55 def get_results(self, lenient=True):
57 Return a list of all the results so far. Blocks until
58 all threads are finished.
59 If lenient is set to false the error queue will be checked before
60 the response is returned. If there are errors in the queue an SFA Fault will
66 errors = self.get_errors()
68 raise Exception(errors[0])
70 while not self.results.empty():
71 results.append(self.results.get())
76 Return a list of all errors. Blocks until all threads are finished
80 while not self.errors.empty():
81 errors.append(self.errors.get())
84 def get_return_value(self):
86 Get the value that should be returned to the client. If there are errors then the
87 first error is returned. If there are no errors, then the first result is returned
91 if __name__ == '__main__':
93 def f(name, n, sleep=1):
95 for i in range(n, n+5):
96 print "%s: %s" % (name, i)
100 def e(name, n, sleep=1):
102 for i in range(n, n+3) + ['n', 'b']:
103 print "%s: 1 + %s:" % (name, i)
108 threads = ThreadManager()
109 threads.run(f, "Thread1", 10, 2)
110 threads.run(f, "Thread2", -10, 1)
111 threads.run(e, "Thread3", 19, 1)
113 #results = threads.get_results()
114 #errors = threads.get_errors()
115 #print "Results:", results
116 #print "Errors:", errors
117 results_xlenient = threads.get_results(lenient=False)