3939994b8e03104da196a49fa32ebf0e39de248c
[sfa.git] / sfa / client / multiclient.py
1 from __future__ import print_function
2
3 import threading
4 import traceback
5 import time
6 from Queue import Queue
7 from sfa.util.sfalogging import logger
8
9
10 def ThreadedMethod(callable, results, errors):
11     """
12     A function decorator that returns a running thread. The thread
13     runs the specified callable and stores the result in the specified
14     results queue
15     """
16     def wrapper(args, kwds):
17         class ThreadInstance(threading.Thread):
18
19             def run(self):
20                 try:
21                     results.put(callable(*args, **kwds))
22                 except Exception as e:
23                     logger.log_exc('MultiClient: Error in thread: ')
24                     errors.put(traceback.format_exc())
25
26         thread = ThreadInstance()
27         thread.start()
28         return thread
29     return wrapper
30
31
32 class MultiClient:
33     """
34     MultiClient allows to issue several SFA calls in parallel in different threads
35     and stores the results in a thread safe queue. 
36     """
37
38     def __init__(self):
39         self.results = Queue()
40         self.errors = Queue()
41         self.threads = []
42
43     def run(self, method, *args, **kwds):
44         """
45         Execute a callable in a separate thread.    
46         """
47         method = ThreadedMethod(method, self.results, self.errors)
48         thread = method(args, kwds)
49         self.threads.append(thread)
50
51     start = run
52
53     def join(self):
54         """
55         Wait for all threads to complete  
56         """
57         for thread in self.threads:
58             thread.join()
59
60     def get_results(self, lenient=True):
61         """
62         Return a list of all the results so far. Blocks until 
63         all threads are finished. 
64         If lenient is set to false the error queue will be checked before 
65         the response is returned. If there are errors in the queue an SFA Fault will 
66         be raised.   
67         """
68         self.join()
69         results = []
70         if not lenient:
71             errors = self.get_errors()
72             if errors:
73                 raise Exception(errors[0])
74
75         while not self.results.empty():
76             results.append(self.results.get())
77         return results
78
79     def get_errors(self):
80         """
81         Return a list of all errors. Blocks untill all threads are finished
82         """
83         self.join()
84         errors = []
85         while not self.errors.empty():
86             errors.append(self.errors.get())
87         return errors
88
89     def get_return_value(self):
90         """
91         Get the value that should be returuned to the client. If there are errors then the
92         first error is returned. If there are no errors, then the first result is returned  
93         """
94
95
96 if __name__ == '__main__':
97
98     def f(name, n, sleep=1):
99         nums = []
100         for i in range(n, n + 5):
101             print("%s: %s" % (name, i))
102             nums.append(i)
103             time.sleep(sleep)
104         return nums
105
106     def e(name, n, sleep=1):
107         nums = []
108         for i in range(n, n + 3) + ['n', 'b']:
109             print("%s: 1 + %s:" % (name, i))
110             nums.append(i + 1)
111             time.sleep(sleep)
112         return nums
113
114     threads = MultiClient()
115     threads.run(f, "Thread1", 10, 2)
116     threads.run(f, "Thread2", -10, 1)
117     threads.run(e, "Thread3", 19, 1)
118
119     #results = threads.get_results()
120     #errors = threads.get_errors()
121     # print "Results:", results
122     # print "Errors:", errors
123     results_xlenient = threads.get_results(lenient=False)