no need to check to kernel source (also fixes the nightly builds)
[distributedratelimiting.git] / netflow / ulogd_NETFLOW.c
1 /*
2  * ulogd output target for IP flow analysis
3  *
4  * Mark Huang <mlhuang@cs.princeton.edu>
5  * Copyright (C) 2004-2005 The Trustees of Princeton University
6  *
7  * Based on admindump.pl by Mic Bowman and Paul Brett
8  * Copyright (c) 2002 Intel Corporation
9  *
10  * $Id: ulogd_NETFLOW.c,v 1.19 2005/04/20 21:10:05 mlhuang Exp $
11  */
12
13 /* Enable GNU glibc extensions */
14 #define _GNU_SOURCE
15
16 #include <stdio.h>
17 #include <stdlib.h>
18
19 /* va_start() and friends */
20 #include <stdarg.h>
21
22 /* ispunct() */
23 #include <ctype.h>
24
25 /* strstr() and friends */
26 #include <string.h>
27
28 /* dirname() and basename() */
29 #include <libgen.h>
30
31 /* fork() and wait() */
32 #include <sys/types.h>
33 #include <unistd.h>
34 #include <sys/wait.h>
35
36 /* fgetpwent() */
37 #include <pwd.h>
38
39 /* errno and assert() */
40 #include <errno.h>
41 #include <assert.h>
42
43 /* getopt_long() */
44 #include <getopt.h>
45
46 /* time() and friends */
47 #include <time.h>
48 #include <sys/time.h>
49
50 /* inet_aton() */
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
54
55 /* ICMP definitions */
56 #include <netinet/ip.h>
57 #include <netinet/ip_icmp.h>
58
59 /* stat() */
60 #include <sys/stat.h>
61
62 /* pthread_create() */
63 #include <pthread.h>
64
65 /* flock() */
66 #include <sys/file.h>
67
68 #include <ulogd/ulogd.h>
69 #include <ulogd/conffile.h>
70
71 #if !defined(STANDALONE) && HAVE_LIBPROPER
72 #include <proper/prop.h>
73 #endif
74
75 /*
76  * /etc/ulogd.conf configuration options
77  */
78
79 /* Dump interval in minutes */
80 static config_entry_t interval = {
81         .next = NULL,
82         .key = "interval",
83         .type = CONFIG_TYPE_INT,
84         .options = CONFIG_OPT_NONE,
85         .u = { .value = 5 },
86 };
87
88 /* Slice map (in /etc/passwd format) */
89 static config_entry_t slicemap = {
90         .next = &interval,
91         .key = "slicemap",
92         .type = CONFIG_TYPE_STRING,
93         .options = CONFIG_OPT_NONE,
94         .u = { .string = "/etc/passwd" },
95 };
96
97 /* MySQL database name */
98 static config_entry_t mysqldb = {
99         .next = &slicemap,
100         .key = "mysqldb",
101         .type = CONFIG_TYPE_STRING,
102         .options = CONFIG_OPT_NONE,
103         .u = { .string = "netflow" },
104 };
105
106 /* MySQL database user */
107 static config_entry_t mysqluser = {
108         .next = &mysqldb,
109         .key = "mysqluser",
110         .type = CONFIG_TYPE_STRING,
111         .options = CONFIG_OPT_NONE,
112         .u = { .string = "netflow" },
113 };
114
115 /* MySQL database password */
116 static config_entry_t mysqlpass = {
117         .next = &mysqluser,
118         .key = "mysqlpass",
119         .type = CONFIG_TYPE_STRING,
120         .options = CONFIG_OPT_NONE,
121         .u = { .string = "" },
122 };
123
124 /* MySQL database host */
125 static config_entry_t mysqlhost = {
126         .next = &mysqlpass,
127         .key = "mysqlhost",
128         .type = CONFIG_TYPE_STRING,
129         .options = CONFIG_OPT_NONE,
130         .u = { .string = "" },
131 };
132
133 /* Latest flows in CSV format */
134 static config_entry_t csv = {
135         .next = &mysqlhost,
136         .key = "csv",
137         .type = CONFIG_TYPE_STRING,
138         .options = CONFIG_OPT_NONE,
139         .u = { .string = "/var/www/html/flows.csv" },
140 };
141
142 #define config_entries (&csv)
143
144 /*
145  * Debug functionality
146  */
147
148 #ifdef DMALLOC
149 #include <dmalloc.h>
150 #endif
151
152 #define NIPQUAD(addr) \
153         ((unsigned char *)&addr)[0], \
154         ((unsigned char *)&addr)[1], \
155         ((unsigned char *)&addr)[2], \
156         ((unsigned char *)&addr)[3]
157
158 #define IPQUAD(addr) \
159         ((unsigned char *)&addr)[3], \
160         ((unsigned char *)&addr)[2], \
161         ((unsigned char *)&addr)[1], \
162         ((unsigned char *)&addr)[0]
163
164 /*
165  * MySQL support
166  */
167
168 #include <mysql/mysql.h>
169
170 /* mysql_query() with printf() syntax */
171 static int
172 mysql_queryf(MYSQL *dbh, const char *fmt, ...)
173 {
174         char *buf = NULL;
175         size_t len;
176         va_list ap;
177         int ret = 2 * strlen(fmt);
178
179         do {
180                 len = ret + 1;
181                 if (!(buf = realloc(buf, len))) {
182                         ulogd_log(ULOGD_ERROR, "realloc: %s\n", strerror(errno));
183                         ret = -errno;
184                         goto done;
185                 }
186                 va_start(ap, fmt);
187                 ret = vsnprintf(buf, len, fmt, ap);
188                 va_end(ap);
189                 if (ret < 0) {
190                         ulogd_log(ULOGD_ERROR, "vsnprintf: %s\n", strerror(errno));
191                         goto done;
192                 }
193         } while (ret >= len);
194
195         ret = mysql_query(dbh, buf);
196         if (ret)
197                 ulogd_log(ULOGD_ERROR, "%s: %s\n", buf, mysql_error(dbh));
198
199  done:
200         if (buf)
201                 free(buf);
202         return ret;
203 }
204
205 /*
206  * Jenkins hash support
207  */
208
209 typedef u_int8_t u8;
210 typedef u_int16_t u16;
211 typedef u_int32_t u32;
212 #include <linux/jhash.h>
213
214 /* Salt for the hash functions */
215 static int salt;
216
217 /*
218  * Hash slice name lookups on context ID.
219  */
220
221 /* Special context IDs */
222 #define UNKNOWN_XID -1
223 #define ROOT_XID 0
224
225 enum {
226         CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
227         ICMP_ECHOREPLY_XID,
228         ICMP_UNREACH_XID,
229 };
230
231 /* A slice */
232 struct slice {
233         struct slice *next;
234         int xid;
235         char *name;
236 };
237
238 /* Must be a power of 2 */
239 #define SLICE_HASH_SIZE 128
240 struct slice *slice_hash[SLICE_HASH_SIZE];
241
242 static inline int
243 hash_slice(int xid)
244 {
245         return jhash_1word(xid, salt) & (SLICE_HASH_SIZE - 1);
246 }
247
248 static struct slice *
249 get_slice(int xid)
250 {
251         struct slice *slice;
252         int i;
253
254         i = hash_slice(xid);
255         for (slice = slice_hash[i]; slice; slice = slice->next) {
256                 if (slice->xid == xid)
257                         break;
258         }
259
260         return slice;
261 }
262
263 static struct slice *
264 add_slice(int xid, char *name)
265 {
266         struct slice *slice;
267         int i;
268
269         slice = malloc(sizeof(struct slice));
270         if (!slice)
271                 return NULL;
272         memset(slice, 0, sizeof(struct slice));
273
274         slice->xid = xid;
275         slice->name = strdup(name);
276         if (!slice->name) {
277                 free(slice);
278                 return NULL;
279         }
280
281         /* Add to hashed list */
282         i = hash_slice(xid);
283         slice->next = slice_hash[i];
284         slice_hash[i] = slice;
285
286         return slice;
287 }
288
289 static int
290 hash_passwd(void)
291 {
292         int fd;
293         FILE *fp;
294         struct passwd *pw;
295         struct slice *slice, *next;
296         int i;
297
298 #if !defined(STANDALONE) && HAVE_LIBPROPER
299         if ((fd = prop_open_file(slicemap.u.string, PROP_OPEN_READ)) < 0) {
300                 /* prop_open_file() returns -errno */
301                 errno = -fd;
302         }
303 #else
304         fd = open(slicemap.u.string, O_RDONLY);
305 #endif
306
307         if (fd < 0 || !(fp = fdopen(fd, "r"))) {
308                 ulogd_log(ULOGD_ERROR, "%s: %s\n", slicemap.u.string, strerror(errno));
309                 if (fd >= 0)
310                         close(fd);
311                 return errno;
312         }
313
314         /* Clean hash */
315         for (i = 0; i < SLICE_HASH_SIZE; i++) {
316                 for (slice = slice_hash[i]; slice; slice = next) {
317                         next = slice->next;
318                         assert(slice->name);
319                         free(slice->name);
320                         free(slice);
321                 }
322                 slice_hash[i] = NULL;
323         }
324
325         /* (Re)build slice hash */
326         while ((pw = fgetpwent(fp)))
327                 add_slice(pw->pw_uid, pw->pw_name);
328
329         fclose(fp);
330         close(fd);
331         return 0;
332 }
333
334 /*
335  * Maintain flows in a two level hash table. The first level hashes
336  * on (src_ip, slice). The second level hashes on (protocol, dst_ip,
337  * src_port, dst_port). This structure mirrors how we store flows in
338  * the database (in many tables where each table contains flows with
339  * the same (src_ip, slice) parameters and unique (protocol,
340  * src_port, dst_ip, dst_port) rows).
341  */
342
343 /* Must be powers of 2 */
344 #ifdef STANDALONE
345 #define TABLE_HASH_SIZE 16
346 #define FLOW_HASH_SIZE 65536
347 #else
348 #define TABLE_HASH_SIZE 8
349 #define FLOW_HASH_SIZE 1024
350 #endif
351
352 struct flow {
353         struct flow *nexth;             /* Next in hash */
354         struct flow *next;              /* Next in ordered list */
355         struct flow_table *table;       /* Back pointer */
356         u_int8_t protocol;              /* IP protocol number */
357         u_int16_t src_port;             /* IP source port (host order) */
358         u_int16_t dst_port;             /* IP destination port (host order) */
359         u_int32_t dst_ip;               /* IP destination address (host order) */
360         time_t start_time;              /* Timestamp of the first packet in the flow */
361         time_t end_time;                /* Timestamp of the first packet in the flow */
362         unsigned long long packets;     /* Number of IP packets */
363         unsigned long long bytes;       /* Number of IP bytes (including headers) */
364 };
365
366 struct flow_table {
367         struct flow_table *nexth;       /* Next in hash */
368         struct flow_table *next;        /* Next in ordered list */
369         u_int32_t src_ip;               /* IP source address (host order) */
370         int xid;                        /* Context ID */
371         char slice[32];                 /* Slice name */
372         unsigned long rows;             /* Total number of rows to insert/update */
373         unsigned long collisions;       /* Total number of row hash collisions */
374         unsigned long long packets;     /* Total number of packets sent in all flows in this table */
375         unsigned long long bytes;       /* Total number of bytes sent in all flows in this table */
376         time_t start_time;              /* Start time of the earliest flow in the table */
377         time_t end_time;                /* End time of the latest flow in the table */
378         /* Hashed list of flows */
379         struct flow *flows[FLOW_HASH_SIZE];
380         /* Ordered list of flows */
381         struct flow *flows_head, *flows_tail;
382 };
383
384 /* Hashed list of flow tables */
385 static struct flow_table *flow_tables[TABLE_HASH_SIZE];
386 /* Ordered list of flow tables */
387 static struct flow_table *flow_tables_head, *flow_tables_tail;
388
389 /* Maximum total number of outstanding allocated flows */
390 #define MAX_FLOWS 65536                         /* ip_conntrack_max on a 1 GB machine */
391 static int flows;
392
393 /* Double buffer the ordered list and dump it in another thread */
394 static struct flow_table *dump_head, *dump_tail;
395 static int dump_flows;
396 static pthread_t dump_thread;
397
398 static inline int
399 hash_flow_table(u_int32_t src_ip, int xid, char *slice)
400 {
401         if (slice)
402                 return jhash(slice, strlen(slice), salt) & (TABLE_HASH_SIZE - 1);
403         else
404                 return jhash_2words(src_ip, xid, salt) & (TABLE_HASH_SIZE - 1);
405 }
406
407 static struct flow_table *
408 get_flow_table(u_int32_t src_ip, int xid, char *slice)
409 {
410         struct flow_table *flow_table;
411         int i;
412
413         /* See if it already exists */
414         i = hash_flow_table(src_ip, xid, slice);
415         for (flow_table = flow_tables[i]; flow_table; flow_table = flow_table->nexth) {
416                 if (flow_table->src_ip == src_ip &&
417                     flow_table->xid == xid &&
418                     (!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1)))
419                         break;
420         }
421
422         if (!flow_table) {
423                 /* Allocate a new flow table */
424                 flow_table = malloc(sizeof(struct flow_table));
425                 if (!flow_table) {
426                         ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
427                         return NULL;
428                 }
429
430                 /* Initialize */
431                 memset(flow_table, 0, sizeof(struct flow_table));
432                 flow_table->src_ip = src_ip;
433                 flow_table->xid = xid;
434                 if (slice)
435                         strncpy(flow_table->slice, slice, sizeof(flow_table->slice) - 1);
436
437                 /* Add to hashed list */
438                 i = hash_flow_table(src_ip, xid, slice);
439                 flow_table->nexth = flow_tables[i];
440                 flow_tables[i] = flow_table;
441
442                 /* Add to ordered list */
443                 if (flow_tables_tail) {
444                         assert(flow_tables_head);
445                         flow_tables_tail->next = flow_table;
446                         flow_tables_tail = flow_table;
447                 } else {
448                         assert(!flow_tables_head);
449                         flow_tables_head = flow_tables_tail = flow_table;
450                 }
451         }
452
453         assert(flow_table);
454         assert(flow_table->src_ip == src_ip);
455         assert(flow_table->xid == xid);
456         assert(!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1));
457
458         return flow_table;
459 }
460
461 static inline int
462 hash_flow(u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
463 {
464         return jhash_3words(protocol, dst_ip, (src_port << 16) | dst_port, salt) & (FLOW_HASH_SIZE - 1);
465 }
466
467 static struct flow *
468 get_flow(u_int32_t src_ip, int xid, char *slice,
469          u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
470 {
471         struct flow_table *flow_table;
472         struct flow *flow;
473         int i;
474
475         if (xid != UNKNOWN_XID)
476                 flow_table = get_flow_table(src_ip, xid, slice);
477         else {
478                 /* Support searching for flows without specifying a context ID */
479                 flow_table = flow_tables_head;
480         }
481
482         for (; flow_table; flow_table = flow_table->next) {
483                 i = hash_flow(protocol, src_port, dst_ip, dst_port);
484                 for (flow = flow_table->flows[i]; flow; flow = flow->nexth) {
485                         if (flow->protocol == protocol &&
486                             flow->src_port == src_port &&
487                             flow->dst_ip == dst_ip &&
488                             flow->dst_port == dst_port)
489                                 break;
490                 }
491                 if (xid != UNKNOWN_XID)
492                         break;
493         }
494
495         if (!flow_table)
496                 return NULL;
497
498         if (!flow) {
499                 /* Allocate a new flow */
500                 flow = malloc(sizeof(struct flow));
501                 if (!flow) {
502                         ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
503                         return NULL;
504                 }
505
506                 /* Initialize */
507                 memset(flow, 0, sizeof(struct flow));
508                 flow->table = flow_table;
509                 flow->protocol = protocol;
510                 flow->src_port = src_port;
511                 flow->dst_ip = dst_ip;
512                 flow->dst_port = dst_port;
513
514                 /* Add to hashed list */
515                 i = hash_flow(protocol, src_port, dst_ip, dst_port);
516                 flow->nexth = flow_table->flows[i];
517                 flow_table->flows[i] = flow;
518
519                 /* Add to ordered list */
520                 if (flow_table->flows_tail) {
521                         assert(flow_table->flows_head);
522                         flow_table->flows_tail->next = flow;
523                         flow_table->flows_tail = flow;
524                 } else {
525                         assert(!flow_table->flows_head);
526                         flow_table->flows_head = flow_table->flows_tail = flow;
527                 }
528
529                 /* Update table statistics */
530                 flow_table->collisions += flow->nexth ? 1 : 0;
531                 flow_table->rows++;
532
533                 /* Update total number of outstanding flows */
534                 flows++;
535         }
536
537         assert(flow);
538         assert(flow->table == flow_table);
539         assert(flow->protocol == protocol);
540         assert(flow->src_port == src_port);
541         assert(flow->dst_ip == dst_ip);
542         assert(flow->dst_port == dst_port);
543
544         return flow;
545 }
546
547 /*
548  * The dump thread imports one set of flows while the collection
549  * thread continues to record new flows.
550  */
551
552 void *
553 dump_interval(void *unused)
554 {
555         MYSQL *dbh;
556         struct flow_table *flow_table, *next_flow_table;
557         struct flow *flow, *next_flow, bind_flow;
558         struct slice *slice;
559         char slice_name[64];
560         char table_name[NAME_LEN + 1], *c;
561         struct tm tm;
562         MYSQL_STMT *stmt;
563         char query[512];
564         unsigned int tables, table_collisions, row_collisions, rows;
565         unsigned long long packets, bytes;
566         struct timeval start, end, interval_start;
567         int fd;
568         FILE *fp;
569
570         /* Time this interval */
571         gettimeofday(&interval_start, NULL);
572
573         /* Hash slice name lookups */
574         hash_passwd();
575
576         /* Open, lock, and truncate CSV file */
577         if ((fd = open(csv.u.string, O_CREAT | O_WRONLY, 0644)) >= 0) {
578                 int tries = 10;
579
580                 while (flock(fd, LOCK_EX | LOCK_NB)) {
581                         sleep(1);
582                         tries--;
583                 }
584                 if (tries == 0)
585                         ulogd_log(ULOGD_ERROR, "Could not acquire lock on %s\n", csv.u.string);
586                 if (!(fp = fdopen(fd, "w")))
587                         close(fd);
588                 else
589                         ftruncate(fd, 0);
590         } else
591                 fp = NULL;
592
593         /* Connect to DB */
594         if (!(dbh = mysql_init(NULL)))
595                 ulogd_log(ULOGD_ERROR, "mysql_init: failed\n");
596         if (dbh && !mysql_real_connect(dbh,
597                                        mysqlhost.u.string[0] ? mysqlhost.u.string : NULL,
598                                        mysqluser.u.string,
599                                        mysqlpass.u.string[0] ? mysqlpass.u.string : NULL,
600                                        mysqldb.u.string,
601                                        0, NULL, 0)) {
602                 ulogd_log(ULOGD_ERROR,
603                           "%s@%s:%s: %s\n",
604                           mysqluser.u.string,
605                           mysqlhost.u.string[0] ? mysqlhost.u.string : "localhost",
606                           mysqldb.u.string,
607                           mysql_error(dbh));
608                 mysql_close(dbh);
609                 dbh = NULL;
610         }
611
612         if (dbh) {
613                 /* All times in the database are in GMT */
614                 mysql_query(dbh, "SET time_zone='+00:00'");
615         }
616
617         /* Initialize statistics */
618         tables = table_collisions = row_collisions = rows = 0;
619         packets = bytes = 0;
620
621         assert(dump_head);
622
623         for (flow_table = dump_head; flow_table; flow_table = next_flow_table) {
624                 /* Keep track of some basic statistics */
625                 tables++;
626                 table_collisions += flow_table->nexth ? 1 : 0;
627                 row_collisions += flow_table->collisions;
628                 rows += flow_table->rows;
629                 packets += flow_table->packets;
630                 bytes += flow_table->bytes;
631
632                 /* Get slice name */
633                 slice_name[sizeof(slice_name) - 1] = '\0';
634                 switch (flow_table->xid) {
635                 case CONNECTION_REFUSED_XID:
636                         strncpy(slice_name, "connection-refused", sizeof(slice_name));
637                         break;
638                 case ICMP_ECHOREPLY_XID:
639                         strncpy(slice_name, "icmp-reply", sizeof(slice_name));
640                         break;
641                 case ICMP_UNREACH_XID:
642                         strncpy(slice_name, "icmp-unreachable", sizeof(slice_name));
643                         break;
644                 default:
645                         if ((slice = get_slice(flow_table->xid)))
646                                 strncpy(slice_name, slice->name, sizeof(slice_name));
647                         else if (flow_table->slice[0])
648                                 strncpy(slice_name, flow_table->slice, sizeof(slice_name));
649                         else
650                                 snprintf(slice_name, sizeof(slice_name), "%u", flow_table->xid);
651                         break;
652                 }
653
654                 if (dbh) {
655                         /* Get day */
656                         gmtime_r(&flow_table->start_time, &tm);
657
658                         /* Form a human readable table name */
659                         snprintf(table_name, sizeof(table_name),
660                                  "%u_%u_%u_%u_%s_%u_%02u_%02u",
661                                  IPQUAD(flow_table->src_ip),
662                                  slice_name,
663                                  1900 + tm.tm_year, tm.tm_mon + 1, tm.tm_mday);
664                         assert(strlen(table_name) <= NAME_LEN);
665
666                         /* Replace punctation with underscores */
667                         for (c = table_name; *c; c++) {
668                                 if (ispunct(*c))
669                                         *c = '_';
670                         }
671
672                         /* Time database operations on each table */
673                         gettimeofday(&start, NULL);
674
675                         /* Insert/update table summary */
676                         if (mysql_queryf(dbh,
677                                          "INSERT INTO flow_tables "
678                                          "(table_name,src_ip,slice,"
679                                          "start_time,end_time,packets,bytes) "
680                                          "VALUES ('%s',%u,'%s',"
681                                          "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
682                                          "ON DUPLICATE KEY UPDATE "
683                                          "end_time=FROM_UNIXTIME(%u),"
684                                          "packets=packets+%llu,"
685                                          "bytes=bytes+%llu",
686                                          table_name,
687                                          flow_table->src_ip,
688                                          slice_name,
689                                          (unsigned) flow_table->start_time,
690                                          (unsigned) flow_table->end_time,
691                                          flow_table->packets,
692                                          flow_table->bytes,
693                                          (unsigned) flow_table->end_time,
694                                          flow_table->packets,
695                                          flow_table->bytes))
696                                 goto free_flow_table;
697
698                         /* Create table */
699                         if (mysql_queryf(dbh,
700                                          "CREATE TABLE IF NOT EXISTS %s LIKE flow_template",
701                                          table_name))
702                                 goto free_flow_table;
703
704                         /* Lock for speed */
705                         mysql_queryf(dbh, "ALTER TABLE %s DISABLE KEYS", table_name);
706                         mysql_queryf(dbh, "LOCK TABLES %s WRITE", table_name);
707
708                         /* Prepare the insert statement */
709                         if (!(stmt = mysql_stmt_init(dbh)))
710                                 ulogd_log(ULOGD_ERROR, "mysql_stmt_init: %s\n", mysql_error(dbh));
711                         else {
712                                 my_bool is_null = 0;
713                                 MYSQL_BIND bind[] = {
714                                         { .buffer_type = MYSQL_TYPE_TINY,
715                                           .buffer = &bind_flow.protocol,
716                                           .buffer_length = sizeof(bind_flow.protocol),
717                                           .length = &bind[0].buffer_length,
718                                           .is_null = &is_null,
719                                           .is_unsigned = 1, },
720                                         { .buffer_type = MYSQL_TYPE_SHORT,
721                                           .buffer = &bind_flow.src_port,
722                                           .buffer_length = sizeof(bind_flow.src_port),
723                                           .length = &bind[1].buffer_length,
724                                           .is_null = &is_null,
725                                           .is_unsigned = 1, },
726                                         { .buffer_type = MYSQL_TYPE_LONG,
727                                           .buffer = &bind_flow.dst_ip,
728                                           .buffer_length = sizeof(bind_flow.dst_ip),
729                                           .length = &bind[2].buffer_length,
730                                           .is_null = &is_null,
731                                           .is_unsigned = 1, },
732                                         { .buffer_type = MYSQL_TYPE_SHORT,
733                                           .buffer = &bind_flow.dst_port,
734                                           .buffer_length = sizeof(bind_flow.dst_port),
735                                           .length = &bind[3].buffer_length,
736                                           .is_null = &is_null,
737                                           .is_unsigned = 1, },
738                                         { .buffer_type = MYSQL_TYPE_LONG,
739                                           .buffer = &bind_flow.start_time,
740                                           .buffer_length = sizeof(bind_flow.start_time),
741                                           .length = &bind[4].buffer_length,
742                                           .is_null = &is_null,
743                                           .is_unsigned = 1, },
744                                         { .buffer_type = MYSQL_TYPE_LONG,
745                                           .buffer = &bind_flow.end_time,
746                                           .buffer_length = sizeof(bind_flow.end_time),
747                                           .length = &bind[5].buffer_length,
748                                           .is_null = &is_null,
749                                           .is_unsigned = 1, },
750                                         { .buffer_type = MYSQL_TYPE_LONGLONG,
751                                           .buffer = &bind_flow.packets,
752                                           .buffer_length = sizeof(bind_flow.packets),
753                                           .length = &bind[6].buffer_length,
754                                           .is_null = &is_null,
755                                           .is_unsigned = 1, },
756                                         { .buffer_type = MYSQL_TYPE_LONGLONG,
757                                           .buffer = &bind_flow.bytes,
758                                           .buffer_length = sizeof(bind_flow.bytes),
759                                           .length = &bind[7].buffer_length,
760                                           .is_null = &is_null,
761                                           .is_unsigned = 1, },
762                                         { .buffer_type = MYSQL_TYPE_LONG,
763                                           .buffer = &bind_flow.end_time,
764                                           .buffer_length = sizeof(bind_flow.end_time),
765                                           .length = &bind[8].buffer_length,
766                                           .is_null = &is_null,
767                                           .is_unsigned = 1, },
768                                         { .buffer_type = MYSQL_TYPE_LONGLONG,
769                                           .buffer = &bind_flow.packets,
770                                           .buffer_length = sizeof(bind_flow.packets),
771                                           .length = &bind[9].buffer_length,
772                                           .is_null = &is_null,
773                                           .is_unsigned = 1, },
774                                         { .buffer_type = MYSQL_TYPE_LONGLONG,
775                                           .buffer = &bind_flow.bytes,
776                                           .buffer_length = sizeof(bind_flow.bytes),
777                                           .length = &bind[10].buffer_length,
778                                           .is_null = &is_null,
779                                           .is_unsigned = 1, },
780                                 };
781
782                                 snprintf(query, sizeof(query),
783                                          "INSERT INTO %s "
784                                          "(protocol,src_port,dst_ip,dst_port,"
785                                          "start_time,end_time,packets,bytes) "
786                                          "VALUES (?,?,?,?,"
787                                          "FROM_UNIXTIME(?),FROM_UNIXTIME(?),?,?) "
788                                          "ON DUPLICATE KEY UPDATE "
789                                          "end_time=FROM_UNIXTIME(?),"
790                                          "packets=packets+?,"
791                                          "bytes=bytes+?",
792                                          table_name);
793
794                                 if (mysql_stmt_prepare(stmt, query, strlen(query)) ||
795                                     mysql_stmt_bind_param(stmt, bind)) {
796                                         ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
797                                         mysql_stmt_close(stmt);
798                                         stmt = NULL;
799                                 }
800                         }
801                 } else
802                         stmt = NULL;
803
804                 for (flow = flow_table->flows_head; flow; flow = flow->next) {
805                         /* Dump to CSV */
806                         if (fp) {
807                                 fprintf(fp,
808                                         "%s,%u,"
809                                         "%u.%u.%u.%u,%u,"
810                                         "%u.%u.%u.%u,%u,"
811                                         "%u,%u,%llu,%llu\n",
812                                         slice_name, flow->protocol,
813                                         IPQUAD(flow_table->src_ip), flow->src_port,
814                                         IPQUAD(flow->dst_ip), flow->dst_port,
815                                         (unsigned) flow->start_time,
816                                         (unsigned) flow->end_time,
817                                         flow->packets,
818                                         flow->bytes);
819                         }
820
821                         /* Insert/update flow record */
822                         if (dbh) {
823                                 if (stmt) {
824                                         bind_flow = *flow;
825                                         if (mysql_stmt_execute(stmt))
826                                                 ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
827                                 } else {
828                                         mysql_queryf(dbh,
829                                                      "INSERT INTO %s "
830                                                      "(protocol,src_port,dst_ip,dst_port,"
831                                                      "start_time,end_time,packets,bytes) "
832                                                      "VALUES (%u,%u,%u,%u,"
833                                                      "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
834                                                      "ON DUPLICATE KEY UPDATE "
835                                                      "end_time=FROM_UNIXTIME(%u),"
836                                                      "packets=packets+%llu,"
837                                                      "bytes=bytes+%llu",
838                                                      table_name,
839                                                      flow->protocol,
840                                                      flow->src_port,
841                                                      flow->dst_ip,
842                                                      flow->dst_port,
843                                                      (unsigned) flow->start_time,
844                                                      (unsigned) flow->end_time,
845                                                      flow->packets,
846                                                      flow->bytes,
847                                                      (unsigned) flow->end_time,
848                                                      flow->packets,
849                                                      flow->bytes);
850                                 }
851                         }
852                 }
853
854                 if (dbh) {
855                         if (stmt)
856                                 mysql_stmt_close(stmt);
857
858                         /* Unlock */
859                         mysql_query(dbh, "UNLOCK TABLES");
860                         mysql_queryf(dbh, "ALTER TABLE %s ENABLE KEYS", table_name);
861
862                         /* Update table summary */
863                         mysql_queryf(dbh,
864                                      "UPDATE flow_tables "
865                                      "SET flows=(SELECT COUNT(protocol) FROM %s) "
866                                      "WHERE table_name='%s'",
867                                      table_name,
868                                      table_name);
869
870                         gettimeofday(&end, NULL);
871                         timersub(&end, &start, &end);
872                 }
873
874                 ulogd_log(ULOGD_DEBUG,
875                           "Updated %u rows in %s in %u.%06u s (%u collisions, %llu packets, %llu bytes)\n",
876                           flow_table->rows, table_name,
877                           (unsigned) end.tv_sec, (unsigned) end.tv_usec,
878                           flow_table->collisions, flow_table->packets, flow_table->bytes);
879
880         free_flow_table:
881                 for (flow = flow_table->flows_head; flow; flow = next_flow) {
882                         flow_table->rows--;
883                         flow_table->packets -= flow->packets;
884                         flow_table->bytes -= flow->bytes;
885                         dump_flows--;
886                         next_flow = flow->next;
887                         assert(next_flow || flow == flow_table->flows_tail);
888                         free(flow);
889                 }
890
891                 assert(!flow_table->rows);
892                 assert(!flow_table->packets);
893                 assert(!flow_table->bytes);
894
895                 next_flow_table = flow_table->next;
896                 assert(next_flow_table || flow_table == dump_tail);
897                 free(flow_table);
898         }
899
900         assert(!dump_flows);
901         dump_head = dump_tail = NULL;
902
903         if (dbh) {
904                 mysql_close(dbh);
905
906                 gettimeofday(&end, NULL);
907                 timersub(&end, &interval_start, &end);
908
909                 ulogd_log(ULOGD_NOTICE,
910                           "Updated %u rows in %u tables (%u bytes) in %u.%06u s "
911                           "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
912                           rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
913                           (unsigned) end.tv_sec, (unsigned) end.tv_usec,
914                           table_collisions, row_collisions, packets, bytes);
915         } else {
916                 /* Could not connect to database */
917                 ulogd_log(ULOGD_ERROR,
918                           "Lost %u rows in %u tables (%u bytes) "
919                           "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
920                           rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
921                           table_collisions, row_collisions, packets, bytes);
922         }
923
924         if (fp) {
925                 fclose(fp);
926                 assert(fd >= 0);
927                 close(fd);
928         }
929
930         pthread_exit(NULL);
931         return NULL;
932 }
933
934 static void
935 start_dump(void)
936 {
937         /* Wait for previous dump thread to complete */
938         if (dump_thread) {
939                 pthread_join(dump_thread, NULL);
940                 dump_thread = 0;
941         }
942
943         /* Switch buffers */
944         assert(!dump_head);
945         assert(!dump_tail);
946         assert(!dump_flows);
947         dump_head = flow_tables_head;
948         dump_tail = flow_tables_tail;
949         dump_flows = flows;
950
951         /* Start up the dump thread if necessary */
952         if (pthread_create(&dump_thread, NULL, dump_interval, NULL)) {
953                 ulogd_log(ULOGD_ERROR, "pthread_create: %s\n", strerror(errno));
954                 /* Try again later */
955                 dump_thread = 0;
956                 dump_head = dump_tail = NULL;
957                 dump_flows = 0;
958         } else {
959                 /* Clear hash */
960                 memset(flow_tables, 0, sizeof(flow_tables));
961                 flow_tables_head = flow_tables_tail = NULL;
962                 flows = 0;
963         }
964 }
965
966 static void
967 update_flow(struct flow *flow, time_t now, unsigned int packets, unsigned int bytes)
968 {
969         /* Update flow */
970         if (now > flow->end_time)
971                 flow->end_time = now;
972         if (!flow->start_time || now < flow->start_time)
973                 flow->start_time = now;
974         flow->packets += packets;
975         flow->bytes += bytes;
976
977         /* Update table summary */
978         if (now > flow->table->end_time)
979                 flow->table->end_time = now;
980         if (!flow->table->start_time || now < flow->table->start_time)
981                 flow->table->start_time = now;
982         flow->table->packets += packets;
983         flow->table->bytes += bytes;
984 }
985
986 #ifndef STANDALONE
987
988 struct intr_id {
989         char* name;
990         ulog_iret_t *res;
991 };
992
993 /* Interesting keys */
994 enum {
995         OOB_TIME_SEC = 0,
996         OOB_MARK,
997         IP_SADDR,
998         IP_DADDR,
999         IP_TOTLEN,
1000         IP_PROTOCOL,
1001         TCP_SPORT,
1002         TCP_DPORT,
1003         TCP_ACK,
1004         TCP_RST,
1005         UDP_SPORT,
1006         UDP_DPORT,
1007         ICMP_TYPE,
1008         ICMP_CODE,
1009         GRE_FLAG_KEY,
1010         GRE_VERSION,
1011         GRE_KEY,
1012         PPTP_CALLID,
1013 };
1014
1015 #define INTR_IDS        (sizeof(intr_ids)/sizeof(intr_ids[0]))
1016 static struct intr_id intr_ids[] = {
1017         [OOB_TIME_SEC] = { "oob.time.sec", 0 },
1018         [OOB_MARK] = { "oob.mark", 0 },
1019         [IP_SADDR] = { "ip.saddr", 0 },
1020         [IP_DADDR] = { "ip.daddr", 0 },
1021         [IP_TOTLEN] = { "ip.totlen", 0 },
1022         [IP_PROTOCOL] = { "ip.protocol", 0 },
1023         [TCP_SPORT] = { "tcp.sport", 0 },
1024         [TCP_DPORT] { "tcp.dport", 0 },
1025         [TCP_ACK] = { "tcp.ack", 0 },
1026         [TCP_RST] = { "tcp.rst", 0 },
1027         [UDP_SPORT] = { "udp.sport", 0 },
1028         [UDP_DPORT] = { "udp.dport", 0 },
1029         [ICMP_TYPE] = { "icmp.type", 0 },
1030         [ICMP_CODE] = { "icmp.code", 0 },
1031         [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
1032         [GRE_VERSION] = { "gre.version", 0 },
1033         [GRE_KEY] = { "gre.key", 0 },
1034         [PPTP_CALLID] = { "pptp.callid", 0 },
1035 };
1036
1037 #define GET_VALUE(x)    intr_ids[x].res->value
1038
1039 #define DATE(t)         ((t) / (24*60*60) * (24*60*60))
1040
1041 static int _output_netflow(ulog_iret_t *res)
1042 {
1043         u_int8_t protocol;
1044         u_int32_t src_ip, dst_ip;
1045         u_int16_t src_port, dst_port;
1046         int xid;
1047         struct flow *flow = NULL;
1048         time_t now;
1049
1050         now = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
1051
1052         if (flow_tables_head) {
1053                 /* If we have collected for at least 5 minutes, or
1054                  * collected the maximum number of flows, or it is now
1055                  * the next day, dump this interval.
1056                  */
1057                 if ((now - flow_tables_head->start_time) >= (interval.u.value * 60) ||
1058                     flows >= MAX_FLOWS ||
1059                     DATE(flow_tables_head->start_time) != DATE(now)) {
1060                         /* Out of memory */
1061                         if (flows >= MAX_FLOWS)
1062                                 ulogd_log(ULOGD_ERROR, "dumping %d flows early\n", flows);
1063
1064                         start_dump();
1065                 }
1066         }
1067
1068         protocol = GET_VALUE(IP_PROTOCOL).ui8;
1069         src_ip = GET_VALUE(IP_SADDR).ui32;
1070         dst_ip = GET_VALUE(IP_DADDR).ui32;
1071         xid = GET_VALUE(OOB_MARK).ui32;
1072
1073         switch (protocol) {
1074
1075         case IPPROTO_TCP:
1076                 src_port = GET_VALUE(TCP_SPORT).ui16;
1077                 dst_port = GET_VALUE(TCP_DPORT).ui16;
1078
1079                 /* check for root termination */
1080                 if (xid == ROOT_XID) {
1081                         if ((flow = get_flow(src_ip, UNKNOWN_XID, NULL, protocol, src_port, dst_ip, dst_port))) {
1082                                 /*
1083                                  * this is supposed to catch some of the cases where the
1084                                  * network stack responds on behalf of the user but the
1085                                  * slice is incorrectly accounted for, e.g. on socket
1086                                  * shutdown
1087                                  */
1088                                 assert(flow->table);
1089                                 xid = flow->table->xid;
1090                         } else {
1091                                 /*
1092                                  * we have not seen any packets on this flow during the
1093                                  * current interval, check for the connection refused
1094                                  */
1095                                 if (GET_VALUE(TCP_RST).b && GET_VALUE(TCP_ACK).b)
1096                                         xid = CONNECTION_REFUSED_XID;
1097                         }
1098                 }
1099                 break;
1100
1101         case IPPROTO_UDP:
1102                 /*
1103                  * we could record the source port, however this pretty much
1104                  * kills any notion of UDP flows and therefore consume large
1105                  * quantities of space, so we set the source port to 0
1106                  * tuple.sport = GET_VALUE(UDP_SPORT).ui16;
1107                  */
1108                 src_port = 0;
1109
1110                 /*
1111                  * traceroutes create a large number of flows in the db
1112                  * this is a quick hack to catch the most common form
1113                  * of traceroute (basically we're mapping any UDP packet
1114                  * in the 33435-33524 range to the "trace" port, 33524 is
1115                  * 3 packets * nhops (30).
1116                  */
1117                 dst_port = GET_VALUE(UDP_DPORT).ui16;
1118                 if (dst_port >= 33435 && dst_port <= 33524)
1119                         dst_port = 33435;
1120                 break;
1121
1122         case IPPROTO_ICMP:
1123                 src_port = GET_VALUE(ICMP_TYPE).ui8;
1124                 dst_port = GET_VALUE(ICMP_CODE).ui8;
1125
1126                 /*
1127                  * We special case some of the ICMP traffic that the kernel
1128                  * always generates. Since this is attributed to root, it 
1129                  * creates significant "noise" in the output. We want to be
1130                  * able to quickly see that root is generating traffic.
1131                  */
1132                 if (xid == ROOT_XID) {
1133                         if (src_port == ICMP_ECHOREPLY)
1134                                 xid = ICMP_ECHOREPLY_XID;
1135                         else if (src_port == ICMP_UNREACH)
1136                                 xid = ICMP_UNREACH_XID;
1137                 }
1138                 break;
1139
1140         case IPPROTO_GRE:
1141                 if (GET_VALUE(GRE_FLAG_KEY).b) {
1142                         if (GET_VALUE(GRE_VERSION).ui8 == 1) {
1143                                 /* Get PPTP call ID */
1144                                 src_port = GET_VALUE(PPTP_CALLID).ui16;
1145                         } else {
1146                                 /* XXX Truncate GRE keys to 16 bits */
1147                                 src_port = (u_int16_t) GET_VALUE(GRE_KEY).ui32;
1148                         }
1149                 } else {
1150                         /* No key available */
1151                         src_port = 0;
1152                 }
1153                 dst_port = 0;
1154                 break;
1155
1156         default:
1157                 /* This is the default key for packets from unsupported protocols */
1158                 src_port = 0;
1159                 dst_port = 0;
1160                 break;
1161         }
1162
1163         /* Record the flow */
1164         if (!flow) {
1165                 flow = get_flow(src_ip, xid, NULL, protocol, src_port, dst_ip, dst_port);
1166                 if (!flow)
1167                         return -ENOMEM;
1168         }
1169
1170         /* Update the flow */
1171         update_flow(flow, now, 1, GET_VALUE(IP_TOTLEN).ui16);
1172
1173         return 0;
1174 }
1175
1176 /* get all key id's for the keys we are intrested in */
1177 static int get_ids(void)
1178 {
1179         int i;
1180         struct intr_id *cur_id;
1181
1182         for (i = 0; i < INTR_IDS; i++) {
1183                 cur_id = &intr_ids[i];
1184                 cur_id->res = keyh_getres(keyh_getid(cur_id->name));
1185                 if (!cur_id->res) {
1186                         ulogd_log(ULOGD_ERROR, 
1187                                 "Cannot resolve keyhash id for %s\n", 
1188                                 cur_id->name);
1189                         return 1;
1190                 }
1191         }       
1192         return 0;
1193 }
1194
1195
1196 static int _netflow_init(void)
1197 {
1198         /* have the opts parsed */
1199         config_parse_file("NETFLOW",config_entries);
1200
1201         if (get_ids()) {
1202                 ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1203                 exit(2);
1204         }
1205
1206         /* Seed the hash function */
1207         salt = getpid() ^ time(NULL);
1208
1209         return 0;
1210 }
1211
1212
1213 static void _netflow_fini(void)
1214 {
1215   /* should probably stop the dump thread?! */
1216
1217   /* do nothing */
1218 }
1219
1220
1221 static ulog_output_t netflow_op = {
1222   .name = "netflow",
1223   .output = &_output_netflow,
1224   .init = _netflow_init,
1225   .fini = _netflow_fini,
1226 };
1227
1228 void _init(void)
1229 {
1230   register_output(&netflow_op);
1231 }
1232
1233 #else
1234
1235 static FILE *logfile = NULL;            /* logfile pointer */
1236 static int loglevel = 5;                /* current loglevel */
1237
1238 /* log message to the logfile */
1239 void __ulogd_log(int level, char *file, int line, const char *format, ...)
1240 {
1241         char *timestr;
1242         va_list ap;
1243         time_t tm;
1244         FILE *outfd;
1245
1246         /* log only messages which have level at least as high as loglevel */
1247         if (level < loglevel)
1248                 return;
1249
1250         if (logfile)
1251                 outfd = logfile;
1252         else
1253                 outfd = stderr;
1254
1255         va_start(ap, format);
1256
1257         tm = time(NULL);
1258         timestr = ctime(&tm);
1259         timestr[strlen(timestr)-1] = '\0';
1260         fprintf(outfd, "%s <%1.1d> %s:%d ", timestr, level, file, line);
1261         
1262         vfprintf(outfd, format, ap);
1263         va_end(ap);
1264
1265         /* flush glibc's buffer */
1266         fflush(outfd);
1267 }
1268
1269 int
1270 main(int argc, char **argv)
1271 {
1272         /* Get options */
1273         while (1) {
1274                 int option_index = 0;
1275                 static struct option long_options[] = {
1276                         { "user", required_argument, NULL, 'u' },
1277                         { "database", required_argument, NULL, 'd' },
1278                         { "password", required_argument, NULL, 'p' },
1279                         { "verbose", required_argument, NULL, 'v' },
1280                         { "host", required_argument, NULL, 'h' },
1281                         { "help", required_argument, NULL, '?' },
1282                         { 0, 0, 0, 0 }
1283                 };
1284                 struct option *opt;
1285                 int c;
1286
1287                 c = getopt_long(argc, argv, "u:d:p:v:h:", long_options, &option_index);
1288                 if (c == -1)
1289                         break;
1290
1291                 switch (c) {
1292                 case 'u':
1293                         strncpy(mysqluser.u.string, optarg, sizeof(mysqluser.u.string));
1294                         break;
1295                 case 'd':
1296                         strncpy(mysqldb.u.string, optarg, sizeof(mysqldb.u.string));
1297                         break;
1298                 case 'p':
1299                         strncpy(mysqlpass.u.string, optarg, sizeof(mysqlpass.u.string));
1300                         break;
1301                 case 'v':
1302                         loglevel = atoi(optarg);
1303                         break;
1304                 case 'h':
1305                         strncpy(mysqlhost.u.string, optarg, sizeof(mysqlhost.u.string));
1306                         break;
1307                 default:
1308                         fprintf(stderr, "usage: %s [OPTION]... YY/mm-dd.log[.bz2|.gz]...\n", argv[0]);
1309                         for (opt = long_options; opt->name; opt++)
1310                                 fprintf(stderr, "\t-%c, --%s%s\n", opt->val, opt->name, required_argument ? "=ARGUMENT" : "");
1311                         return 1;
1312                 }
1313         }
1314
1315         /* Seed the hash function */
1316         salt = getpid() ^ time(NULL);
1317
1318         /* Don't lookup slice names in /etc/passwd */
1319         strcpy(slicemap.u.string, "/dev/null");
1320
1321         /* All times in the log files are in GMT */
1322         putenv("TZ=GMT");
1323
1324         /* Parse the rest of the non-option arguments (files to import) */
1325         for (argv = &argv[optind]; *argv; argv++) {
1326                 char pathname[PATH_MAX], *s, *next;
1327                 const char *cmd = NULL, *opts = NULL;
1328                 pid_t pid = 0;
1329                 FILE *fp = NULL;
1330                 char *line = NULL;
1331                 size_t len = 0;
1332                 int fds[2] = { -1, -1 };
1333                 time_t now;
1334                 struct tm tm;
1335
1336                 if (!realpath(*argv, pathname)) {
1337                         ulogd_log(ULOGD_ERROR, "%s: %s\n", *argv, strerror(errno));
1338                         goto next_file;
1339                 }
1340
1341                 /* We may need to fork a child to decompress the log file */
1342                 if (strstr(pathname, ".bz2")) {
1343                         cmd = "bzip2";
1344                         opts = "-cdfq";
1345                 } else if (strstr(pathname, ".gz")) {
1346                         cmd = "gzip";
1347                         opts = "-cdfq";
1348                 } else if (strstr(pathname, ".zip")) {
1349                         cmd = "unzip";
1350                         opts = "-p";
1351                 }
1352
1353                 /* Fork a child to decompress the log file */
1354                 if (cmd) {
1355                         /* Open a pipe */
1356                         if (pipe(fds)) {
1357                                 ulogd_log(ULOGD_ERROR, "pipe: %s\n", strerror(errno));
1358                                 goto next_file;
1359                         }
1360                         switch ((pid = fork())) {
1361                         case -1:
1362                                 ulogd_log(ULOGD_ERROR, "fork: %s\n", strerror(errno));
1363                                 goto next_file;
1364                         case 0:
1365                                 close(fds[0]);
1366                                 fds[0] = -1;
1367                                 /* Redirect stdout to the write end of the pipe */
1368                                 if (dup2(fds[1], fileno(stdout)) < 0) {
1369                                         ulogd_log(ULOGD_ERROR, "dup2: %s\n", strerror(errno));
1370                                         exit(errno);
1371                                 }
1372                                 execlp(cmd, cmd, opts, pathname, NULL);
1373                                 ulogd_log(ULOGD_ERROR, "execlp: %s\n", strerror(errno));
1374                                 goto next_file;
1375                         default:
1376                                 close(fds[1]);
1377                                 fds[1] = -1;
1378                                 /* Open the read end of the pipe */
1379                                 if (!(fp = fdopen(fds[0], "r"))) {
1380                                         ulogd_log(ULOGD_ERROR, "fdopen: %s\n", strerror(errno));
1381                                         goto next_file;
1382                                 }
1383                                 break;
1384                         }
1385                 }
1386
1387                 /* Just open the file */
1388                 else if (!(fp = fopen(pathname, "r"))) {
1389                         ulogd_log(ULOGD_ERROR, "%s: %s\n", pathname, strerror(errno));
1390                         goto next_file;
1391                 }
1392
1393                 /* Parse date from the pathname (e.g. [.*]/05/01-25.log[.bz2|gz]) */
1394                 now = time(NULL);
1395                 gmtime_r(&now, &tm);
1396
1397                 next = pathname;
1398                 while ((s = strsep(&next, "/"))) {
1399                         int mon, mday, year;
1400
1401                         if (sscanf(s, "%02u-%02u.log%*s", &mon, &mday) == 2) {
1402                                 tm.tm_mon = mon - 1;
1403                                 tm.tm_mday = mday;
1404                         } else if (strlen(s) == 2 && sscanf(s, "%02u", &year) == 1) {
1405                                 /* Use strptime(3) strategy: ...values in the range
1406                                  * 69-99 refer to years in the twentieth century
1407                                  * (1969-1999); values in the range 00-68 refer to
1408                                  * years in the twenty-first century (2000-2068).
1409                                  */
1410                                 if (year < 69)
1411                                         year += 100;
1412                                 tm.tm_year = year;
1413                         }
1414                 }
1415
1416                 /* Reset to midnight and recalculate derived fields with mktime() */
1417                 tm.tm_hour = tm.tm_min = tm.tm_sec = 0;
1418                 mktime(&tm);
1419
1420                 while (getline(&line, &len, fp) >= 0) {
1421                         char *s, *next;
1422                         char *slice;
1423                         u_int8_t protocol;
1424                         struct in_addr src_ip, dst_ip;
1425                         unsigned int src_port, dst_port;
1426                         unsigned int packets, bytes;
1427                         struct flow *flow;
1428
1429                         /* Parse the flow record */
1430                         next = line;
1431
1432                         if (!(s = strsep(&next, ",")) ||
1433                             sscanf(s, "%02u-%02u-%02u", &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3)
1434                                 continue;
1435                         now = mktime(&tm);
1436
1437                         if (!(s = strsep(&next, ",")))
1438                                 continue;
1439                         if (!strcasecmp(s, "TCP"))
1440                                 protocol = IPPROTO_TCP;
1441                         else if (!strcasecmp(s, "UDP"))
1442                                 protocol = IPPROTO_UDP;
1443                         else if (!strcasecmp(s, "ICMP"))
1444                                 protocol = IPPROTO_ICMP;
1445                         else if (!strcasecmp(s, "GRE"))
1446                                 protocol = IPPROTO_GRE;
1447                         else
1448                                 protocol = atoi(s);
1449
1450                         if (!(slice = strsep(&next, ",")))
1451                                 continue;
1452
1453                         if (!(s = strsep(&next, ",")) ||
1454                             !inet_aton(s, &src_ip))
1455                                 continue;
1456
1457                         if (!(s = strsep(&next, ",")) ||
1458                             sscanf(s, "%u", &src_port) != 1)
1459                                 continue;
1460
1461                         if (!(s = strsep(&next, ",")) ||
1462                             !inet_aton(s, &dst_ip))
1463                                 continue;
1464
1465                         if (!(s = strsep(&next, ",")) ||
1466                             sscanf(s, "%u", &dst_port) != 1)
1467                                 continue;
1468
1469                         if (!(s = strsep(&next, ",")) ||
1470                             sscanf(s, "%u", &packets) != 1)
1471                                 continue;
1472
1473                         if (!(s = strsep(&next, ",")) ||
1474                             sscanf(s, "%u", &bytes) != 1)
1475                                 continue;
1476
1477                         /* Record the flow */
1478                         flow = get_flow(ntohl(src_ip.s_addr), 0, slice,
1479                                         protocol, src_port, ntohl(dst_ip.s_addr), dst_port);
1480                         if (!flow)
1481                                 continue;
1482
1483                         /* Update flow */
1484                         update_flow(flow, now, packets, bytes);
1485                 }
1486                 if (line)
1487                         free(line);
1488
1489                 start_dump();
1490
1491         next_file:
1492                 if (pid && kill(pid, SIGTERM))
1493                         ulogd_log(ULOGD_ERROR, "kill: %s\n", strerror(errno));
1494                 wait(NULL);
1495                 if (fp)
1496                         fclose(fp);
1497                 if (fds[0] >= 0)
1498                         close(fds[0]);
1499                 if (fds[1] >= 0)
1500                         close(fds[1]);
1501         }
1502
1503         /* Wait for previous dump thread to complete */
1504         if (dump_thread) {
1505                 pthread_join(dump_thread, NULL);
1506                 dump_thread = 0;
1507         }
1508
1509         return 0;
1510 }
1511
1512 #endif