Bound queue so they don't grow without bound.
authorFaiyaz Ahmed <faiyaza@cs.princeton.edu>
Tue, 12 Aug 2008 03:36:45 +0000 (03:36 +0000)
committerFaiyaz Ahmed <faiyaza@cs.princeton.edu>
Tue, 12 Aug 2008 03:36:45 +0000 (03:36 +0000)
accounts.py

index 3065408..1fd264f 100644 (file)
@@ -111,14 +111,14 @@ class Worker:
         # task list
         # outsiders request operations by putting (fn, args...) tuples on _q
         # the worker thread (created below) will perform these operations in order
-        self._q = Queue.Queue()
+        self._q = Queue.Queue(maxsize=4) # keep from overflowing and backing up.
         tools.as_daemon_thread(self._run)
 
     def ensure_created(self, rec):
         """Cause the account specified by <rec> to exist if it doesn't already."""
         if rec.has_key('name'):
             logger.verbose('Worker.ensure_created with name=%s'%rec['name'])
-        self._q.put((self._ensure_created, rec.copy(), Startingup))
+        self._enqueue((self._ensure_created, rec.copy(), Startingup))
         logger.verbose('Worker queue has %d item(s)'%self._q.qsize())
 
     def _ensure_created(self, rec, startingup):
@@ -141,13 +141,13 @@ class Worker:
         elif next_class != curr_class or self._acct.initscriptchanged:
             self._acct.start()
 
-    def ensure_destroyed(self): self._q.put((self._ensure_destroyed,))
+    def ensure_destroyed(self): self._enqueue((self._ensure_destroyed,))
     def _ensure_destroyed(self): self._destroy(self._get_class())
 
-    def start(self, delay=0): self._q.put((self._start, delay))
+    def start(self, delay=0): self._enqueue((self._start, delay))
     def _start(self, d): self._acct.start(delay=d)
 
-    def stop(self): self._q.put((self._stop,))
+    def stop(self): self._enqueue((self._stop,))
     def _stop(self): self._acct.stop()
 
     def is_running(self): 
@@ -179,3 +179,7 @@ class Worker:
                 cmd[0](*cmd[1:])
             except:
                 logger.log_exc(self.name)
+
+    def _enqueue(self, cmds):
+        try: self._q.put_nowait(cmds)
+        except Queue.Full:  logger.log("%s Worker queue full." % self.name)