Merge remote-tracking branch 'origin/pycurl' into planetlab-4_0-branch
[plcapi.git] / psycopg2 / examples / threads.py
1 # threads.py -- example of multiple threads using psycopg
2 # -*- encoding: latin1 -*-
3 #
4 # Copyright (C) 2001-2004 Federico Di Gregorio  <fog@debian.org>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by the
8 # Free Software Foundation; either version 2, or (at your option) any later
9 # version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
13 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 # for more details.
15
16 ## put in DSN your DSN string
17
18 DSN = 'dbname=test'
19
20 ## some others parameters
21 INSERT_THREADS = ('A', 'B', 'C')
22 SELECT_THREADS = ('1', '2')
23
24 ROWS = 1000
25
26 COMMIT_STEP = 20
27 SELECT_SIZE = 10000
28 SELECT_STEP = 500
29 SELECT_DIV  = 250
30
31 # the available modes are:
32 # 0 - one connection for all insert and one for all select threads
33 # 1 - connections generated using the connection pool
34
35 MODE = 1
36
37 ## don't modify anything below tis line (except for experimenting)
38
39 import sys, psycopg2, threading
40 from psycopg2.pool import ThreadedConnectionPool
41
42 if len(sys.argv) > 1:
43     DSN = sys.argv[1]
44 if len(sys.argv) > 2:
45     MODE = int(sys.argv[2])
46     
47 print "Opening connection using dns:", DSN
48 conn = psycopg2.connect(DSN)
49 curs = conn.cursor()
50
51 try:
52     curs.execute("""CREATE TABLE test_threads (
53                         name text, value1 int4, value2 float)""")
54 except:
55     conn.rollback()
56     curs.execute("DROP TABLE test_threads")
57     curs.execute("""CREATE TABLE test_threads (
58                         name text, value1 int4, value2 float)""")
59 conn.commit()
60
61
62 ## this function inserts a big number of rows and creates and destroys
63 ## a large number of cursors
64
65 def insert_func(conn_or_pool, rows):
66     name = threading.currentThread().getName()
67
68     if MODE == 0:
69         conn = conn_or_pool
70     else:
71         conn = conn_or_pool.getconn()
72         
73     for i in range(rows):
74         if divmod(i, COMMIT_STEP)[1] == 0:
75             conn.commit()
76             if MODE == 1:
77                 conn_or_pool.putconn(conn)
78             s = name + ": COMMIT STEP " + str(i)
79             print s
80             if MODE == 1:
81                 conn = conn_or_pool.getconn()
82         c = conn.cursor()
83         try:
84             c.execute("INSERT INTO test_threads VALUES (%s, %s, %s)",
85                       (str(i), i, float(i)))
86         except psycopg2.ProgrammingError, err:
87             print name, ": an error occurred; skipping this insert"
88             print err
89     conn.commit()
90
91 ## a nice select function that prints the current number of rows in the
92 ## database (and transefer them, putting some pressure on the network)
93     
94 def select_func(conn_or_pool, z):
95     name = threading.currentThread().getName()
96
97     if MODE == 0:
98         conn = conn_or_pool
99         conn.set_isolation_level(0)
100     
101     for i in range(SELECT_SIZE):
102         if divmod(i, SELECT_STEP)[1] == 0:
103             try:
104                 if MODE == 1:
105                     conn = conn_or_pool.getconn()
106                     conn.set_isolation_level(0)
107                 c = conn.cursor()
108                 c.execute("SELECT * FROM test_threads WHERE value2 < %s",
109                           (int(i/z),))
110                 l = c.fetchall()
111                 if MODE == 1:
112                     conn_or_pool.putconn(conn)
113                 s = name + ": number of rows fetched: " + str(len(l))
114                 print s
115             except psycopg2.ProgrammingError, err:
116                 print name, ": an error occurred; skipping this select"
117                 print err
118
119 ## create the connection pool or the connections
120 if MODE == 0:
121     conn_insert = psycopg2.connect(DSN)
122     conn_select = psycopg2.connect(DSN)
123 else:
124     m = len(INSERT_THREADS) + len(SELECT_THREADS)
125     n = m/2
126     conn_insert = conn_select = ThreadedConnectionPool(n, m, DSN)
127     
128 ## create the threads
129 threads = []
130
131 print "Creating INSERT threads:"
132 for name in INSERT_THREADS:
133     t = threading.Thread(None, insert_func, 'Thread-'+name,
134                          (conn_insert, ROWS))
135     t.setDaemon(0)
136     threads.append(t)
137
138 print "Creating SELECT threads:"
139 for name in SELECT_THREADS:
140     t = threading.Thread(None, select_func, 'Thread-'+name,
141                          (conn_select, SELECT_DIV))
142     t.setDaemon(0)
143     threads.append(t)
144
145 ## really start the threads now
146 for t in threads:
147     t.start()
148
149 # and wait for them to finish
150 for t in threads:
151     t.join()
152     print t.getName(), "exited OK"
153
154
155 conn.commit()
156 curs.execute("SELECT count(name) FROM test_threads")
157 print "Inserted", curs.fetchone()[0], "rows."
158
159 curs.execute("DROP TABLE test_threads")
160 conn.commit()