Added my ugly FPS graphing script.
[distributedratelimiting.git] / pgsql / ulogd_PGSQL.c
1 /* ulogd_PGSQL.c, Version $Revision: 6404 $
2  *
3  * ulogd output plugin for logging to a PGSQL database
4  *
5  * (C) 2000-2005 by Harald Welte <laforge@gnumonks.org> 
6  * This software is distributed under the terms of GNU GPL 
7  * 
8  * This plugin is based on the MySQL plugin made by Harald Welte.
9  * The support PostgreSQL were made by Jakab Laszlo.
10  *
11  */
12
13 #include <stdlib.h>
14 #include <string.h>
15 #include <arpa/inet.h>
16 #include <ulogd/ulogd.h>
17 #include <ulogd/conffile.h>
18 #include <libpq-fe.h>
19
20
21 #ifdef DEBUG_PGSQL
22 #define DEBUGP(x, args...)      fprintf(stderr, x, ## args)
23 #else
24 #define DEBUGP(x, args...)
25 #endif
26
27 struct _field {
28         char name[ULOGD_MAX_KEYLEN];
29         unsigned int id;
30         struct _field *next;
31 };
32
33 /* the database handle we are using */
34 static PGconn *dbh;
35
36 /* a linked list of the fields the table has */
37 static struct _field *fields;
38
39 /* buffer for our insert statement */
40 static char *stmt;
41
42 /* pointer to the beginning of the "VALUES" part */
43 static char *stmt_val;
44
45 /* pointer to current inser position in statement */
46 static char *stmt_ins;
47
48 /* our configuration directives */
49 static config_entry_t db_ce = { 
50         .key = "db", 
51         .type = CONFIG_TYPE_STRING,
52         .options = CONFIG_OPT_MANDATORY,
53 };
54
55 static config_entry_t host_ce = { 
56         .next = &db_ce, 
57         .key = "host", 
58         .type = CONFIG_TYPE_STRING,
59         .options = CONFIG_OPT_NONE,
60 };
61
62 static config_entry_t user_ce = { 
63         .next = &host_ce, 
64         .key = "user", 
65         .type = CONFIG_TYPE_STRING,
66         .options = CONFIG_OPT_MANDATORY,
67 };
68
69 static config_entry_t pass_ce = { 
70         .next = &user_ce, 
71         .key = "pass", 
72         .type = CONFIG_TYPE_STRING,
73         .options = CONFIG_OPT_NONE,
74 };
75
76 static config_entry_t table_ce = { 
77         .next = &pass_ce, 
78         .key = "table", 
79         .type = CONFIG_TYPE_STRING,
80         .options = CONFIG_OPT_MANDATORY,
81 };
82
83 static config_entry_t schema_ce = { 
84         .next = &table_ce, 
85         .key = "schema", 
86         .type = CONFIG_TYPE_STRING,
87         .options = CONFIG_OPT_NONE,
88         .u = { .string = "public" },
89 };
90
91 static config_entry_t port_ce = {
92         .next = &schema_ce,
93         .key = "port",
94         .type = CONFIG_TYPE_INT,
95         .options = CONFIG_OPT_NONE,
96 };
97
98 static unsigned char pgsql_have_schemas;
99
100 /* our main output function, called by ulogd */
101 static int pgsql_output(ulog_iret_t *result)
102 {
103         struct _field *f;
104         ulog_iret_t *res;
105         PGresult   *pgres;
106 #ifdef IP_AS_STRING
107         char *tmpstr;           /* need this for --log-ip-as-string */
108         struct in_addr addr;
109 #endif
110
111         stmt_ins = stmt_val;
112
113         for (f = fields; f; f = f->next) {
114                 res = keyh_getres(f->id);
115
116                 if (!res) {
117                         ulogd_log(ULOGD_NOTICE,
118                                 "no result for %s ?!?\n", f->name);
119                 }
120
121                 if (!res || !IS_VALID((*res))) {
122                         /* no result, we have to fake something */
123                         sprintf(stmt_ins, "NULL,");
124                         stmt_ins = stmt + strlen(stmt);
125                         continue;
126                 }
127
128                 switch (res->type) {
129                         case ULOGD_RET_INT8:
130                                 sprintf(stmt_ins, "%d,", res->value.i8);
131                                 break;
132                         case ULOGD_RET_INT16:
133                                 sprintf(stmt_ins, "%d,", res->value.i16);
134                                 break;
135                         case ULOGD_RET_INT32:
136                                 sprintf(stmt_ins, "%d,", res->value.i32);
137                                 break;
138                         case ULOGD_RET_INT64:
139                                 sprintf(stmt_ins, "%lld,", res->value.i64);
140                                 break;
141                         case ULOGD_RET_UINT8:
142                                 sprintf(stmt_ins, "%u,", res->value.ui8);
143                                 break;
144                         case ULOGD_RET_UINT16:
145                                 sprintf(stmt_ins, "%u,", res->value.ui16);
146                                 break;
147                         case ULOGD_RET_IPADDR:
148 #ifdef IP_AS_STRING
149                                 *stmt_ins++ = '\'';
150                                 memset(&addr, 0, sizeof(addr));
151                                 addr.s_addr = ntohl(res->value.ui32);
152                                 tmpstr = (char *)inet_ntoa(addr);
153                                 PQescapeString(stmt_ins,tmpstr,strlen(tmpstr)); 
154                                 stmt_ins = stmt + strlen(stmt);
155                                 sprintf(stmt_ins, "',");
156                                 break;
157 #endif /* IP_AS_STRING */
158                                 /* EVIL: fallthrough when logging IP as
159                                  * u_int32_t */
160
161                         case ULOGD_RET_UINT32:
162                                 sprintf(stmt_ins, "%u,", res->value.ui32);
163                                 break;
164                         case ULOGD_RET_UINT64:
165                                 sprintf(stmt_ins, "%llu,", res->value.ui64);
166                                 break;
167                         case ULOGD_RET_BOOL:
168                                 sprintf(stmt_ins, "'%d',", res->value.b);
169                                 break;
170                         case ULOGD_RET_STRING:
171                                 *stmt_ins++ = '\'';
172                                 PQescapeString(stmt_ins,res->value.ptr,strlen(res->value.ptr)); 
173                                 stmt_ins = stmt + strlen(stmt);
174                                 sprintf(stmt_ins, "',");
175                                 break;
176                         case ULOGD_RET_RAW:
177                                 ulogd_log(ULOGD_NOTICE,"%s: pgsql doesn't support type RAW\n",res->key);
178                                 sprintf(stmt_ins, "NULL,");
179                                 break;
180                         default:
181                                 ulogd_log(ULOGD_NOTICE,
182                                         "unknown type %d for %s\n",
183                                         res->type, res->key);
184                                 break;
185                 }
186                 stmt_ins = stmt + strlen(stmt);
187         }
188         *(stmt_ins - 1) = ')';
189         DEBUGP("stmt=#%s#\n", stmt);
190
191         /* now we have created our statement, insert it */
192         /* Added code by Jaki */
193         pgres = PQexec(dbh, stmt);
194         if(!pgres || PQresultStatus(pgres) != PGRES_COMMAND_OK) {
195                 ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n",
196                                 PQresultErrorMessage(pgres));
197                 return 1;
198         }
199
200         PQclear(pgres);
201
202         return 0;
203 }
204
205 #define PGSQL_HAVE_NAMESPACE_TEMPLATE "SELECT nspname FROM pg_namespace n WHERE n.nspname='%s'"
206
207 /* Determine if server support schemas */
208 static int pgsql_namespace(void) {
209         PGresult *result;
210         char pgbuf[strlen(PGSQL_HAVE_NAMESPACE_TEMPLATE)+strlen(schema_ce.u.string)+1];
211
212         if (!dbh)
213                 return 1;
214
215         sprintf(pgbuf, PGSQL_HAVE_NAMESPACE_TEMPLATE, schema_ce.u.string);
216         ulogd_log(ULOGD_DEBUG, "%s\n", pgbuf);
217         
218         result = PQexec(dbh, pgbuf);
219         if (!result) {
220                 ulogd_log(ULOGD_DEBUG, "\n result false");
221                 return 1;
222         }
223
224         if (PQresultStatus(result) == PGRES_TUPLES_OK) {
225                 ulogd_log(ULOGD_DEBUG, "using schema %s\n", schema_ce.u.string);
226                 pgsql_have_schemas = 1;
227         } else {
228                 pgsql_have_schemas = 0;
229         }
230
231         PQclear(result);
232         
233         return 0;
234 }
235
236 #define PGSQL_INSERTTEMPL   "insert into X (Y) values (Z)"
237 #define PGSQL_VALSIZE   100
238
239 /* create the static part of our insert statement */
240 static int pgsql_createstmt(void)
241 {
242         struct _field *f;
243         unsigned int size;
244         char buf[ULOGD_MAX_KEYLEN];
245         char *underscore;
246
247         if (stmt) {
248                 ulogd_log(ULOGD_NOTICE, "createstmt called, but stmt"
249                         " already existing\n");
250                 return 1;
251         }
252
253         /* caclulate the size for the insert statement */
254         size = strlen(PGSQL_INSERTTEMPL) + strlen(table_ce.u.string) + strlen(schema_ce.u.string) + 1;
255
256         for (f = fields; f; f = f->next) {
257                 /* we need space for the key and a comma, as well as
258                  * enough space for the values */
259                 size += strlen(f->name) + 1 + PGSQL_VALSIZE;
260         }
261
262         ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size);
263
264         stmt = (char *) malloc(size);
265
266         if (!stmt) {
267                 ulogd_log(ULOGD_ERROR, "OOM!\n");
268                 return 1;
269         }
270
271         if (pgsql_have_schemas) {
272                 sprintf(stmt, "insert into %s.%s (", schema_ce.u.string, table_ce.u.string);
273         } else {
274                 sprintf(stmt, "insert into %s (", table_ce.u.string);
275         }
276
277         stmt_val = stmt + strlen(stmt);
278
279         for (f = fields; f; f = f->next) {
280                 strncpy(buf, f->name, ULOGD_MAX_KEYLEN);
281                 while ((underscore = strchr(buf, '.')))
282                         *underscore = '_';
283                 sprintf(stmt_val, "%s,", buf);
284                 stmt_val = stmt + strlen(stmt);
285         }
286         *(stmt_val - 1) = ')';
287
288         sprintf(stmt_val, " values (");
289         stmt_val = stmt + strlen(stmt);
290
291         ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", stmt);
292
293         return 0;
294 }
295
296 #define PGSQL_GETCOLUMN_TEMPLATE "SELECT  a.attname FROM pg_class c, pg_attribute a WHERE c.relname ='%s' AND a.attnum>0 AND a.attrelid=c.oid ORDER BY a.attnum"
297
298 #define PGSQL_GETCOLUMN_TEMPLATE_SCHEMA "SELECT a.attname FROM pg_attribute a, pg_class c LEFT JOIN pg_namespace n ON c.relnamespace=n.oid WHERE c.relname ='%s' AND n.nspname='%s' AND a.attnum>0 AND a.attrelid=c.oid AND a.attisdropped=FALSE ORDER BY a.attnum"
299
300 /* find out which columns the table has */
301 static int pgsql_get_columns(const char *table)
302 {
303         PGresult *result;
304         char buf[ULOGD_MAX_KEYLEN];
305         char pgbuf[strlen(PGSQL_GETCOLUMN_TEMPLATE_SCHEMA)+strlen(table)+strlen(schema_ce.u.string)+2];
306         char *underscore;
307         struct _field *f;
308         int id;
309         int intaux;
310
311         if (!dbh)
312                 return 1;
313
314         if (pgsql_have_schemas) {
315                 snprintf(pgbuf, sizeof(pgbuf)-1, PGSQL_GETCOLUMN_TEMPLATE_SCHEMA, table, schema_ce.u.string);
316         } else {
317                 snprintf(pgbuf, sizeof(pgbuf)-1, PGSQL_GETCOLUMN_TEMPLATE, table);
318         }
319
320         ulogd_log(ULOGD_DEBUG, "%s\n", pgbuf);
321
322         result = PQexec(dbh, pgbuf);
323         if (!result) {
324                 ulogd_log(ULOGD_DEBUG, "\n result false");
325                 return 1;
326         }
327
328         if (PQresultStatus(result) != PGRES_TUPLES_OK) {
329                 ulogd_log(ULOGD_DEBUG, "\n pres_command_not_ok");
330                 return 1;
331         }
332
333         for (intaux=0; intaux<PQntuples(result); intaux++) {
334
335                 /* replace all underscores with dots */
336                 strncpy(buf, PQgetvalue(result, intaux, 0), ULOGD_MAX_KEYLEN);
337                 while ((underscore = strchr(buf, '_')))
338                         *underscore = '.';
339
340                 DEBUGP("field '%s' found: ", buf);
341
342                 if (!(id = keyh_getid(buf))) {
343                         DEBUGP(" no keyid!\n");
344                         continue;
345                 }
346
347                 DEBUGP("keyid %u\n", id);
348
349                 /* prepend it to the linked list */
350                 f = (struct _field *) malloc(sizeof *f);
351                 if (!f) {
352                         ulogd_log(ULOGD_ERROR, "OOM!\n");
353                         return 1;
354                 }
355                 strncpy(f->name, buf, ULOGD_MAX_KEYLEN);
356                 f->id = id;
357                 f->next = fields;
358                 fields = f;
359         }
360
361         PQclear(result);
362         return 0;
363 }
364
365 static int exit_nicely(PGconn *conn)
366 {
367         PQfinish(conn);
368         return 0;;
369 }
370
371 /* make connection and select database */
372 static int pgsql_open_db(char *server, int port, char *user, char *pass, 
373                          char *db)
374 {
375         int len;
376         char *connstr;
377
378         /* 80 is more than what we need for the fixed parts below */
379         len = 80 + strlen(user) + strlen(db);
380
381         /* hostname and  and password are the only optionals */
382         if (server)
383                 len += strlen(server);
384         if (pass)
385                 len += strlen(pass);
386         if (port)
387                 len += 20;
388
389         connstr = (char *) malloc(len);
390         if (!connstr)
391                 return 1;
392
393         if (server) {
394                 strcpy(connstr, " host=");
395                 strcat(connstr, server);
396         }
397
398         if (port) {
399                 char portbuf[20];
400                 snprintf(portbuf, sizeof(portbuf), " port=%u", port);
401                 strcat(connstr, portbuf);
402         }
403
404         strcat(connstr, " dbname=");
405         strcat(connstr, db);
406         strcat(connstr, " user=");
407         strcat(connstr, user);
408
409         if (pass) {
410                 strcat(connstr, " password=");
411                 strcat(connstr, pass);
412         }
413         
414         dbh = PQconnectdb(connstr);
415         if (PQstatus(dbh)!=CONNECTION_OK) {
416                 exit_nicely(dbh);
417                 return 1;
418         }
419
420         return 0;
421 }
422
423 static int pgsql_init(void)
424 {
425         /* have the opts parsed */
426         config_parse_file("PGSQL", &port_ce);
427
428         if (pgsql_open_db(host_ce.u.string, port_ce.u.value, user_ce.u.string,
429                            pass_ce.u.string, db_ce.u.string)) {
430                 ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
431                 return 1;
432         }
433
434         if (pgsql_namespace()) {
435                 return 1;
436                 ulogd_log(ULOGD_ERROR, "unable to test for pgsql schemas\n");
437         }
438
439         /* read the fieldnames to know which values to insert */
440         if (pgsql_get_columns(table_ce.u.string)) {
441                 ulogd_log(ULOGD_ERROR, "unable to get pgsql columns\n");
442                 return 1;
443         }
444         pgsql_createstmt();
445
446         return 0;
447 }
448
449 static void pgsql_fini(void)
450 {
451         PQfinish(dbh);
452 }
453
454 static ulog_output_t pgsql_plugin = { 
455         .name = "pgsql", 
456         .output = &pgsql_output,
457         .init = &pgsql_init,
458         .fini = &pgsql_fini,
459 };
460
461 void _init(void)
462 {
463         register_output(&pgsql_plugin);
464 }