1 # threads.py -- example of multiple threads using psycopg
2 # -*- encoding: latin1 -*-
4 # Copyright (C) 2001-2004 Federico Di Gregorio <fog@debian.org>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by the
8 # Free Software Foundation; either version 2, or (at your option) any later
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
16 ## put in DSN your DSN string
20 ## some others parameters
21 INSERT_THREADS = ('A', 'B', 'C')
22 SELECT_THREADS = ('1', '2')
31 # the available modes are:
32 # 0 - one connection for all insert and one for all select threads
33 # 1 - connections generated using the connection pool
37 ## don't modify anything below tis line (except for experimenting)
39 import sys, psycopg2, threading
40 from psycopg2.pool import ThreadedConnectionPool
45 MODE = int(sys.argv[2])
47 print "Opening connection using dns:", DSN
48 conn = psycopg2.connect(DSN)
52 curs.execute("""CREATE TABLE test_threads (
53 name text, value1 int4, value2 float)""")
56 curs.execute("DROP TABLE test_threads")
57 curs.execute("""CREATE TABLE test_threads (
58 name text, value1 int4, value2 float)""")
62 ## this function inserts a big number of rows and creates and destroys
63 ## a large number of cursors
65 def insert_func(conn_or_pool, rows):
66 name = threading.currentThread().getName()
71 conn = conn_or_pool.getconn()
74 if divmod(i, COMMIT_STEP)[1] == 0:
77 conn_or_pool.putconn(conn)
78 s = name + ": COMMIT STEP " + str(i)
81 conn = conn_or_pool.getconn()
84 c.execute("INSERT INTO test_threads VALUES (%s, %s, %s)",
85 (str(i), i, float(i)))
86 except psycopg2.ProgrammingError, err:
87 print name, ": an error occurred; skipping this insert"
91 ## a nice select function that prints the current number of rows in the
92 ## database (and transefer them, putting some pressure on the network)
94 def select_func(conn_or_pool, z):
95 name = threading.currentThread().getName()
99 conn.set_isolation_level(0)
101 for i in range(SELECT_SIZE):
102 if divmod(i, SELECT_STEP)[1] == 0:
105 conn = conn_or_pool.getconn()
106 conn.set_isolation_level(0)
108 c.execute("SELECT * FROM test_threads WHERE value2 < %s",
112 conn_or_pool.putconn(conn)
113 s = name + ": number of rows fetched: " + str(len(l))
115 except psycopg2.ProgrammingError, err:
116 print name, ": an error occurred; skipping this select"
119 ## create the connection pool or the connections
121 conn_insert = psycopg2.connect(DSN)
122 conn_select = psycopg2.connect(DSN)
124 m = len(INSERT_THREADS) + len(SELECT_THREADS)
126 conn_insert = conn_select = ThreadedConnectionPool(n, m, DSN)
128 ## create the threads
131 print "Creating INSERT threads:"
132 for name in INSERT_THREADS:
133 t = threading.Thread(None, insert_func, 'Thread-'+name,
138 print "Creating SELECT threads:"
139 for name in SELECT_THREADS:
140 t = threading.Thread(None, select_func, 'Thread-'+name,
141 (conn_select, SELECT_DIV))
145 ## really start the threads now
149 # and wait for them to finish
152 print t.getName(), "exited OK"
156 curs.execute("SELECT count(name) FROM test_threads")
157 print "Inserted", curs.fetchone()[0], "rows."
159 curs.execute("DROP TABLE test_threads")