# ZPsycopgDA/DA.py - ZPsycopgDA Zope product: Database Connection # # Copyright (C) 2004 Federico Di Gregorio # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2, or (at your option) any later # version. # # Or, at your option this program (ZPsycopgDA) can be distributed under the # Zope Public License (ZPL) Version 1.0, as published on the Zope web site, # http://www.zope.org/Resources/ZPL. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY # or FITNESS FOR A PARTICULAR PURPOSE. # # See the LICENSE file for details. ALLOWED_PSYCOPG_VERSIONS = ('2.0.1', '2.0.2', '2.0.3', '2.0.4', '2.0.5') import sys import time import db import re import Acquisition import Shared.DC.ZRDB.Connection from db import DB from Globals import HTMLFile from ExtensionClass import Base from App.Dialogs import MessageDialog from DateTime import DateTime # Build Zope version in a float for later cheks import App zope_version = App.version_txt.getZopeVersion() zope_version = float("%s.%s" %(zope_version[:2])) # ImageFile is deprecated in Zope >= 2.9 if zope_version < 2.9: from ImageFile import ImageFile else: from App.ImageFile import ImageFile # import psycopg and functions/singletons needed for date/time conversions import psycopg2 from psycopg2 import NUMBER, STRING, ROWID, DATETIME from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE from psycopg2.extensions import TIME, INTERVAL from psycopg2.extensions import new_type, register_type # add a new connection to a folder manage_addZPsycopgConnectionForm = HTMLFile('dtml/add',globals()) def manage_addZPsycopgConnection(self, id, title, connection_string, zdatetime=None, tilevel=2, check=None, REQUEST=None): """Add a DB connection to a folder.""" self._setObject(id, Connection(id, title, connection_string, zdatetime, check, tilevel)) if REQUEST is not None: return self.manage_main(self, REQUEST) # the connection object class Connection(Shared.DC.ZRDB.Connection.Connection): """ZPsycopg Connection.""" _isAnSQLConnection = 1 id = 'Psycopg2_database_connection' database_type = 'Psycopg2' meta_type = title = 'Z Psycopg 2 Database Connection' icon = 'misc_/conn' def __init__(self, id, title, connection_string, zdatetime, check=None, tilevel=2, encoding=''): self.zdatetime = zdatetime self.id = str(id) self.edit(title, connection_string, zdatetime, check=check, tilevel=tilevel, encoding=encoding) def factory(self): return DB ## connection parameters editing ## def edit(self, title, connection_string, zdatetime, check=None, tilevel=2, encoding=''): self.title = title self.connection_string = connection_string self.zdatetime = zdatetime self.tilevel = tilevel self.encoding = encoding self.set_type_casts() if check: self.connect(self.connection_string) manage_properties = HTMLFile('dtml/edit', globals()) def manage_edit(self, title, connection_string, zdatetime=None, check=None, tilevel=2, encoding='UTF-8', REQUEST=None): """Edit the DB connection.""" self.edit(title, connection_string, zdatetime, check=check, tilevel=tilevel, encoding=encoding) if REQUEST is not None: msg = "Connection edited." return self.manage_main(self,REQUEST,manage_tabs_message=msg) def connect(self, s): try: self._v_database_connection.close() except: pass # check psycopg version and raise exception if does not match if psycopg2.__version__[:5] not in ALLOWED_PSYCOPG_VERSIONS: raise ImportError("psycopg version mismatch (imported %s)" % psycopg2.__version__) self.set_type_casts() self._v_connected = '' dbf = self.factory() # TODO: let the psycopg exception propagate, or not? self._v_database_connection = dbf( self.connection_string, self.tilevel, self.encoding) self._v_database_connection.open() self._v_connected = DateTime() return self def set_type_casts(self): # note that in both cases order *is* important if self.zdatetime: # use zope internal datetime routines register_type(ZDATETIME) register_type(ZDATE) register_type(ZTIME) else: # use the standard register_type(DATETIME) register_type(DATE) register_type(TIME) ## browsing and table/column management ## manage_options = Shared.DC.ZRDB.Connection.Connection.manage_options # + ( # {'label': 'Browse', 'action':'manage_browse'},) #manage_tables = HTMLFile('dtml/tables', globals()) #manage_browse = HTMLFile('dtml/browse', globals()) info = None def table_info(self): return self._v_database_connection.table_info() def __getitem__(self, name): if name == 'tableNamed': if not hasattr(self, '_v_tables'): self.tpValues() return self._v_tables.__of__(self) raise KeyError, name def tpValues(self): res = [] conn = self._v_database_connection for d in conn.tables(rdb=0): try: name = d['TABLE_NAME'] b = TableBrowser() b.__name__ = name b._d = d b._c = c try: b.icon = table_icons[d['TABLE_TYPE']] except: pass r.append(b) except: pass return res ## database connection registration data ## classes = (Connection,) meta_types = ({'name':'Z Psycopg 2 Database Connection', 'action':'manage_addZPsycopgConnectionForm'},) folder_methods = { 'manage_addZPsycopgConnection': manage_addZPsycopgConnection, 'manage_addZPsycopgConnectionForm': manage_addZPsycopgConnectionForm} __ac_permissions__ = ( ('Add Z Psycopg Database Connections', ('manage_addZPsycopgConnectionForm', 'manage_addZPsycopgConnection')),) # add icons misc_={'conn': ImageFile('Shared/DC/ZRDB/www/DBAdapterFolder_icon.gif')} for icon in ('table', 'view', 'stable', 'what', 'field', 'text', 'bin', 'int', 'float', 'date', 'time', 'datetime'): misc_[icon] = ImageFile('icons/%s.gif' % icon, globals()) ## zope-specific psycopg typecasters ## # convert an ISO timestamp string from postgres to a Zope DateTime object def _cast_DateTime(iso, curs): if iso: return DateTime(re.split("GMT\+?|GMT-?", iso)[0]) # this will split us into [date, time, GMT/AM/PM(if there)] # dt = str.split(' ') # if len(dt) > 1: # # we now should split out any timezone info # dt[1] = dt[1].split('-')[0] # dt[1] = dt[1].split('+')[0] # return DateTime(' '.join(dt[:2])) # else: # return DateTime(dt[0]) # convert an ISO date string from postgres to a Zope DateTime object def _cast_Date(iso, curs): if iso: return DateTime(iso) # Convert a time string from postgres to a Zope DateTime object. # NOTE: we set the day as today before feeding to DateTime so # that it has the same DST settings. def _cast_Time(iso, curs): if iso: return DateTime(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())[:3]+ time.strptime(iso[:8], "%H:%M:%S")[3:])) # NOTE: we don't cast intervals anymore because they are passed # untouched to Zope. def _cast_Interval(iso, curs): return iso ZDATETIME = new_type((1184, 1114), "ZDATETIME", _cast_DateTime) ZINTERVAL = new_type((1186,), "ZINTERVAL", _cast_Interval) ZDATE = new_type((1082,), "ZDATE", _cast_Date) ZTIME = new_type((1083,), "ZTIME", _cast_Time) ## table browsing helpers ## class TableBrowserCollection(Acquisition.Implicit): pass class Browser(Base): def __getattr__(self, name): try: return self._d[name] except KeyError: raise AttributeError, name class values: def len(self): return 1 def __getitem__(self, i): try: return self._d[i] except AttributeError: pass self._d = self._f() return self._d[i] class TableBrowser(Browser, Acquisition.Implicit): icon = 'what' Description = check = '' info = HTMLFile('table_info', globals()) menu = HTMLFile('table_menu', globals()) def tpValues(self): v = values() v._f = self.tpValues_ return v def tpValues_(self): r=[] tname=self.__name__ for d in self._c.columns(tname): b=ColumnBrowser() b._d=d try: b.icon=field_icons[d['Type']] except: pass b.TABLE_NAME=tname r.append(b) return r def tpId(self): return self._d['TABLE_NAME'] def tpURL(self): return "Table/%s" % self._d['TABLE_NAME'] def Name(self): return self._d['TABLE_NAME'] def Type(self): return self._d['TABLE_TYPE'] manage_designInput=HTMLFile('designInput',globals()) def manage_buildInput(self, id, source, default, REQUEST=None): "Create a database method for an input form" args=[] values=[] names=[] columns=self._columns for i in range(len(source)): s=source[i] if s=='Null': continue c=columns[i] d=default[i] t=c['Type'] n=c['Name'] names.append(n) if s=='Argument': values.append("'" % (n, vartype(t))) a='%s%s' % (n, boboType(t)) if d: a="%s=%s" % (a,d) args.append(a) elif s=='Property': values.append("'" % (n, vartype(t))) else: if isStringType(t): if find(d,"\'") >= 0: d=join(split(d,"\'"),"''") values.append("'%s'" % d) elif d: values.append(str(d)) else: raise ValueError, ( 'no default was given for %s' % n) class ColumnBrowser(Browser): icon='field' def check(self): return ('\t' % (self.TABLE_NAME, self._d['Name'])) def tpId(self): return self._d['Name'] def tpURL(self): return "Column/%s" % self._d['Name'] def Description(self): d=self._d if d['Scale']: return " %(Type)s(%(Precision)s,%(Scale)s) %(Nullable)s" % d else: return " %(Type)s(%(Precision)s) %(Nullable)s" % d table_icons={ 'TABLE': 'table', 'VIEW':'view', 'SYSTEM_TABLE': 'stable', } field_icons={ NUMBER.name: 'i', STRING.name: 'text', DATETIME.name: 'date', INTEGER.name: 'int', FLOAT.name: 'float', BOOLEAN.name: 'bin', ROWID.name: 'int' }