This commit was generated by cvs2svn to compensate for changes in r431,
[plcapi.git] / psycopg2 / psycopg / pqpath.c
1 /* pqpath.c - single path into libpq
2  *
3  * Copyright (C) 2003 Federico Di Gregorio <fog@debian.org>
4  *
5  * This file is part of psycopg.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * as published by the Free Software Foundation; either version 2,
10  * or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20  */
21
22 /* IMPORTANT NOTE: no function in this file do its own connection locking
23    except for pg_execute and pq_fetch (that are somehow high-level. This means
24    that all the othe functions should be called while holding a lock to the
25    connection.
26 */
27
28 #include <Python.h>
29 #include <string.h>
30
31 #define PSYCOPG_MODULE
32 #include "psycopg/config.h"
33 #include "psycopg/python.h"
34 #include "psycopg/psycopg.h"
35 #include "psycopg/pqpath.h"
36 #include "psycopg/connection.h"
37 #include "psycopg/cursor.h"
38 #include "psycopg/typecast.h"
39 #include "psycopg/pgtypes.h"
40 #include "psycopg/pgversion.h"
41
42 /* pq_raise - raise a python exception of the right kind
43
44    This function should be called while holding the GIL. */
45
46 void
47 pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg)
48 {
49     PyObject *pgc = (PyObject*)curs;
50     
51     char *err = NULL;
52     char *err2 = NULL;
53     char *code = NULL;
54     char *buf = NULL;
55     
56     if ((conn == NULL && curs == NULL) || (curs != NULL && conn == NULL)) {
57         PyErr_SetString(Error, "psycopg went psycotic and raised a null error");
58         return;
59     }
60     
61     if (curs && curs->pgres) {
62         err = PQresultErrorMessage(curs->pgres);
63 #ifdef HAVE_PQPROTOCOL3
64         if (err != NULL && conn->protocol == 3) {
65             code = PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
66         }
67 #endif
68     }
69     if (err == NULL)
70         err = PQerrorMessage(conn->pgconn);
71
72     /* if the is no error message we probably called pq_raise without reason:
73        we need to set an exception anyway because the caller will probably
74        raise and a meaningful message is better than an empty one */
75     if (err == NULL) {
76         PyErr_SetString(Error, "psycopg went psycotic without error set");
77         return;
78     }
79     
80     /* if exc is NULL, analyze the message and try to deduce the right
81        exception kind (only if we have a pgres, obviously) */
82     if (exc == NULL) {
83         if (curs && curs->pgres) {
84             if (conn->protocol == 3) {
85 #ifdef HAVE_PQPROTOCOL3
86                 char *pgstate =
87                     PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
88                 if (pgstate != NULL && !strncmp(pgstate, "23", 2))
89                     exc = IntegrityError;
90                 else
91                     exc = ProgrammingError;
92 #endif
93             }
94         }
95     }
96     
97     /* if exc is still NULL psycopg was not built with HAVE_PQPROTOCOL3 or the
98        connection is using protocol 2: in both cases we default to comparing
99        error messages */
100     if (exc == NULL) {
101         if (!strncmp(err, "ERROR:  Cannot insert a duplicate key", 37)
102             || !strncmp(err, "ERROR:  ExecAppend: Fail to add null", 36)
103             || strstr(err, "referential integrity violation"))
104             exc = IntegrityError;
105         else
106             exc = ProgrammingError;
107     }
108     
109     /* try to remove the initial "ERROR: " part from the postgresql error */
110     if (err && strlen(err) > 8) err2 = &(err[8]);
111     else err2 = err;
112
113     /* if msg is not NULL, add it to the error message, after a '\n' */
114     if (msg && code) {
115         int len = strlen(code) + strlen(err) + strlen(msg) + 5;
116         if ((buf = PyMem_Malloc(len))) {
117             snprintf(buf, len, "[%s] %s\n%s", code, err2, msg);
118             psyco_set_error(exc, pgc, buf, err, code);
119         }
120     }
121     else if (msg) {
122         int len = strlen(err) + strlen(msg) + 2;
123         if ((buf = PyMem_Malloc(len))) {
124             snprintf(buf, len, "%s\n%s", err2, msg);
125             psyco_set_error(exc, pgc, buf, err, code);
126         }
127     }
128     else {
129         psyco_set_error(exc, pgc, err2, err, code);        
130     }
131     
132     if (buf != NULL) PyMem_Free(buf);
133 }
134
135 /* pq_set_critical, pq_resolve_critical - manage critical errors
136
137    this function is invoked when a PQexec() call returns NULL, meaning a
138    critical condition like out of memory or lost connection. it save the error
139    message and mark the connection as 'wanting cleanup'.
140
141    both functions do not call any Py_*_ALLOW_THREADS macros.
142    pq_resolve_critical should be called while holding the GIL. */
143
144 void
145 pq_set_critical(connectionObject *conn, const char *msg)
146 {
147     if (msg == NULL) 
148         msg = PQerrorMessage(conn->pgconn);
149     if (conn->critical) free(conn->critical);
150     if (msg && msg[0] != '\0') conn->critical = strdup(msg);
151     else conn->critical = NULL;
152 }
153
154 PyObject *
155 pq_resolve_critical(connectionObject *conn, int close)
156 {
157     Dprintf("pq_resolve_critical: resolving %s", conn->critical);
158     
159     if (conn->critical) {
160         char *msg = &(conn->critical[6]);
161         Dprintf("pq_resolve_critical: error = %s", msg);
162         /* we can't use pq_raise because the error has already been cleared
163            from the connection, so we just raise an OperationalError with the
164            critical message */
165         PyErr_SetString(OperationalError, msg);
166         
167         /* we don't want to destroy this connection but just close it */
168         if (close == 1) conn_close(conn);
169     }
170     return NULL;
171 }
172
173 /* pq_clear_async - clear the effects of a previous async query
174
175    note that this function does block because it needs to wait for the full
176    result sets of the previous query to clear them.
177
178    
179    this function does not call any Py_*_ALLOW_THREADS macros */
180
181 void
182 pq_clear_async(connectionObject *conn)
183 {
184     PGresult *pgres;
185
186     do {
187         pgres = PQgetResult(conn->pgconn);
188         Dprintf("pq_clear_async: clearing PGresult at %p", pgres);
189         IFCLEARPGRES(pgres);
190     } while (pgres != NULL);
191 }
192
193 /* pq_begin - send a BEGIN WORK, if necessary
194
195    this function does not call any Py_*_ALLOW_THREADS macros */
196
197 int
198 pq_begin(connectionObject *conn)
199 {
200     const char *query[] = {
201         NULL,
202         "BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
203         "BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"};
204     
205     int pgstatus, retvalue = -1;
206     PGresult *pgres = NULL;
207
208     Dprintf("pq_begin: pgconn = %p, isolevel = %ld, status = %d",
209             conn->pgconn, conn->isolation_level, conn->status);
210
211     if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) {
212         Dprintf("pq_begin: transaction in progress");
213         return 0;
214     }
215
216     pq_clear_async(conn);
217     pgres = PQexec(conn->pgconn, query[conn->isolation_level]);
218     if (pgres == NULL) {
219         Dprintf("pq_begin: PQexec() failed");
220         pq_set_critical(conn, NULL);
221         goto cleanup;
222     }
223
224     pgstatus = PQresultStatus(pgres);
225     if (pgstatus != PGRES_COMMAND_OK ) {
226         Dprintf("pq_begin: result is NOT OK");
227         pq_set_critical(conn, NULL);
228         goto cleanup;
229     }
230     Dprintf("pq_begin: issued '%s' command", query[conn->isolation_level]);
231
232     retvalue = 0;
233     conn->status = CONN_STATUS_BEGIN;
234
235  cleanup:
236     IFCLEARPGRES(pgres);
237     return retvalue;
238 }
239
240 /* pq_commit - send an END, if necessary
241
242    this function does not call any Py_*_ALLOW_THREADS macros */
243
244 int
245 pq_commit(connectionObject *conn)
246 {
247     const char *query = "END";
248     int pgstatus, retvalue = -1;
249     PGresult *pgres = NULL;
250
251     Dprintf("pq_commit: pgconn = %p, isolevel = %ld, status = %d",
252             conn->pgconn, conn->isolation_level, conn->status);
253
254     if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
255         Dprintf("pq_commit: no transaction to commit");
256         return 0;
257     }
258
259     pq_clear_async(conn);
260     pgres = PQexec(conn->pgconn, query);
261     if (pgres == NULL) {
262         Dprintf("pq_commit: PQexec() failed");
263         pq_set_critical(conn, NULL);
264         goto cleanup;
265     }
266
267     pgstatus = PQresultStatus(pgres);
268     if (pgstatus != PGRES_COMMAND_OK ) {
269         Dprintf("pq_commit: result is NOT OK");
270         pq_set_critical(conn, NULL);
271         goto cleanup;
272     }
273     Dprintf("pq_commit: issued '%s' command", query);
274
275     retvalue = 0;
276     conn->status = CONN_STATUS_READY;
277
278  cleanup:
279     IFCLEARPGRES(pgres);
280     return retvalue;
281 }
282
283 /* pq_abort - send an ABORT, if necessary
284
285    this function does not call any Py_*_ALLOW_THREADS macros */
286
287 int
288 pq_abort(connectionObject *conn)
289 {
290     const char *query = "ABORT";
291     int pgstatus, retvalue = -1;
292     PGresult *pgres = NULL;
293
294     Dprintf("pq_abort: pgconn = %p, isolevel = %ld, status = %d",
295             conn->pgconn, conn->isolation_level, conn->status);
296
297     if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
298         Dprintf("pq_abort: no transaction to abort");
299         return 0;
300     }
301
302     pq_clear_async(conn);
303     pgres = PQexec(conn->pgconn, query);
304     if (pgres == NULL) {
305         Dprintf("pq_abort: PQexec() failed");
306         pq_set_critical(conn, NULL);
307         goto cleanup;
308     }
309
310     pgstatus = PQresultStatus(pgres);
311     if (pgstatus != PGRES_COMMAND_OK ) {
312         Dprintf("pq_abort: result is NOT OK");
313         pq_set_critical(conn, NULL);
314         goto cleanup;
315     }
316     Dprintf("pq_abort: issued '%s' command", query);
317
318     retvalue = 0;
319     conn->status = CONN_STATUS_READY;
320
321  cleanup:
322     IFCLEARPGRES(pgres);
323     return retvalue;
324 }
325
326 /* pq_is_busy - consume input and return connection status
327  
328    a status of 1 means that a call to pq_fetch will block, while a status of 0
329    means that there is data available to be collected. -1 means an error, the
330    exception will be set accordingly.
331
332    this fucntion locks the connection object
333    this function call Py_*_ALLOW_THREADS macros */
334
335 int
336 pq_is_busy(connectionObject *conn)
337 {
338     PGnotify *pgn;
339     
340     Dprintf("pq_is_busy: consuming input");
341
342     Py_BEGIN_ALLOW_THREADS;
343     pthread_mutex_lock(&(conn->lock));
344
345     if (PQconsumeInput(conn->pgconn) == 0) {
346         Dprintf("pq_is_busy: PQconsumeInput() failed");
347         pthread_mutex_unlock(&(conn->lock));
348         Py_BLOCK_THREADS;
349         PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
350         return -1;
351     }
352
353     pthread_mutex_unlock(&(conn->lock));
354     Py_END_ALLOW_THREADS;
355     
356     /* now check for notifies */
357     while ((pgn = PQnotifies(conn->pgconn)) != NULL) {
358         PyObject *notify;
359         
360         Dprintf("curs_is_busy: got NOTIFY from pid %d, msg = %s",
361                 pgn->be_pid, pgn->relname);
362
363         notify = PyTuple_New(2);
364         PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
365         PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
366         PyList_Append(conn->notifies, notify);
367         free(pgn);
368     }
369     
370     return PQisBusy(conn->pgconn);
371 }
372
373 /* pq_execute - execute a query, possibly asyncronously
374
375    this fucntion locks the connection object
376    this function call Py_*_ALLOW_THREADS macros */
377
378 int
379 pq_execute(cursorObject *curs, const char *query, int async)
380 {
381     /* if the status of the connection is critical raise an exception and
382        definitely close the connection */
383     if (curs->conn->critical) {
384         pq_resolve_critical(curs->conn, 1);
385         return -1;
386     }
387
388     /* check status of connection, raise error if not OK */
389     if (PQstatus(curs->conn->pgconn) != CONNECTION_OK) {
390         Dprintf("pq_execute: connection NOT OK");
391         PyErr_SetString(OperationalError, PQerrorMessage(curs->conn->pgconn));
392         return -1;
393     }
394     Dprintf("curs_execute: pg connection at %p OK", curs->conn->pgconn);
395
396     Py_BEGIN_ALLOW_THREADS;
397     pthread_mutex_lock(&(curs->conn->lock));
398
399     pq_begin(curs->conn);
400
401     if (async == 0) {
402         IFCLEARPGRES(curs->pgres);
403         Dprintf("pq_execute: executing SYNC query:");
404         Dprintf("    %-.200s", query);
405         curs->pgres = PQexec(curs->conn->pgconn, query);
406     }
407
408     else if (async == 1) {
409         /* first of all, let see if the previous query has already ended, if
410            not what should we do? just block and discard data or execute
411            another query? */
412         pq_clear_async(curs->conn);
413         
414         Dprintf("pq_execute: executing ASYNC query:");
415         Dprintf("    %-.200s", query);
416         
417         /* then we can go on and send a new query without fear */
418         IFCLEARPGRES(curs->pgres);
419         if (PQsendQuery(curs->conn->pgconn, query) == 0) {
420             pthread_mutex_unlock(&(curs->conn->lock));
421             Py_BLOCK_THREADS;
422             PyErr_SetString(OperationalError,
423                             PQerrorMessage(curs->conn->pgconn));
424             return -1;
425         }
426         Dprintf("pq_execute: async query sent to backend");
427     }
428     
429     pthread_mutex_unlock(&(curs->conn->lock));
430     Py_END_ALLOW_THREADS;
431     
432     /* if the execute was sync, we call pq_fetch() immediately,
433        to respect the old DBAPI-2.0 compatible behaviour */
434     if (async == 0) {
435         Dprintf("pq_execute: entering syncronous DBAPI compatibility mode");
436         if (pq_fetch(curs) == -1) return -1;
437     }
438     else {
439         curs->conn->async_cursor = (PyObject*)curs;
440     }
441     
442     return 1-async;
443 }
444
445
446 /* pq_fetch - fetch data after a query
447
448    this fucntion locks the connection object
449    this function call Py_*_ALLOW_THREADS macros
450
451    return value:
452      -1 - some error occurred while calling libpq
453       0 - no result from the backend but no libpq errors
454       1 - result from backend (possibly data is ready)
455 */
456
457 static void
458 _pq_fetch_tuples(cursorObject *curs)
459 {
460     int i, *dsize = NULL;
461
462     int pgnfields = PQnfields(curs->pgres);
463     int pgbintuples = PQbinaryTuples(curs->pgres);
464
465     curs->notuples = 0;
466
467     /* create the tuple for description and typecasting */
468     Py_XDECREF(curs->description);
469     Py_XDECREF(curs->casts);
470     curs->description = PyTuple_New(pgnfields);
471     curs->casts = PyTuple_New(pgnfields);
472     curs->columns = pgnfields;
473     
474     /* calculate the display size for each column (cpu intensive, can be
475        switched off at configuration time) */
476 #ifdef PSYCOPG_DISPLAY_SIZE
477     dsize = (int *)PyMem_Malloc(pgnfields * sizeof(int));
478     if (dsize != NULL) {
479         int j, len;
480         for (i=0; i < pgnfields; i++) {
481             dsize[i] = -1;
482         }
483         for (j = 0; j < curs->rowcount; j++) {
484             for (i = 0; i < pgnfields; i++) {
485                 len = PQgetlength(curs->pgres, j, i);
486                 if (len > dsize[i]) dsize[i] = len;
487             }
488         }
489     }
490 #endif
491
492     /* calculate various parameters and typecasters */
493     for (i = 0; i < pgnfields; i++) {
494         Oid ftype = PQftype(curs->pgres, i);
495         int fsize = PQfsize(curs->pgres, i);
496         int fmod =  PQfmod(curs->pgres, i);
497         
498         PyObject *dtitem = PyTuple_New(7);
499         PyObject *type = PyInt_FromLong(ftype);
500         PyObject *cast = NULL;
501         
502         PyTuple_SET_ITEM(curs->description, i, dtitem);
503         
504         /* fill the right cast function by accessing the global dictionary of
505            casting objects.  if we got no defined cast use the default
506            one */
507         if (!(cast = PyDict_GetItem(curs->casts, type))) {
508             Dprintf("_pq_fetch_tuples: cast %d not in per-cursor dict", ftype);
509             if (!(cast = PyDict_GetItem(psyco_types, type))) {
510                 Dprintf("_pq_fetch_tuples: cast %d not found, using default",
511                         PQftype(curs->pgres,i));
512                 cast = psyco_default_cast;
513             }
514         }
515         /* else if we got binary tuples and if we got a field that
516            is binary use the default cast
517            FIXME: what the hell am I trying to do here? This just can't work..
518         */
519         else if (pgbintuples && cast == psyco_default_binary_cast) {
520             Dprintf("_pq_fetch_tuples: Binary cursor and "
521                     "binary field: %i using default cast",
522                     PQftype(curs->pgres,i));
523             cast = psyco_default_cast;
524         }
525         Dprintf("_pq_fetch_tuples: using cast at %p (%s) for type %d",
526                 cast, PyString_AS_STRING(((typecastObject*)cast)->name),
527                 PQftype(curs->pgres,i));
528         Py_INCREF(cast);
529         PyTuple_SET_ITEM(curs->casts, i, cast);
530     
531
532         /* 1/ fill the other fields */
533         PyTuple_SET_ITEM(dtitem, 0,
534                          PyString_FromString(PQfname(curs->pgres, i)));
535         PyTuple_SET_ITEM(dtitem, 1, type);
536
537         /* 2/ display size is the maximum size of this field result tuples. */
538         if (dsize && dsize[i] >= 0) {
539             PyTuple_SET_ITEM(dtitem, 2, PyInt_FromLong(dsize[i]));
540         }
541         else {
542             Py_INCREF(Py_None);
543             PyTuple_SET_ITEM(dtitem, 2, Py_None);
544         }
545
546         /* 3/ size on the backend */
547         if (fmod > 0) fmod = fmod - sizeof(int);
548         if (fsize == -1) {
549             if (ftype == NUMERICOID) {
550                 PyTuple_SET_ITEM(dtitem, 3,
551                                  PyInt_FromLong((fmod >> 16) & 0xFFFF));
552             }
553             else { /* If variable length record, return maximum size */
554                 PyTuple_SET_ITEM(dtitem, 3, PyInt_FromLong(fmod));
555             }
556         }
557         else {
558             PyTuple_SET_ITEM(dtitem, 3, PyInt_FromLong(fsize));
559         }
560
561         /* 4,5/ scale and precision */
562         if (ftype == NUMERICOID) {
563             PyTuple_SET_ITEM(dtitem, 4, PyInt_FromLong((fmod >> 16) & 0xFFFF));
564             PyTuple_SET_ITEM(dtitem, 5, PyInt_FromLong((fmod & 0xFFFF) - 4));
565         }
566         else {
567             Py_INCREF(Py_None);
568             PyTuple_SET_ITEM(dtitem, 4, Py_None);
569             Py_INCREF(Py_None);
570             PyTuple_SET_ITEM(dtitem, 5, Py_None);
571         }
572
573         /* 6/ FIXME: null_ok??? */
574         Py_INCREF(Py_None);
575         PyTuple_SET_ITEM(dtitem, 6, Py_None);
576     }
577     
578     if (dsize) PyMem_Free(dsize);
579 }
580
581 #ifdef HAVE_PQPROTOCOL3
582 static int
583 _pq_copy_in_v3(cursorObject *curs)
584 {
585     /* COPY FROM implementation when protocol 3 is available: this function
586        uses the new PQputCopyData() and can detect errors and set the correct
587        exception */
588     PyObject *o;
589     int length = 0, error = 0;
590     
591     while (1) {
592         o = PyObject_CallMethod(curs->copyfile, "read", "i", curs->copysize);
593         if (!o || !PyString_Check(o) || (length = PyString_Size(o)) == -1) {
594             error = 1;
595         }
596         if (length == 0 || error == 1) break;
597         
598         Py_BEGIN_ALLOW_THREADS;
599         if (PQputCopyData(curs->conn->pgconn,
600                           PyString_AS_STRING(o), length) == -1) {
601             error = 2;
602         }
603         Py_END_ALLOW_THREADS;
604
605         if (error == 2) break;
606         
607         Py_DECREF(o);
608     }
609     
610     Py_XDECREF(o);
611     
612     if (error == 0 || error == 2)
613         /* 0 means that the copy went well, 2 that there was an error on the
614            backend: in both cases we'll get the error message from the
615            PQresult */
616         PQputCopyEnd(curs->conn->pgconn, NULL);
617     else
618         PQputCopyEnd(curs->conn->pgconn, "error during .read() call");
619
620     /* and finally we grab the operation result from the backend */
621     IFCLEARPGRES(curs->pgres);
622     while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
623         if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
624             pq_raise(curs->conn, curs, NULL, NULL);
625         IFCLEARPGRES(curs->pgres);
626     }
627
628     return 1;
629 }
630 #endif
631 static int
632 _pq_copy_in(cursorObject *curs)
633 {
634     /* COPY FROM implementation when protocol 3 is not available: this
635        function can't fail but the backend will send an ERROR notice that will
636        be catched by our notice collector */
637     PyObject *o;
638
639     while (1) {
640         o = PyObject_CallMethod(curs->copyfile, "readline", NULL);
641         if (!o || o == Py_None || PyString_GET_SIZE(o) == 0) break;
642         if (PQputline(curs->conn->pgconn, PyString_AS_STRING(o)) != 0) {
643             Py_DECREF(o);
644             return -1;
645         }
646         Py_DECREF(o);
647     }
648     Py_XDECREF(o);
649     PQputline(curs->conn->pgconn, "\\.\n");
650     PQendcopy(curs->conn->pgconn);
651
652     /* if for some reason we're using a protocol 3 libpq to connect to a
653        protocol 2 backend we still need to cycle on the result set */
654     IFCLEARPGRES(curs->pgres);
655     while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
656         if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
657             pq_raise(curs->conn, curs, NULL, NULL);
658         IFCLEARPGRES(curs->pgres);
659     }
660
661     return 1;
662 }
663
664 #ifdef HAVE_PQPROTOCOL3
665 static int
666 _pq_copy_out_v3(cursorObject *curs)
667 {
668     char *buffer;
669     int len;
670     
671     while (1) {
672         Py_BEGIN_ALLOW_THREADS;
673         len = PQgetCopyData(curs->conn->pgconn, &buffer, 0);
674         Py_END_ALLOW_THREADS;
675             
676         if (len > 0 && buffer) {
677             PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len);
678             PQfreemem(buffer);
679         }
680         /* we break on len == 0 but note that that should *not* happen,
681            because we are not doing an async call (if it happens blame
682            postgresql authors :/) */
683         else if (len <= 0) break;
684     }
685     
686     if (len == -2) {
687         pq_raise(curs->conn, NULL, NULL, NULL);
688         return -1;
689     }
690
691     /* and finally we grab the operation result from the backend */
692     IFCLEARPGRES(curs->pgres);
693     while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
694         if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
695             pq_raise(curs->conn, curs, NULL, NULL);
696         IFCLEARPGRES(curs->pgres);
697     }
698     return 1;
699 }
700 #endif
701
702 static int
703 _pq_copy_out(cursorObject *curs)
704 {
705     char buffer[4096];
706     int status, len;
707
708     while (1) {
709         Py_BEGIN_ALLOW_THREADS;
710         status = PQgetline(curs->conn->pgconn, buffer, 4096);
711         Py_END_ALLOW_THREADS;
712         if (status == 0) {
713             if (buffer[0] == '\\' && buffer[1] == '.') break;
714
715             len = strlen(buffer);
716             buffer[len++] = '\n';
717         }
718         else if (status == 1) {
719             len = 4096-1;
720         }
721         else {
722             return -1;
723         }
724         
725         PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len);
726     }
727
728     status = 1;
729     if (PQendcopy(curs->conn->pgconn) != 0)
730         status = -1;
731     
732     /* if for some reason we're using a protocol 3 libpq to connect to a
733        protocol 2 backend we still need to cycle on the result set */
734     IFCLEARPGRES(curs->pgres);
735     while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
736         if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
737             pq_raise(curs->conn, curs, NULL, NULL);
738         IFCLEARPGRES(curs->pgres);
739     }
740
741     return status;
742 }
743
744 int
745 pq_fetch(cursorObject *curs)
746 {
747     int pgstatus, ex = -1;
748
749     /* even if we fail, we remove any information about the previous query */
750     curs_reset(curs);
751     
752     /* we check the result from the previous execute; if the result is not
753        already there, we need to consume some input and go to sleep until we
754        get something edible to eat */
755     if (!curs->pgres) {
756         
757         Dprintf("pq_fetch: no data: entering polling loop");
758         
759         while (pq_is_busy(curs->conn) > 0) {
760             fd_set rfds;
761             struct timeval tv;
762             int sval, sock;
763
764             Py_BEGIN_ALLOW_THREADS;
765             pthread_mutex_lock(&(curs->conn->lock));
766
767             sock = PQsocket(curs->conn->pgconn);
768             FD_ZERO(&rfds);
769             FD_SET(sock, &rfds);
770
771             /* set a default timeout of 5 seconds
772                TODO: make use of the timeout, maybe allowing the user to
773                make a non-blocking (timeouted) call to fetchXXX */
774             tv.tv_sec = 5;
775             tv.tv_usec = 0;
776
777             Dprintf("pq_fetch: entering PDflush() loop");
778             while (PQflush(curs->conn->pgconn) != 0);
779             sval = select(sock+1, &rfds, NULL, NULL, &tv);
780
781             pthread_mutex_unlock(&(curs->conn->lock));
782             Py_END_ALLOW_THREADS;
783         }
784
785         Dprintf("pq_fetch: data is probably ready");
786         IFCLEARPGRES(curs->pgres);
787         curs->pgres = PQgetResult(curs->conn->pgconn);
788     }
789
790     /* check for PGRES_FATAL_ERROR result */
791     /* FIXME: I am not sure we need to check for critical error here.
792     if (curs->pgres == NULL) {
793         Dprintf("pq_fetch: got a NULL pgres, checking for critical");
794         pq_set_critical(curs->conn);
795         if (curs->conn->critical) {
796             pq_resolve_critical(curs->conn);
797             return -1;
798         }
799         else {
800             return 0;
801         }
802     }
803     */
804     
805     if (curs->pgres == NULL) return 0;
806     
807     pgstatus = PQresultStatus(curs->pgres);
808     Dprintf("pq_fetch: pgstatus = %s", PQresStatus(pgstatus));
809
810     /* backend status message */
811     Py_XDECREF(curs->pgstatus);
812     curs->pgstatus = PyString_FromString(PQcmdStatus(curs->pgres));
813
814     switch(pgstatus) {
815
816     case PGRES_COMMAND_OK:
817         Dprintf("pq_fetch: command returned OK (no tuples)");
818         curs->rowcount = atoi(PQcmdTuples(curs->pgres));
819         curs->lastoid = PQoidValue(curs->pgres);
820         CLEARPGRES(curs->pgres);
821         ex = 1;
822         break;
823
824     case PGRES_COPY_OUT:
825         Dprintf("pq_fetch: data from a COPY TO (no tuples)");
826 #ifdef HAVE_PQPROTOCOL3
827         if (curs->conn->protocol == 3)
828             ex = _pq_copy_out_v3(curs);
829         else
830 #endif
831             ex = _pq_copy_out(curs);
832         curs->rowcount = -1;
833         /* error caught by out glorious notice handler */
834         if (PyErr_Occurred()) ex = -1;
835         IFCLEARPGRES(curs->pgres);
836         break;
837         
838     case PGRES_COPY_IN:
839         Dprintf("pq_fetch: data from a COPY FROM (no tuples)");
840 #ifdef HAVE_PQPROTOCOL3
841         if (curs->conn->protocol == 3)        
842             ex = _pq_copy_in_v3(curs);
843         else
844 #endif
845             ex = _pq_copy_in(curs);
846         curs->rowcount = -1;
847         /* error caught by out glorious notice handler */
848         if (PyErr_Occurred()) ex = -1;
849         IFCLEARPGRES(curs->pgres);
850         break;
851         
852     case PGRES_TUPLES_OK:
853         Dprintf("pq_fetch: data from a SELECT (got tuples)");
854         curs->rowcount = PQntuples(curs->pgres);
855         _pq_fetch_tuples(curs); ex = 0;
856         /* don't clear curs->pgres, because it contains the results! */
857         break;
858         
859     default:
860         Dprintf("pq_fetch: uh-oh, something FAILED");
861         pq_raise(curs->conn, curs, NULL, NULL);
862         IFCLEARPGRES(curs->pgres);
863         ex = -1;
864         break;
865     }
866
867     Dprintf("pq_fetch: fetching done; check for critical errors");
868     
869     /* error checking, close the connection if necessary (some critical errors
870        are not really critical, like a COPY FROM error: if that's the case we
871        raise the exception but we avoid to close the connection) */
872     if (curs->conn->critical) {
873         if (ex == -1) {
874             pq_resolve_critical(curs->conn, 1);
875         }
876         else {
877             pq_resolve_critical(curs->conn, 0);
878         }
879         return -1;
880     }
881     
882     return ex;
883 }