Lots of changes. In no particular order:
[distributedratelimiting.git] / sqlite3 / ulogd_SQLITE3.c
1 /*
2  * ulogd output plugin for logging to a SQLITE database
3  *
4  * (C) 2005 by Ben La Monica <ben.lamonica@gmail.com>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License version 2 
8  *  as published by the Free Software Foundation
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, write to the Free Software
17  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18  * 
19  *  This module has been adapted from the ulogd_MYSQL.c written by
20  *  Harald Welte <laforge@gnumonks.org>
21  *  Alex Janssen <alex@ynfonatic.de>
22  *
23  *  You can see benchmarks and an explanation of the testing
24  *  at http://www.pojo.us/ulogd/
25  *
26  *  2005-02-09 Harald Welte <laforge@gnumonks.org>:
27  *      - port to ulogd-1.20 
28  */
29
30 #include <stdlib.h>
31 #include <string.h>
32 #include <arpa/inet.h>
33 #include <ulogd/ulogd.h>
34 #include <ulogd/conffile.h>
35 #include <sqlite3.h>
36
/*
 * Debug tracing to stderr; compiled in only when DEBUG_SQLITE3 is defined.
 * Both variants expand to a single statement so that DEBUGP() can be used
 * safely as the body of an unbraced if/else.
 */
#ifdef DEBUG_SQLITE3
#define DEBUGP(fmt, ...)	do { fprintf(stderr, fmt, ## __VA_ARGS__); } while (0)
#else
#define DEBUGP(fmt, ...)	do { } while (0)
#endif
42
43 struct _field {
44         char name[ULOGD_MAX_KEYLEN];
45         unsigned int id;
46         struct _field *next;
47 };
48
49 /* the database handle we are using */
50 static sqlite3 *dbh;
51
52 /* a linked list of the fields the table has */
53 static struct _field *fields;
54
55 /* buffer for our insert statement */
56 static char *stmt;
57
58 /* pointer to the final prepared statement */
59 static sqlite3_stmt *p_stmt;
60
61 /* number of statements to buffer before we commit */
62 static int buffer_size;
63
64 /* number of statements currently in the buffer */
65 static int buffer_ctr;
66
67 /* our configuration directives */
68 static config_entry_t db_ce = { 
69         .key = "db", 
70         .type = CONFIG_TYPE_STRING,
71         .options = CONFIG_OPT_MANDATORY,
72 };
73
74 static config_entry_t table_ce = { 
75         .next = &db_ce, 
76         .key = "table",
77         .type = CONFIG_TYPE_STRING,
78         .options = CONFIG_OPT_MANDATORY,
79 };
80
81 static config_entry_t buffer_ce = { 
82         .next = &table_ce,
83         .key = "buffer",
84         .type = CONFIG_TYPE_INT,
85         .options = CONFIG_OPT_MANDATORY,
86 };
87
88 /* our main output function, called by ulogd */
89 static int _sqlite3_output(ulog_iret_t *result)
90 {
91         struct _field *f;
92         ulog_iret_t *res;
93         int col_counter;
94 #ifdef IP_AS_STRING
95         char *ipaddr;
96         struct in_addr addr;
97 #endif
98
99         col_counter = 1;
100         for (f = fields; f; f = f->next) {
101                 res = keyh_getres(f->id);
102
103                 if (!res) {
104                         ulogd_log(ULOGD_NOTICE,
105                                 "no result for %s ?!?\n", f->name);
106                 }
107                         
108                 if (!res || !IS_VALID((*res))) {
109                         /* no result, pass a null */
110                         sqlite3_bind_null(p_stmt, col_counter);
111                         col_counter++;
112                         continue;
113                 }
114                 
115                 switch (res->type) {
116                         case ULOGD_RET_INT8:
117                                 sqlite3_bind_int(p_stmt,col_counter,res->value.i8);
118                                 break;
119                         case ULOGD_RET_INT16:
120                                 sqlite3_bind_int(p_stmt,col_counter,res->value.i16);
121                                 break;
122                         case ULOGD_RET_INT32:
123                                 sqlite3_bind_int(p_stmt,col_counter,res->value.i32);
124                                 break;
125                         case ULOGD_RET_INT64:
126                                 sqlite3_bind_int64(p_stmt,col_counter,res->value.i64);
127                                 break;
128                         case ULOGD_RET_UINT8:
129                                 sqlite3_bind_int(p_stmt,col_counter,res->value.ui8);
130                                 break;
131                         case ULOGD_RET_UINT16:
132                                 sqlite3_bind_int(p_stmt,col_counter,res->value.ui16);
133                                 break;
134                         case ULOGD_RET_IPADDR:
135 #ifdef IP_AS_STRING
136                                 memset(&addr, 0, sizeof(addr));
137                                 addr.s_addr = ntohl(res->value.ui32);
138                                 ipaddr = inet_ntoa(addr);
139                                 sqlite3_bind_text(p_stmt,col_counter,ipaddr,strlen(ipaddr),SQLITE_STATIC);
140                                 break;
141 #endif /* IP_AS_STRING */
142                         /* EVIL: fallthrough when logging IP as u_int32_t */
143                         case ULOGD_RET_UINT32:
144                                 sqlite3_bind_int(p_stmt,col_counter,res->value.ui32);
145                                 break;
146                         case ULOGD_RET_UINT64:
147                                 sqlite3_bind_int64(p_stmt,col_counter,res->value.ui64);
148                                 break;
149                         case ULOGD_RET_BOOL:
150                                 sqlite3_bind_int(p_stmt,col_counter,res->value.b);
151                                 break;
152                         case ULOGD_RET_STRING:
153                                 sqlite3_bind_text(p_stmt,col_counter,res->value.ptr,strlen(res->value.ptr),SQLITE_STATIC);
154                                 break;
155                         default:
156                                 ulogd_log(ULOGD_NOTICE,
157                                         "unknown type %d for %s\n",
158                                         res->type, res->key);
159                                 break;
160                 } 
161
162                 col_counter++;
163         }
164
165         /* now we have created our statement, insert it */
166
167         if (sqlite3_step(p_stmt) == SQLITE_DONE) {
168                 sqlite3_reset(p_stmt);
169                 buffer_ctr++;
170         } else {
171                 ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n",
172                                 sqlite3_errmsg(dbh));
173                 return 1;
174         }
175
176         /* commit all of the inserts to the database, ie flush buffer */
177         if (buffer_ctr >= buffer_size) {
178                 if (sqlite3_exec(dbh,"commit",NULL,NULL,NULL) != SQLITE_OK)
179                         ulogd_log(ULOGD_ERROR,"unable to commit records to db.");
180
181                 if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK)
182                         ulogd_log(ULOGD_ERROR,"unable to begin a new transaction.");
183
184                 buffer_ctr = 0;
185                 DEBUGP("committing.\n");
186         }
187
188         return 0;
189 }
190
191 #define _SQLITE3_INSERTTEMPL   "insert into X (Y) values (Z)"
192
193 /* create the static part of our insert statement */
194 static int _sqlite3_createstmt(void)
195 {
196         struct _field *f;
197         unsigned int size;
198         char buf[ULOGD_MAX_KEYLEN];
199         char *underscore;
200         char *stmt_pos;
201         int col_count;
202         int i;
203
204         if (stmt) {
205                 ulogd_log(ULOGD_NOTICE, "createstmt called, but stmt"
206                         " already existing\n"); 
207                 return 1;
208         }
209
210         /* caclulate the size for the insert statement */
211         size = strlen(_SQLITE3_INSERTTEMPL) + strlen(table_ce.u.string);
212
213         DEBUGP("initial size: %u\n", size);
214
215         col_count = 0;
216         for (f = fields; f; f = f->next) {
217                 /* we need space for the key and a comma, and a ? */
218                 size += strlen(f->name) + 3;
219                 DEBUGP("size is now %u since adding %s\n",size,f->name);
220                 col_count++;
221         }
222
223         DEBUGP("there were %d columns\n",col_count);
224         DEBUGP("after calc name length: %u\n",size);
225
226         ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size);
227
228         stmt = (char *) malloc(size);
229
230         if (!stmt) {
231                 ulogd_log(ULOGD_ERROR, "OOM!\n");
232                 return 1;
233         }
234
235         sprintf(stmt, "insert into %s (", table_ce.u.string);
236         stmt_pos = stmt + strlen(stmt);
237
238         for (f = fields; f; f = f->next) {
239                 strncpy(buf, f->name, ULOGD_MAX_KEYLEN);        
240                 while ((underscore = strchr(buf, '.')))
241                         *underscore = '_';
242                 sprintf(stmt_pos, "%s,", buf);
243                 stmt_pos = stmt + strlen(stmt);
244         }
245
246         *(stmt_pos - 1) = ')';
247
248         sprintf(stmt_pos, " values (");
249         stmt_pos = stmt + strlen(stmt);
250
251         for (i = 0; i < col_count - 1; i++) {
252                 sprintf(stmt_pos,"?,");
253                 stmt_pos += 2;
254         }
255
256         sprintf(stmt_pos, "?)");
257         ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", stmt);
258
259         DEBUGP("about to prepare statement.\n");
260
261         sqlite3_prepare(dbh,stmt,-1,&p_stmt,0);
262
263         DEBUGP("statement prepared.\n");
264
265         if (!p_stmt) {
266                 ulogd_log(ULOGD_ERROR,"unable to prepare statement");
267                 return 1;
268         }
269
270         return 0;
271 }
272
273
274 /* length of "select * from \0" */
275 #define SQLITE_SELECT_LEN 15
276
277 /* find out which columns the table has */
278 static int _sqlite3_get_columns(const char *table)
279 {
280         char buf[ULOGD_MAX_KEYLEN];
281         char query[SQLITE_SELECT_LEN + CONFIG_VAL_STRING_LEN] = "select * from \0";
282         char *underscore;
283         struct _field *f;
284         sqlite3_stmt *schema_stmt;
285         int column;
286         int result;
287         int id;
288
289         if (!dbh)
290                 return 1;
291
292         strncat(query,table,LINE_LEN);
293         
294         result = sqlite3_prepare(dbh,query,-1,&schema_stmt,0);
295         
296         if (result != SQLITE_OK)
297                 return 1;
298
299         for (column = 0; column < sqlite3_column_count(schema_stmt); column++) {
300                 /* replace all underscores with dots */
301                 strncpy(buf, sqlite3_column_name(schema_stmt,column), ULOGD_MAX_KEYLEN);
302                 while ((underscore = strchr(buf, '_')))
303                         *underscore = '.';
304
305                 DEBUGP("field '%s' found: ", buf);
306
307                 if (!(id = keyh_getid(buf))) {
308                         DEBUGP(" no keyid!\n");
309                         continue;
310                 }
311
312                 DEBUGP("keyid %u\n", id);
313
314                 /* prepend it to the linked list */
315                 f = (struct _field *) malloc(sizeof *f);
316                 if (!f) {
317                         ulogd_log(ULOGD_ERROR, "OOM!\n");
318                         return 1;
319                 }
320                 strncpy(f->name, buf, ULOGD_MAX_KEYLEN);
321                 f->id = id;
322                 f->next = fields;
323                 fields = f;     
324         }
325
326         sqlite3_finalize(schema_stmt);
327         return 0;
328 }
329
330 /** 
331  * make connection and select database 
332  * returns 0 if database failed to open.
333  */
334 static int _sqlite3_open_db(char *db_file)
335 {
336         DEBUGP("opening database.\n");
337         return sqlite3_open(db_file,&dbh);
338 }
339
340 /* give us an opportunity to close the database down properly */
341 static void _sqlite3_fini(void)
342 {
343         DEBUGP("cleaning up db connection\n");
344
345         /* free up our prepared statements so we can close the db */
346         if (p_stmt) {
347                 sqlite3_finalize(p_stmt);
348                 DEBUGP("prepared statement finalized\n");
349         }
350
351         if (dbh) {
352                 int result;
353                 /* flush the remaining insert statements to the database. */
354                 result = sqlite3_exec(dbh,"commit",NULL,NULL,NULL);
355
356                 if (result != SQLITE_OK)
357                         ulogd_log(ULOGD_ERROR,"unable to commit remaining records to db.");
358
359                 sqlite3_close(dbh);
360                 DEBUGP("database file closed\n");
361         }
362 }
363
364 #define _SQLITE3_BUSY_TIMEOUT 300
365
366 static int _sqlite3_init(void)
367 {
368         /* have the opts parsed */
369         config_parse_file("SQLITE3", &buffer_ce);
370
371         if (_sqlite3_open_db(db_ce.u.string)) {
372                 ulogd_log(ULOGD_ERROR, "can't open the database file\n");
373                 return 1;
374         }
375
376         /* set the timeout so that we don't automatically fail
377          * if the table is busy. */
378         sqlite3_busy_timeout(dbh, _SQLITE3_BUSY_TIMEOUT);
379
380         /* read the fieldnames to know which values to insert */
381         if (_sqlite3_get_columns(table_ce.u.string)) {
382                 ulogd_log(ULOGD_ERROR, "unable to get sqlite columns\n");
383                 return 1;
384         }
385
386         /* initialize our buffer size and counter */
387         buffer_size = buffer_ce.u.value;
388         buffer_ctr = 0;
389
390         DEBUGP("Have a buffer size of : %d\n", buffer_size);
391
392         if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK)
393                 ulogd_log(ULOGD_ERROR,"can't create a new transaction\n");
394
395         /* create and prepare the actual insert statement */
396         _sqlite3_createstmt();
397
398         return 0;
399 }
400
401 static ulog_output_t _sqlite3_plugin = { 
402         .name = "sqlite3", 
403         .output = &_sqlite3_output, 
404         .init = &_sqlite3_init,
405         .fini = &_sqlite3_fini,
406 };
407
408 void _init(void) 
409 {
410         register_output(&_sqlite3_plugin);
411 }
412