Merge remote-tracking branch 'origin/pycurl' into planetlab-4_0-branch
[plcapi.git] / trunk / psycopg2 / lib / pool.py
1 """Connection pooling for psycopg2
2
3 This module implements thread-safe (and not) connection pools.
4 """
5 # psycopg/pool.py - pooling code for psycopg
6 #
7 # Copyright (C) 2003-2004 Federico Di Gregorio  <fog@debian.org>
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by the
11 # Free Software Foundation; either version 2, or (at your option) any later
12 # version.
13 #
14 # This program is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
16 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
17 # for more details.
18
19 import psycopg2
20
21 try:
22     import logging
23     # do basic initialization if the module is not already initialized
24     logging.basicConfig(level=logging.INFO,
25                         format='%(asctime)s %(levelname)s %(message)s')
26     # create logger object for psycopg2 module and sub-modules
27     _logger = logging.getLogger("psycopg2")
28     def dbg(*args):
29         _logger.debug("psycopg2", ' '.join([str(x) for x in args]))
30     try:
31         import App # does this make sure that we're running in Zope?
32         _logger.info("installed. Logging using Python logging module")
33     except:
34         _logger.debug("installed. Logging using Python logging module")
35     
36 except ImportError:
37     from zLOG import LOG, DEBUG, INFO
38     def dbg(*args):
39         LOG('ZPsycopgDA',  DEBUG, "",
40             ' '.join([str(x) for x in args])+'\n')
41     LOG('ZPsycopgDA', INFO, "Installed", "Logging using Zope's zLOG\n") 
42
43 except:
44     import sys
45     def dbg(*args):
46         sys.stderr.write(' '.join(args)+'\n')
47
48
49 class PoolError(psycopg2.Error):
50     pass
51
52
53 class AbstractConnectionPool(object):
54     """Generic key-based pooling code."""
55
56     def __init__(self, minconn, maxconn, *args, **kwargs):
57         """Initialize the connection pool.
58
59         New 'minconn' connections are created immediately calling 'connfunc'
60         with given parameters. The connection pool will support a maximum of
61         about 'maxconn' connections.        
62         """
63         self.minconn = minconn
64         self.maxconn = maxconn
65         self.closed = False
66         
67         self._args = args
68         self._kwargs = kwargs
69
70         self._pool = []
71         self._used = {}
72         self._rused = {} # id(conn) -> key map
73         self._keys = 0
74
75         for i in range(self.minconn):
76             self._connect()
77
78     def _connect(self, key=None):
79         """Create a new connection and assign it to 'key' if not None."""
80         conn = psycopg2.connect(*self._args, **self._kwargs)
81         if key is not None:
82             self._used[key] = conn
83             self._rused[id(conn)] = key
84         else:
85             self._pool.append(conn)
86         return conn
87
88     def _getkey(self):
89         """Return a new unique key."""
90         self._keys += 1
91         return self._keys
92             
93     def _getconn(self, key=None):
94         """Get a free connection and assign it to 'key' if not None."""
95         if self.closed: raise PoolError("connection pool is closed")
96         if key is None: key = self._getkey()
97         
98         if self._used.has_key(key):
99             return self._used[key]
100
101         if self._pool:
102             self._used[key] = conn = self._pool.pop()
103             self._rused[id(conn)] = key
104             return conn
105         else:
106             if len(self._used) == self.maxconn:
107                 raise PoolError("connection pool exausted")
108             return self._connect(key)
109                  
110     def _putconn(self, conn, key=None, close=False):
111         """Put away a connection."""
112         if self.closed: raise PoolError("connection pool is closed")
113         if key is None: key = self._rused[id(conn)]
114
115         if not key:
116             raise PoolError("trying to put unkeyed connection")
117
118         if len(self._pool) < self.minconn and not close:
119             self._pool.append(conn)
120         else:
121             conn.close()
122
123         # here we check for the presence of key because it can happen that a
124         # thread tries to put back a connection after a call to close
125         if not self.closed or key in self._used:
126             del self._used[key]
127             del self._rused[id(conn)]
128
129     def _closeall(self):
130         """Close all connections.
131
132         Note that this can lead to some code fail badly when trying to use
133         an already closed connection. If you call .closeall() make sure
134         your code can deal with it.
135         """
136         if self.closed: raise PoolError("connection pool is closed")
137         for conn in self._pool + list(self._used.values()):
138             try:
139                 print "Closing connection", conn
140                 conn.close()
141             except:
142                 pass
143         self.closed = True
144         
145
146 class SimpleConnectionPool(AbstractConnectionPool):
147     """A connection pool that can't be shared across different threads."""
148
149     getconn = AbstractConnectionPool._getconn
150     putconn = AbstractConnectionPool._putconn
151     closeall   = AbstractConnectionPool._closeall
152
153
154 class ThreadedConnectionPool(AbstractConnectionPool):
155     """A connection pool that works with the threading module."""
156
157     def __init__(self, minconn, maxconn, *args, **kwargs):
158         """Initialize the threading lock."""
159         import threading
160         AbstractConnectionPool.__init__(
161             self, minconn, maxconn, *args, **kwargs)
162         self._lock = threading.Lock()
163
164     def getconn(self, key=None):
165         """Get a free connection and assign it to 'key' if not None."""
166         self._lock.acquire()
167         try:
168             return self._getconn(key)
169         finally:
170             self._lock.release()
171
172     def putconn(self, conn=None, key=None, close=False):
173         """Put away an unused connection."""
174         self._lock.acquire()
175         try:
176             self._putconn(conn, key, close)
177         finally:
178             self._lock.release()
179
180     def closeall(self):
181         """Close all connections (even the one currently in use.)"""
182         self._lock.acquire()
183         try:
184             self._closeall()
185         finally:
186             self._lock.release()
187
188
189 class PersistentConnectionPool(AbstractConnectionPool):
190     """A pool that assigns persistent connections to different threads. 
191
192     Note that this connection pool generates by itself the required keys
193     using the current thread id.  This means that untill a thread put away
194     a connection it will always get the same connection object by successive
195     .getconn() calls. This also means that a thread can't use more than one
196     single connection from the pool.
197     """
198
199     def __init__(self, minconn, maxconn, *args, **kwargs):
200         """Initialize the threading lock."""
201         import threading
202         AbstractConnectionPool.__init__(
203             self, minconn, maxconn, *args, **kwargs)
204         self._lock = threading.Lock()
205
206         # we we'll need the thread module, to determine thread ids, so we
207         # import it here and copy it in an instance variable
208         import thread
209         self.__thread = thread
210
211     def getconn(self):
212         """Generate thread id and return a connection."""
213         key = self.__thread.get_ident()
214         self._lock.acquire()
215         try:
216             return self._getconn(key)
217         finally:
218             self._lock.release()
219
220     def putconn(self, conn=None, close=False):
221         """Put away an unused connection."""
222         key = self.__thread.get_ident()
223         self._lock.acquire()
224         try:
225             if not conn: conn = self._used[key]
226             self._putconn(conn, key, close)
227         finally:
228             self._lock.release()
229
230     def closeall(self):
231         """Close all connections (even the one currently in use.)"""
232         self._lock.acquire()
233         try:
234             self._closeall()
235         finally:
236             self._lock.release()