+class WorkerThread(threading.Thread):
+ class QUIT:
+ pass
+ class REASSIGNED:
+ pass
+
+ def run(self):
+ while True:
+ task = self.queue.get()
+ if task is None:
+ self.done = True
+ self.queue.task_done()
+ continue
+ elif task is self.QUIT:
+ self.done = True
+ self.queue.task_done()
+ break
+ elif task is self.REASSIGNED:
+ continue
+ else:
+ self.done = False
+
+ try:
+ try:
+ callable, args, kwargs = task
+ rv = callable(*args, **kwargs)
+
+ if self.rvqueue is not None:
+ self.rvqueue.put(rv)
+ finally:
+ self.queue.task_done()
+ except:
+ traceback.print_exc(file = sys.stderr)
+ self.delayed_exceptions.append(sys.exc_info())
+
+ def waitdone(self):
+ while not self.queue.empty() and not self.done:
+ self.queue.join()
+
+ def attach(self, queue, rvqueue, delayed_exceptions):
+ if self.isAlive():
+ self.waitdone()
+ oldqueue = self.queue
+ self.queue = queue
+ self.rvqueue = rvqueue
+ self.delayed_exceptions = delayed_exceptions
+ if self.isAlive():
+ oldqueue.put(self.REASSIGNED)
+
+ def detach(self):
+ if self.isAlive():
+ self.waitdone()
+ self.oldqueue = self.queue
+ self.queue = Queue.Queue()
+ self.rvqueue = None
+ self.delayed_exceptions = []
+
+ def detach_signal(self):
+ if self.isAlive():
+ self.oldqueue.put(self.REASSIGNED)
+ del self.oldqueue
+
+ def quit(self):
+ self.queue.put(self.QUIT)
+ self.join()
+