1 # Borrowed from Chandler
2 # http://chandlerproject.org/Projects/ChandlerTwistedInThreadedEnvironment
import threading
import time

from manifold.util.singleton import Singleton
from manifold.util.log import *
from twisted.internet import defer
from twisted.python import threadable
10 __author__ ="Brian Kirsch <bkirsch@osafoundation.org>"
# Required for using threads with the reactor.
class ReactorException(Exception):
    """Error raised by ReactorThread when the reactor is in the wrong state.

    Used in this module for "already running", "not running" and
    "too long to start" conditions.
    """

    def __init__(self, *args):
        # Hand every positional argument, unchanged, to the base exception.
        super(ReactorException, self).__init__(*args)
class ReactorThread(threading.Thread):
    """
    Run the Reactor in a Thread to prevent blocking the
    Main Thread once reactor.run is called.

    Attribute lookups that are not resolved on the thread object itself are
    turned into ``reactor.callFromThread`` calls (see ``__getattr__``).
    """

    # Python 2 metaclass hook; presumably makes this class a process-wide
    # singleton -- confirm against manifold.util.singleton.
    __metaclass__ = Singleton

    # Bounded wait used by start_reactor(): poll the running flag at most
    # _START_MAX_POLLS times, sleeping _START_POLL_INTERVAL seconds between
    # polls. NOTE(review): values chosen here; tune if start-up is slower.
    _START_POLL_INTERVAL = 0.1
    _START_MAX_POLLS = 5

    def __init__(self):
        """Initialise the thread and bind the Twisted reactor to it."""
        threading.Thread.__init__(self)
        self._reactorRunning = False

        # Be sure the import is done only at runtime; we keep a reference in
        # the instance so the rest of the class never imports it again.
        from twisted.internet import reactor
        self.reactor = reactor

    def run(self):
        """Thread entry point: run the reactor until it stops.

        Raises:
            ReactorException: if the reactor is already flagged as running.
        """
        if self._reactorRunning:
            raise ReactorException("Reactor Already Running")

        self._reactorRunning = True

        # Call run passing a False flag indicating to the reactor not to
        # install signal handlers, since signal handlers only work on the
        # main thread.
        try:
            self.reactor.run(False)
        except Exception as e:
            print("Reactor exception: %s" % (e,))
        finally:
            # Whatever made reactor.run() return (normal stop or failure),
            # the reactor is no longer running in this thread.
            self._reactorRunning = False

    def callInReactor(self, callable, *args, **kw):
        """Invoke ``callable(*args, **kw)``; its return value is discarded.

        When the reactor is running the call is scheduled in the reactor
        thread through ``callFromThread``; otherwise it is made directly in
        the calling thread, mirroring addReactorEventTrigger().
        """
        if self._reactorRunning:
            self.reactor.callFromThread(callable, *args, **kw)
        else:
            callable(*args, **kw)

    def isReactorRunning(self):
        """Return True while the reactor is flagged as running."""
        return self._reactorRunning

    def start_reactor(self):
        """Start the reactor thread and wait (bounded) until it is flagged running.

        Calling this while the reactor already runs only logs a warning.

        Raises:
            ReactorException: if the thread does not flag itself as running
                within the polling budget.
        """
        if self._reactorRunning:
            log_warning("Reactor already running. This is normal, please remove this debug message")
            # A thread can only be started once: do not fall through.
            return

        threading.Thread.start(self)

        polls = 0
        while not self._reactorRunning:
            if polls >= self._START_MAX_POLLS:
                raise ReactorException("Reactor thread is too long to start... cancelling")
            time.sleep(self._START_POLL_INTERVAL)
            polls += 1

        # Register through the thread-aware helper: we are not in the
        # reactor thread here, so the reactor must not be touched directly.
        self.addReactorEventTrigger('after', 'shutdown', self.__reactorShutDown)

    def stop_reactor(self):
        """
        Ask the running reactor to stop (scheduled in the reactor thread).

        May want a way to force the thread to join if the reactor does not
        shut down properly. The reactor can get into a recursive loop
        condition if reactor.stop is placed in the thread's join method.
        This will require further investigation.

        Raises:
            ReactorException: if the reactor is not running.
        """
        if not self._reactorRunning:
            raise ReactorException("Reactor Not Running")
        self.reactor.callFromThread(self.reactor.stop)

    def addReactorEventTrigger(self, phase, eventType, callable):
        """Register ``callable`` as a reactor system event trigger.

        ``phase`` and ``eventType`` are passed through unchanged to
        ``reactor.addSystemEventTrigger``. The registration goes through
        ``callFromThread`` when the reactor is running, and is done directly
        otherwise.
        """
        if self._reactorRunning:
            self.reactor.callFromThread(self.reactor.addSystemEventTrigger, phase, eventType, callable)
        else:
            self.reactor.addSystemEventTrigger(phase, eventType, callable)

    def __reactorShuttingDown(self):
        """Placeholder hook; not referenced anywhere in this class."""
        pass

    def __reactorShutDown(self):
        """This method is called when the reactor is stopped."""
        self._reactorRunning = False

    def __getattr__(self, name):
        """Proxy unknown methods to the reactor, scheduled via callFromThread.

        The returned wrapper discards the result of the proxied call.
        """
        # Do not proxy special-method probes or the reactor reference itself:
        # report them as genuinely missing instead of returning a wrapper.
        if name == 'reactor' or (name.startswith('__') and name.endswith('__')):
            raise AttributeError(name)

        # We transfer missing methods to the reactor
        def _missing(*args, **kwargs):
            self.reactor.callFromThread(getattr(self.reactor, name), *args, **kwargs)
        return _missing