From: Faiyaz Ahmed Date: Mon, 11 Aug 2008 18:34:45 +0000 (+0000) Subject: NM has a worker thread for every sliver ever to exist on the node. Each thread works... X-Git-Tag: NodeManager-1.8-0~17 X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=c2ce4bf43f46ab89a72f8811caa10989e57ce967;p=nodemanager.git NM has a worker thread for every sliver ever to exist on the node. Each thread works off of a queue where the database adds a state from the statemachine to execute. To keep from spawning upwards of 300 worker processes during start(), a cummulative global delay is incremented (by 2) for every sliver known to the db. After the delay, start() is run. But, it's possible for the delay to grow without bound when NM is resynced to PLC and the db adds previously added states to the queue and causing the cumm start delay to be incremented once again. This patch keeps the queue from growing beyond the 4 known states. Any other additions are likely repeates and don't need to be added again. This is a bandaid; really we should keep a table of what exists in the queue and keep from readding while also bounding the delay to some maximum over the number of slivers present on the node. --- diff --git a/accounts.py b/accounts.py index 3065408..1fd264f 100644 --- a/accounts.py +++ b/accounts.py @@ -111,14 +111,14 @@ class Worker: # task list # outsiders request operations by putting (fn, args...) tuples on _q # the worker thread (created below) will perform these operations in order - self._q = Queue.Queue() + self._q = Queue.Queue(maxsize=4) # keep from overflowing and backing up. tools.as_daemon_thread(self._run) def ensure_created(self, rec): """Cause the account specified by to exist if it doesn't already.""" if rec.has_key('name'): logger.verbose('Worker.ensure_created with name=%s'%rec['name']) - self._q.put((self._ensure_created, rec.copy(), Startingup)) + self._enqueue((self._ensure_created, rec.copy(), Startingup)) logger.verbose('Worker queue has %d item(s)'%self._q.qsize()) def _ensure_created(self, rec, startingup): @@ -141,13 +141,13 @@ class Worker: elif next_class != curr_class or self._acct.initscriptchanged: self._acct.start() - def ensure_destroyed(self): self._q.put((self._ensure_destroyed,)) + def ensure_destroyed(self): self._enqueue((self._ensure_destroyed,)) def _ensure_destroyed(self): self._destroy(self._get_class()) - def start(self, delay=0): self._q.put((self._start, delay)) + def start(self, delay=0): self._enqueue((self._start, delay)) def _start(self, d): self._acct.start(delay=d) - def stop(self): self._q.put((self._stop,)) + def stop(self): self._enqueue((self._stop,)) def _stop(self): self._acct.stop() def is_running(self): @@ -179,3 +179,7 @@ class Worker: cmd[0](*cmd[1:]) except: logger.log_exc(self.name) + + def _enqueue(self, cmds): + try: self._q.put_nowait(cmds) + except Queue.Full: logger.log("%s Worker queue full." % self.name)