1 """Connection pooling for psycopg2
3 This module implements thread-safe (and not) connection pools.
5 # psycopg/pool.py - pooling code for psycopg
7 # Copyright (C) 2003-2004 Federico Di Gregorio <fog@debian.org>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by the
11 # Free Software Foundation; either version 2, or (at your option) any later
14 # This program is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
16 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
23 # do basic initialization if the module is not already initialized
24 logging.basicConfig(level=logging.INFO,
25 format='%(asctime)s %(levelname)s %(message)s')
26 # create logger object for psycopg2 module and sub-modules
27 _logger = logging.getLogger("psycopg2")
29 _logger.debug("psycopg2", ' '.join([str(x) for x in args]))
31 import App # does this make sure that we're running in Zope?
32 _logger.info("installed. Logging using Python logging module")
34 _logger.debug("installed. Logging using Python logging module")
37 from zLOG import LOG, DEBUG, INFO
39 LOG('ZPsycopgDA', DEBUG, "",
40 ' '.join([str(x) for x in args])+'\n')
41 LOG('ZPsycopgDA', INFO, "Installed", "Logging using Zope's zLOG\n")
46 sys.stderr.write(' '.join(args)+'\n')
49 class PoolError(psycopg2.Error):
53 class AbstractConnectionPool(object):
54 """Generic key-based pooling code."""
56 def __init__(self, minconn, maxconn, *args, **kwargs):
57 """Initialize the connection pool.
59 New 'minconn' connections are created immediately calling 'connfunc'
60 with given parameters. The connection pool will support a maximum of
61 about 'maxconn' connections.
63 self.minconn = minconn
64 self.maxconn = maxconn
72 self._rused = {} # id(conn) -> key map
75 for i in range(self.minconn):
78 def _connect(self, key=None):
79 """Create a new connection and assign it to 'key' if not None."""
80 conn = psycopg2.connect(*self._args, **self._kwargs)
82 self._used[key] = conn
83 self._rused[id(conn)] = key
85 self._pool.append(conn)
89 """Return a new unique key."""
93 def _getconn(self, key=None):
94 """Get a free connection and assign it to 'key' if not None."""
95 if self.closed: raise PoolError("connection pool is closed")
96 if key is None: key = self._getkey()
98 if self._used.has_key(key):
99 return self._used[key]
102 self._used[key] = conn = self._pool.pop()
103 self._rused[id(conn)] = key
106 if len(self._used) == self.maxconn:
107 raise PoolError("connection pool exausted")
108 return self._connect(key)
110 def _putconn(self, conn, key=None, close=False):
111 """Put away a connection."""
112 if self.closed: raise PoolError("connection pool is closed")
113 if key is None: key = self._rused[id(conn)]
116 raise PoolError("trying to put unkeyed connection")
118 if len(self._pool) < self.minconn and not close:
119 self._pool.append(conn)
123 # here we check for the presence of key because it can happen that a
124 # thread tries to put back a connection after a call to close
125 if not self.closed or key in self._used:
127 del self._rused[id(conn)]
130 """Close all connections.
132 Note that this can cause some code to fail badly when trying to use
133 an already closed connection. If you call .closeall() make sure
134 your code can deal with it.
136 if self.closed: raise PoolError("connection pool is closed")
137 for conn in self._pool + list(self._used.values()):
139 print "Closing connection", conn
class SimpleConnectionPool(AbstractConnectionPool):
    """Connection pool meant to be used from a single thread only.

    The public methods are plain aliases of the underscore-prefixed
    implementations inherited from AbstractConnectionPool; nothing is
    wrapped around them, so this pool must not be shared across threads.
    """

    # Public names bound directly to the inherited implementations.
    closeall = AbstractConnectionPool._closeall
    putconn = AbstractConnectionPool._putconn
    getconn = AbstractConnectionPool._getconn
154 class ThreadedConnectionPool(AbstractConnectionPool):
155 """A connection pool that works with the threading module."""
157 def __init__(self, minconn, maxconn, *args, **kwargs):
158 """Initialize the threading lock."""
160 AbstractConnectionPool.__init__(
161 self, minconn, maxconn, *args, **kwargs)
162 self._lock = threading.Lock()
164 def getconn(self, key=None):
165 """Get a free connection and assign it to 'key' if not None."""
168 return self._getconn(key)
172 def putconn(self, conn=None, key=None, close=False):
173 """Put away an unused connection."""
176 self._putconn(conn, key, close)
181 """Close all connections (even the one currently in use.)"""
189 class PersistentConnectionPool(AbstractConnectionPool):
190 """A pool that assigns persistent connections to different threads.
192 Note that this connection pool generates by itself the required keys
193 using the current thread id. This means that until a thread puts away
194 a connection it will always get the same connection object by successive
195 .getconn() calls. This also means that a thread can't use more than one
196 single connection from the pool.
199 def __init__(self, minconn, maxconn, *args, **kwargs):
200 """Initialize the threading lock."""
202 AbstractConnectionPool.__init__(
203 self, minconn, maxconn, *args, **kwargs)
204 self._lock = threading.Lock()
206 # we'll need the thread module to determine thread ids, so we
207 # import it here and copy it in an instance variable
209 self.__thread = thread
212 """Generate thread id and return a connection."""
213 key = self.__thread.get_ident()
216 return self._getconn(key)
220 def putconn(self, conn=None, close=False):
221 """Put away an unused connection."""
222 key = self.__thread.get_ident()
225 if not conn: conn = self._used[key]
226 self._putconn(conn, key, close)
231 """Close all connections (even the one currently in use.)"""