X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fclient%2Fmulticlient.py;fp=sfa%2Fclient%2Fmulticlient.py;h=75573ed51c5828ec1402383878b612f982b00022;hb=52f8ae3e237fbbd3c73b831cbdf6ca35de06e711;hp=0000000000000000000000000000000000000000;hpb=751353deb980129ebdf84f3e77830b47de800097;p=sfa.git diff --git a/sfa/client/multiclient.py b/sfa/client/multiclient.py new file mode 100644 index 00000000..75573ed5 --- /dev/null +++ b/sfa/client/multiclient.py @@ -0,0 +1,120 @@ +import threading +import traceback +import time +from Queue import Queue +from sfa.util.sfalogging import logger + +def ThreadedMethod(callable, results, errors): + """ + A function decorator that returns a running thread. The thread + runs the specified callable and stores the result in the specified + results queue + """ + def wrapper(args, kwds): + class ThreadInstance(threading.Thread): + def run(self): + try: + results.put(callable(*args, **kwds)) + except Exception, e: + logger.log_exc('MultiClient: Error in thread: ') + errors.put(traceback.format_exc()) + + thread = ThreadInstance() + thread.start() + return thread + return wrapper + + + +class MultiClient: + """ + MultiClient allows to issue several SFA calls in parallel in different threads + and stores the results in a thread safe queue. + """ + + def __init__(self): + self.results = Queue() + self.errors = Queue() + self.threads = [] + + def run (self, method, *args, **kwds): + """ + Execute a callable in a separate thread. + """ + method = ThreadedMethod(method, self.results, self.errors) + thread = method(args, kwds) + self.threads.append(thread) + + start = run + + def join(self): + """ + Wait for all threads to complete + """ + for thread in self.threads: + thread.join() + + def get_results(self, lenient=True): + """ + Return a list of all the results so far. Blocks until + all threads are finished. + If lenient is set to false the error queue will be checked before + the response is returned. If there are errors in the queue an SFA Fault will + be raised. + """ + self.join() + results = [] + if not lenient: + errors = self.get_errors() + if errors: + raise Exception(errors[0]) + + while not self.results.empty(): + results.append(self.results.get()) + return results + + def get_errors(self): + """ + Return a list of all errors. Blocks untill all threads are finished + """ + self.join() + errors = [] + while not self.errors.empty(): + errors.append(self.errors.get()) + return errors + + def get_return_value(self): + """ + Get the value that should be returuned to the client. If there are errors then the + first error is returned. If there are no errors, then the first result is returned + """ + + +if __name__ == '__main__': + + def f(name, n, sleep=1): + nums = [] + for i in range(n, n+5): + print "%s: %s" % (name, i) + nums.append(i) + time.sleep(sleep) + return nums + def e(name, n, sleep=1): + nums = [] + for i in range(n, n+3) + ['n', 'b']: + print "%s: 1 + %s:" % (name, i) + nums.append(i + 1) + time.sleep(sleep) + return nums + + threads = MultiClient() + threads.run(f, "Thread1", 10, 2) + threads.run(f, "Thread2", -10, 1) + threads.run(e, "Thread3", 19, 1) + + #results = threads.get_results() + #errors = threads.get_errors() + #print "Results:", results + #print "Errors:", errors + results_xlenient = threads.get_results(lenient=False) +