+++ /dev/null
-# ZPsycopgDA/db.py - query execution
-#
-# Copyright (C) 2004 Federico Di Gregorio <fog@initd.org>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by the
-# Free Software Foundation; either version 2, or (at your option) any later
-# version.
-#
-# Or, at your option this program (ZPsycopgDA) can be distributed under the
-# Zope Public License (ZPL) Version 1.0, as published on the Zope web site,
-# http://www.zope.org/Resources/ZPL.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
-# or FITNESS FOR A PARTICULAR PURPOSE.
-#
-# See the LICENSE file for details.
-
-from Shared.DC.ZRDB.TM import TM
-from Shared.DC.ZRDB import dbi_db
-
-from ZODB.POSException import ConflictError
-
-import site
-import pool
-
-import psycopg2
-from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE, TIME
-from psycopg2 import NUMBER, STRING, ROWID, DATETIME
-
-
-# the DB object, managing all the real query work
-
-class DB(TM, dbi_db.DB):
-
- _p_oid = _p_changed = _registered = None
-
- def __init__(self, dsn, tilevel, enc='utf-8'):
- self.dsn = dsn
- self.tilevel = tilevel
- self.encoding = enc
- self.failures = 0
- self.calls = 0
- self.make_mappings()
-
- def getconn(self, create=True):
- conn = pool.getconn(self.dsn)
- conn.set_isolation_level(int(self.tilevel))
- return conn
-
- def putconn(self, close=False):
- try:
- conn = pool.getconn(self.dsn, False)
- except AttributeError:
- pass
- pool.putconn(self.dsn, conn, close)
-
- def getcursor(self):
- conn = self.getconn()
- return conn.cursor()
-
- def _finish(self, *ignored):
- try:
- conn = self.getconn(False)
- conn.commit()
- self.putconn()
- except AttributeError:
- pass
-
- def _abort(self, *ignored):
- try:
- conn = self.getconn(False)
- conn.rollback()
- self.putconn()
- except AttributeError:
- pass
-
- def open(self):
- # this will create a new pool for our DSN if not already existing,
- # then get and immediately release a connection
- self.getconn()
- self.putconn()
-
- def close(self):
- # FIXME: if this connection is closed we flush all the pool associated
- # with the current DSN; does this makes sense?
- pool.flushpool(self.dsn)
-
- def sortKey(self):
- return 1
-
- def make_mappings(self):
- """Generate the mappings used later by self.convert_description()."""
- self.type_mappings = {}
- for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'),
- (BOOLEAN,'n'), (ROWID, 'i'),
- (DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]:
- for v in t.values:
- self.type_mappings[v] = (t, s)
-
- def convert_description(self, desc, use_psycopg_types=False):
- """Convert DBAPI-2.0 description field to Zope format."""
- items = []
- for name, typ, width, ds, p, scale, null_ok in desc:
- m = self.type_mappings.get(typ, (STRING, 's'))
- items.append({
- 'name': name,
- 'type': use_psycopg_types and m[0] or m[1],
- 'width': width,
- 'precision': p,
- 'scale': scale,
- 'null': null_ok,
- })
- return items
-
- ## tables and rows ##
-
- def tables(self, rdb=0, _care=('TABLE', 'VIEW')):
- self._register()
- c = self.getcursor()
- c.execute(
- "SELECT t.tablename AS NAME, 'TABLE' AS TYPE "
- " FROM pg_tables t WHERE tableowner <> 'postgres' "
- "UNION SELECT v.viewname AS NAME, 'VIEW' AS TYPE "
- " FROM pg_views v WHERE viewowner <> 'postgres' "
- "UNION SELECT t.tablename AS NAME, 'SYSTEM_TABLE\' AS TYPE "
- " FROM pg_tables t WHERE tableowner = 'postgres' "
- "UNION SELECT v.viewname AS NAME, 'SYSTEM_TABLE' AS TYPE "
- "FROM pg_views v WHERE viewowner = 'postgres'")
- res = []
- for name, typ in c.fetchall():
- if typ in _care:
- res.append({'TABLE_NAME': name, 'TABLE_TYPE': typ})
- self.putconn()
- return res
-
- def columns(self, table_name):
- self._register()
- c = self.getcursor()
- try:
- r = c.execute('SELECT * FROM "%s" WHERE 1=0' % table_name)
- except:
- return ()
- self.putconn()
- return self.convert_description(c.description, True)
-
- ## query execution ##
-
- def query(self, query_string, max_rows=None, query_data=None):
- self._register()
- self.calls = self.calls+1
-
- desc = ()
- res = []
- nselects = 0
-
- c = self.getcursor()
-
- try:
- for qs in [x for x in query_string.split('\0') if x]:
- if type(qs) == unicode:
- if self.encoding:
- qs = qs.encode(self.encoding)
- try:
- if query_data:
- c.execute(qs, query_data)
- else:
- c.execute(qs)
- except psycopg2.OperationalError, e:
- try:
- self.close()
- except:
- pass
- self.open()
- try:
- if query_data:
- c.execute(qs, query_data)
- else:
- c.execute(qs)
- except (psycopg2.ProgrammingError,
- psycopg2.IntegrityError), e:
- if e.args[0].find("concurrent update") > -1:
- raise ConflictError
- raise e
- except (psycopg2.ProgrammingError, psycopg2.IntegrityError), e:
- if e.args[0].find("concurrent update") > -1:
- raise ConflictError
- raise e
- if c.description is not None:
- nselects += 1
- if c.description != desc and nselects > 1:
- raise psycopg2.ProgrammingError(
- 'multiple selects in single query not allowed')
- if max_rows:
- res = c.fetchmany(max_rows)
- else:
- res = c.fetchall()
- desc = c.description
- self.failures = 0
-
- except StandardError, err:
- self._abort()
- raise err
-
- return self.convert_description(desc), res