2to3 -f except
[sfa.git] / sfa / client / multiclient.py
1 from __future__ import print_function
2
3 import threading
4 import traceback
5 import time
6 from Queue import Queue
7 from sfa.util.sfalogging import logger
8
9 def ThreadedMethod(callable, results, errors):
10     """
11     A function decorator that returns a running thread. The thread
12     runs the specified callable and stores the result in the specified
13     results queue
14     """
15     def wrapper(args, kwds):
16         class ThreadInstance(threading.Thread): 
17             def run(self):
18                 try:
19                     results.put(callable(*args, **kwds))
20                 except Exception as e:
21                     logger.log_exc('MultiClient: Error in thread: ')
22                     errors.put(traceback.format_exc())
23                     
24         thread = ThreadInstance()
25         thread.start()
26         return thread
27     return wrapper
28
29  
30
31 class MultiClient:
32     """
33     MultiClient allows to issue several SFA calls in parallel in different threads
34     and stores the results in a thread safe queue. 
35     """
36
37     def __init__(self):
38         self.results = Queue()
39         self.errors = Queue()
40         self.threads = []
41
42     def run (self, method, *args, **kwds):
43         """
44         Execute a callable in a separate thread.    
45         """
46         method = ThreadedMethod(method, self.results, self.errors)
47         thread = method(args, kwds)
48         self.threads.append(thread)
49
50     start = run
51
52     def join(self):
53         """
54         Wait for all threads to complete  
55         """
56         for thread in self.threads:
57             thread.join()
58
59     def get_results(self, lenient=True):
60         """
61         Return a list of all the results so far. Blocks until 
62         all threads are finished. 
63         If lenient is set to false the error queue will be checked before 
64         the response is returned. If there are errors in the queue an SFA Fault will 
65         be raised.   
66         """
67         self.join()
68         results = []
69         if not lenient:
70             errors = self.get_errors()
71             if errors: 
72                 raise Exception(errors[0])
73
74         while not self.results.empty():
75             results.append(self.results.get())  
76         return results
77
78     def get_errors(self):
79         """
80         Return a list of all errors. Blocks untill all threads are finished
81         """
82         self.join()
83         errors = []
84         while not self.errors.empty():
85             errors.append(self.errors.get())
86         return errors
87
88     def get_return_value(self):
89         """
90         Get the value that should be returuned to the client. If there are errors then the
91         first error is returned. If there are no errors, then the first result is returned  
92         """
93     
94            
95 if __name__ == '__main__':
96
97     def f(name, n, sleep=1):
98         nums = []
99         for i in range(n, n+5):
100             print("%s: %s" % (name, i))
101             nums.append(i)
102             time.sleep(sleep)
103         return nums
104     def e(name, n, sleep=1):
105         nums = []
106         for i in range(n, n+3) + ['n', 'b']:
107             print("%s: 1 + %s:" % (name, i))
108             nums.append(i + 1)
109             time.sleep(sleep)
110         return nums      
111
112     threads = MultiClient()
113     threads.run(f, "Thread1", 10, 2)
114     threads.run(f, "Thread2", -10, 1)
115     threads.run(e, "Thread3", 19, 1)
116
117     #results = threads.get_results()
118     #errors = threads.get_errors()
119     #print "Results:", results
120     #print "Errors:", errors
121     results_xlenient = threads.get_results(lenient=False)
122