4 from Queue import Queue
5 from sfa.util.sfalogging import logger
7 def ThreadedMethod(callable, results, errors):
9 A function decorator that returns a running thread. The thread
10 runs the specified callable and stores the result in the specified
13 def wrapper(args, kwds):
14 class ThreadInstance(threading.Thread):
17 results.put(callable(*args, **kwds))
19 logger.log_exc('ThreadManager: Error in thread: ')
20 errors.put(traceback.format_exc())
22 thread = ThreadInstance()
31 ThreadManager executes a callable in a thread and stores the result
32 in a thread safe queue.
36 self.results = Queue()
40 def run (self, method, *args, **kwds):
42 Execute a callable in a separate thread.
44 method = ThreadedMethod(method, self.results, self.errors)
45 thread = method(args, kwds)
46 self.threads.append(thread)
52 Wait for all threads to complete
54 for thread in self.threads:
57 def get_results(self, lenient=True):
59 Return a list of all the results so far. Blocks until
60 all threads are finished.
61 If lienent is set to false the error queue will be checked before
62 the response is returned. If there are errors in the queue an SFA Fault will
68 errors = self.get_errors()
70 raise Exception(errors[0])
72 while not self.results.empty():
73 results.append(self.results.get())
78 Return a list of all errors. Blocks untill all threads are finished
82 while not self.errors.empty():
83 errors.append(self.errors.get())
86 def get_return_value(self):
88 Get the value that should be returuned to the client. If there are errors then the
89 first error is returned. If there are no errors, then the first result is returned
93 if __name__ == '__main__':
95 def f(name, n, sleep=1):
97 for i in range(n, n+5):
98 print "%s: %s" % (name, i)
102 def e(name, n, sleep=1):
104 for i in range(n, n+3) + ['n', 'b']:
105 print "%s: 1 + %s:" % (name, i)
110 threads = ThreadManager()
111 threads.run(f, "Thread1", 10, 2)
112 threads.run(f, "Thread2", -10, 1)
113 threads.run(e, "Thread3", 19, 1)
115 #results = threads.get_results()
116 #errors = threads.get_errors()
117 #print "Results:", results
118 #print "Errors:", errors
119 results_xlenient = threads.get_results(lenient=False)