...
[plcapi.git] / psycopg2 / ZPsycopgDA / db.py
1 # ZPsycopgDA/db.py - query execution
2 #
3 # Copyright (C) 2004 Federico Di Gregorio <fog@initd.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by the
7 # Free Software Foundation; either version 2, or (at your option) any later
8 # version.
9 #
10 # Or, at your option this program (ZPsycopgDA) can be distributed under the
11 # Zope Public License (ZPL) Version 1.0, as published on the Zope web site,
12 # http://www.zope.org/Resources/ZPL.
13 #
14 # This program is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
16 # or FITNESS FOR A PARTICULAR PURPOSE.
17 #
18 # See the LICENSE file for details.
19
20 from Shared.DC.ZRDB.TM import TM
21 from Shared.DC.ZRDB import dbi_db
22
23 from ZODB.POSException import ConflictError
24
25 import site
26 import pool
27
28 import psycopg2
29 from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE, TIME
30 from psycopg2 import NUMBER, STRING, ROWID, DATETIME 
31
32
33 # the DB object, managing all the real query work
34
35 class DB(TM, dbi_db.DB):
36     
37     _p_oid = _p_changed = _registered = None
38
39     def __init__(self, dsn, tilevel, enc='utf-8'):
40         self.dsn = dsn
41         self.tilevel = tilevel
42         self.encoding = enc
43         self.failures = 0
44         self.calls = 0
45         self.make_mappings()
46
47     def getconn(self, create=True):
48         conn = pool.getconn(self.dsn)
49         conn.set_isolation_level(int(self.tilevel))
50         return conn
51
52     def putconn(self, close=False):
53         try:
54             conn = pool.getconn(self.dsn, False)
55         except AttributeError:
56             pass
57         pool.putconn(self.dsn, conn, close)
58         
59     def getcursor(self):
60         conn = self.getconn()
61         return conn.cursor()
62
63     def _finish(self, *ignored):
64         try:
65             conn = self.getconn(False)
66             conn.commit()
67             self.putconn()
68         except AttributeError:
69             pass
70             
71     def _abort(self, *ignored):
72         try:
73             conn = self.getconn(False)
74             conn.rollback()
75             self.putconn()
76         except AttributeError:
77             pass
78
79     def open(self):
80         # this will create a new pool for our DSN if not already existing,
81         # then get and immediately release a connection
82         self.getconn()
83         self.putconn()
84         
85     def close(self):
86         # FIXME: if this connection is closed we flush all the pool associated
87         # with the current DSN; does this makes sense?
88         pool.flushpool(self.dsn)
89
90     def sortKey(self):
91         return 1
92
93     def make_mappings(self):
94         """Generate the mappings used later by self.convert_description()."""
95         self.type_mappings = {}
96         for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'),  
97                      (BOOLEAN,'n'), (ROWID, 'i'),
98                      (DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]:
99             for v in t.values:
100                 self.type_mappings[v] = (t, s)
101
102     def convert_description(self, desc, use_psycopg_types=False):
103         """Convert DBAPI-2.0 description field to Zope format."""
104         items = []
105         for name, typ, width, ds, p, scale, null_ok in desc:
106             m = self.type_mappings.get(typ, (STRING, 's'))
107             items.append({
108                 'name': name,
109                 'type': use_psycopg_types and m[0] or m[1],
110                 'width': width,
111                 'precision': p,
112                 'scale': scale,
113                 'null': null_ok,
114                 })
115         return items
116
117     ## tables and rows ##
118
119     def tables(self, rdb=0, _care=('TABLE', 'VIEW')):
120         self._register()
121         c = self.getcursor()
122         c.execute(
123             "SELECT t.tablename AS NAME, 'TABLE' AS TYPE "
124             "  FROM pg_tables t WHERE tableowner <> 'postgres' "
125             "UNION SELECT v.viewname AS NAME, 'VIEW' AS TYPE "
126             "  FROM pg_views v WHERE viewowner <> 'postgres' "
127             "UNION SELECT t.tablename AS NAME, 'SYSTEM_TABLE\' AS TYPE "
128             "  FROM pg_tables t WHERE tableowner = 'postgres' "
129             "UNION SELECT v.viewname AS NAME, 'SYSTEM_TABLE' AS TYPE "
130             "FROM pg_views v WHERE viewowner = 'postgres'")
131         res = []
132         for name, typ in c.fetchall():
133             if typ in _care:
134                 res.append({'TABLE_NAME': name, 'TABLE_TYPE': typ})
135         self.putconn()
136         return res
137
138     def columns(self, table_name):
139         self._register()
140         c = self.getcursor()
141         try:
142             r = c.execute('SELECT * FROM "%s" WHERE 1=0' % table_name)
143         except:
144             return ()
145         self.putconn()
146         return self.convert_description(c.description, True)
147     
148     ## query execution ##
149
150     def query(self, query_string, max_rows=None, query_data=None):
151         self._register()
152         self.calls = self.calls+1
153
154         desc = ()
155         res = []
156         nselects = 0
157
158         c = self.getcursor()
159
160         try:
161             for qs in [x for x in query_string.split('\0') if x]:
162                 if type(qs) == unicode:
163                     if self.encoding:
164                         qs = qs.encode(self.encoding)
165                 try:
166                     if query_data:
167                         c.execute(qs, query_data)
168                     else:
169                         c.execute(qs)
170                 except psycopg2.OperationalError, e:
171                     try:
172                         self.close()
173                     except:
174                         pass
175                     self.open()
176                     try:
177                         if   query_data:
178                             c.execute(qs, query_data)
179                         else:
180                             c.execute(qs)
181                     except (psycopg2.ProgrammingError,
182                             psycopg2.IntegrityError), e:
183                         if e.args[0].find("concurrent update") > -1:
184                             raise ConflictError
185                         raise e
186                 except (psycopg2.ProgrammingError, psycopg2.IntegrityError), e:
187                     if e.args[0].find("concurrent update") > -1:
188                         raise ConflictError
189                     raise e
190                 if c.description is not None:
191                     nselects += 1
192                     if c.description != desc and nselects > 1:
193                         raise psycopg2.ProgrammingError(
194                             'multiple selects in single query not allowed')
195                     if max_rows:
196                         res = c.fetchmany(max_rows)
197                     else:
198                         res = c.fetchall()
199                     desc = c.description
200             self.failures = 0
201
202         except StandardError, err:
203             self._abort()
204             raise err
205         
206         return self.convert_description(desc), res