Merge from trunk
[plcapi.git] / trunk / psycopg2 / lib / pool.py
diff --git a/trunk/psycopg2/lib/pool.py b/trunk/psycopg2/lib/pool.py
new file mode 100644 (file)
index 0000000..0468db6
--- /dev/null
@@ -0,0 +1,236 @@
+"""Connection pooling for psycopg2
+
+This module implements thread-safe (and not) connection pools.
+"""
+# psycopg/pool.py - pooling code for psycopg
+#
+# Copyright (C) 2003-2004 Federico Di Gregorio  <fog@debian.org>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2, or (at your option) any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
+# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+# for more details.
+
+import psycopg2
+
+try:
+    import logging
+    # do basic initialization if the module is not already initialized
+    logging.basicConfig(level=logging.INFO,
+                        format='%(asctime)s %(levelname)s %(message)s')
+    # create logger object for psycopg2 module and sub-modules
+    _logger = logging.getLogger("psycopg2")
+    def dbg(*args):
+        _logger.debug("psycopg2", ' '.join([str(x) for x in args]))
+    try:
+        import App # does this make sure that we're running in Zope?
+        _logger.info("installed. Logging using Python logging module")
+    except:
+        _logger.debug("installed. Logging using Python logging module")
+    
+except ImportError:
+    from zLOG import LOG, DEBUG, INFO
+    def dbg(*args):
+        LOG('ZPsycopgDA',  DEBUG, "",
+            ' '.join([str(x) for x in args])+'\n')
+    LOG('ZPsycopgDA', INFO, "Installed", "Logging using Zope's zLOG\n") 
+
+except:
+    import sys
+    def dbg(*args):
+        sys.stderr.write(' '.join(args)+'\n')
+
+
+class PoolError(psycopg2.Error):
+    pass
+
+
+class AbstractConnectionPool(object):
+    """Generic key-based pooling code."""
+
+    def __init__(self, minconn, maxconn, *args, **kwargs):
+        """Initialize the connection pool.
+
+        New 'minconn' connections are created immediately calling 'connfunc'
+        with given parameters. The connection pool will support a maximum of
+        about 'maxconn' connections.        
+        """
+        self.minconn = minconn
+        self.maxconn = maxconn
+        self.closed = False
+        
+        self._args = args
+        self._kwargs = kwargs
+
+        self._pool = []
+        self._used = {}
+        self._rused = {} # id(conn) -> key map
+        self._keys = 0
+
+        for i in range(self.minconn):
+            self._connect()
+
+    def _connect(self, key=None):
+        """Create a new connection and assign it to 'key' if not None."""
+        conn = psycopg2.connect(*self._args, **self._kwargs)
+        if key is not None:
+            self._used[key] = conn
+            self._rused[id(conn)] = key
+        else:
+            self._pool.append(conn)
+        return conn
+
+    def _getkey(self):
+        """Return a new unique key."""
+        self._keys += 1
+        return self._keys
+            
+    def _getconn(self, key=None):
+        """Get a free connection and assign it to 'key' if not None."""
+        if self.closed: raise PoolError("connection pool is closed")
+        if key is None: key = self._getkey()
+       
+        if self._used.has_key(key):
+            return self._used[key]
+
+        if self._pool:
+            self._used[key] = conn = self._pool.pop()
+            self._rused[id(conn)] = key
+            return conn
+        else:
+            if len(self._used) == self.maxconn:
+                raise PoolError("connection pool exausted")
+            return self._connect(key)
+                
+    def _putconn(self, conn, key=None, close=False):
+        """Put away a connection."""
+        if self.closed: raise PoolError("connection pool is closed")
+        if key is None: key = self._rused[id(conn)]
+
+        if not key:
+            raise PoolError("trying to put unkeyed connection")
+
+        if len(self._pool) < self.minconn and not close:
+            self._pool.append(conn)
+        else:
+            conn.close()
+
+        # here we check for the presence of key because it can happen that a
+        # thread tries to put back a connection after a call to close
+        if not self.closed or key in self._used:
+            del self._used[key]
+            del self._rused[id(conn)]
+
+    def _closeall(self):
+        """Close all connections.
+
+        Note that this can lead to some code fail badly when trying to use
+        an already closed connection. If you call .closeall() make sure
+        your code can deal with it.
+        """
+        if self.closed: raise PoolError("connection pool is closed")
+        for conn in self._pool + list(self._used.values()):
+            try:
+                print "Closing connection", conn
+                conn.close()
+            except:
+                pass
+        self.closed = True
+        
+
+class SimpleConnectionPool(AbstractConnectionPool):
+    """A connection pool that can't be shared across different threads."""
+
+    getconn = AbstractConnectionPool._getconn
+    putconn = AbstractConnectionPool._putconn
+    closeall   = AbstractConnectionPool._closeall
+
+
+class ThreadedConnectionPool(AbstractConnectionPool):
+    """A connection pool that works with the threading module."""
+
+    def __init__(self, minconn, maxconn, *args, **kwargs):
+        """Initialize the threading lock."""
+        import threading
+        AbstractConnectionPool.__init__(
+            self, minconn, maxconn, *args, **kwargs)
+        self._lock = threading.Lock()
+
+    def getconn(self, key=None):
+        """Get a free connection and assign it to 'key' if not None."""
+        self._lock.acquire()
+        try:
+            return self._getconn(key)
+        finally:
+            self._lock.release()
+
+    def putconn(self, conn=None, key=None, close=False):
+        """Put away an unused connection."""
+        self._lock.acquire()
+        try:
+            self._putconn(conn, key, close)
+        finally:
+            self._lock.release()
+
+    def closeall(self):
+        """Close all connections (even the one currently in use.)"""
+        self._lock.acquire()
+        try:
+            self._closeall()
+        finally:
+            self._lock.release()
+
+
+class PersistentConnectionPool(AbstractConnectionPool):
+    """A pool that assigns persistent connections to different threads. 
+
+    Note that this connection pool generates by itself the required keys
+    using the current thread id.  This means that untill a thread put away
+    a connection it will always get the same connection object by successive
+    .getconn() calls. This also means that a thread can't use more than one
+    single connection from the pool.
+    """
+
+    def __init__(self, minconn, maxconn, *args, **kwargs):
+        """Initialize the threading lock."""
+        import threading
+        AbstractConnectionPool.__init__(
+            self, minconn, maxconn, *args, **kwargs)
+        self._lock = threading.Lock()
+
+        # we we'll need the thread module, to determine thread ids, so we
+        # import it here and copy it in an instance variable
+        import thread
+        self.__thread = thread
+
+    def getconn(self):
+        """Generate thread id and return a connection."""
+        key = self.__thread.get_ident()
+        self._lock.acquire()
+        try:
+            return self._getconn(key)
+        finally:
+            self._lock.release()
+
+    def putconn(self, conn=None, close=False):
+        """Put away an unused connection."""
+        key = self.__thread.get_ident()
+        self._lock.acquire()
+        try:
+            if not conn: conn = self._used[key]
+            self._putconn(conn, key, close)
+        finally:
+            self._lock.release()
+
+    def closeall(self):
+        """Close all connections (even the one currently in use.)"""
+        self._lock.acquire()
+        try:
+            self._closeall()
+        finally:
+            self._lock.release()