1 from __future__ import print_function
6 from Queue import Queue
7 from sfa.util.sfalogging import logger
9 def ThreadedMethod(callable, results, errors):
11 A function decorator that returns a running thread. The thread
12 runs the specified callable and stores the result in the specified
15 def wrapper(args, kwds):
16 class ThreadInstance(threading.Thread):
19 results.put(callable(*args, **kwds))
20 except Exception as e:
21 logger.log_exc('MultiClient: Error in thread: ')
22 errors.put(traceback.format_exc())
24 thread = ThreadInstance()
33 MultiClient allows issuing several SFA calls in parallel in different threads
34 and stores the results in a thread safe queue.
38 self.results = Queue()
42 def run (self, method, *args, **kwds):
44 Execute a callable in a separate thread.
46 method = ThreadedMethod(method, self.results, self.errors)
47 thread = method(args, kwds)
48 self.threads.append(thread)
54 Wait for all threads to complete
56 for thread in self.threads:
59 def get_results(self, lenient=True):
61 Return a list of all the results so far. Blocks until
62 all threads are finished.
63 If lenient is set to false the error queue will be checked before
64 the response is returned. If there are errors in the queue an Exception will
70 errors = self.get_errors()
72 raise Exception(errors[0])
74 while not self.results.empty():
75 results.append(self.results.get())
80 Return a list of all errors. Blocks until all threads are finished.
84 while not self.errors.empty():
85 errors.append(self.errors.get())
88 def get_return_value(self):
90 Get the value that should be returned to the client. If there are errors then the
91 first error is returned. If there are no errors, then the first result is returned
95 if __name__ == '__main__':
97 def f(name, n, sleep=1):
99 for i in range(n, n+5):
100 print("%s: %s" % (name, i))
104 def e(name, n, sleep=1):
106 for i in range(n, n+3) + ['n', 'b']:
107 print("%s: 1 + %s:" % (name, i))
112 threads = MultiClient()
113 threads.run(f, "Thread1", 10, 2)
114 threads.run(f, "Thread2", -10, 1)
115 threads.run(e, "Thread3", 19, 1)
117 #results = threads.get_results()
118 #errors = threads.get_errors()
119 #print "Results:", results
120 #print "Errors:", errors
121 results_xlenient = threads.get_results(lenient=False)