Importing all of DRL, including ulogd and all of its files.
[distributedratelimiting.git] / netflow / ulogd_NETFLOW.c
diff --git a/netflow/ulogd_NETFLOW.c b/netflow/ulogd_NETFLOW.c
new file mode 100644 (file)
index 0000000..5e7046e
--- /dev/null
@@ -0,0 +1,1512 @@
+/*
+ * ulogd output target for IP flow analysis
+ *
+ * Mark Huang <mlhuang@cs.princeton.edu>
+ * Copyright (C) 2004-2005 The Trustees of Princeton University
+ *
+ * Based on admindump.pl by Mic Bowman and Paul Brett
+ * Copyright (c) 2002 Intel Corporation
+ *
+ * $Id: ulogd_NETFLOW.c,v 1.19 2005/04/20 21:10:05 mlhuang Exp $
+ */
+
+/* Enable GNU glibc extensions */
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+
+/* va_start() and friends */
+#include <stdarg.h>
+
+/* ispunct() */
+#include <ctype.h>
+
+/* strstr() and friends */
+#include <string.h>
+
+/* dirname() and basename() */
+#include <libgen.h>
+
+/* fork() and wait() */
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/wait.h>
+
+/* fgetpwent() */
+#include <pwd.h>
+
+/* errno and assert() */
+#include <errno.h>
+#include <assert.h>
+
+/* getopt_long() */
+#include <getopt.h>
+
+/* time() and friends */
+#include <time.h>
+#include <sys/time.h>
+
+/* inet_aton() */
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+/* ICMP definitions */
+#include <netinet/ip.h>
+#include <netinet/ip_icmp.h>
+
+/* stat() */
+#include <sys/stat.h>
+
+/* pthread_create() */
+#include <pthread.h>
+
+/* flock() */
+#include <sys/file.h>
+
+#include <ulogd/ulogd.h>
+#include <ulogd/conffile.h>
+
+#if !defined(STANDALONE) && HAVE_LIBPROPER
+#include <proper/prop.h>
+#endif
+
+/*
+ * /etc/ulogd.conf configuration options
+ */
+
+/* Dump interval in minutes */
+static config_entry_t interval = {
+       .next = NULL,
+       .key = "interval",
+       .type = CONFIG_TYPE_INT,
+       .options = CONFIG_OPT_NONE,
+       .u = { .value = 5 },
+};
+
+/* Slice map (in /etc/passwd format) */
+static config_entry_t slicemap = {
+       .next = &interval,
+       .key = "slicemap",
+       .type = CONFIG_TYPE_STRING,
+       .options = CONFIG_OPT_NONE,
+       .u = { .string = "/etc/passwd" },
+};
+
+/* MySQL database name */
+static config_entry_t mysqldb = {
+       .next = &slicemap,
+       .key = "mysqldb",
+       .type = CONFIG_TYPE_STRING,
+       .options = CONFIG_OPT_NONE,
+       .u = { .string = "netflow" },
+};
+
+/* MySQL database user */
+static config_entry_t mysqluser = {
+       .next = &mysqldb,
+       .key = "mysqluser",
+       .type = CONFIG_TYPE_STRING,
+       .options = CONFIG_OPT_NONE,
+       .u = { .string = "netflow" },
+};
+
+/* MySQL database password */
+static config_entry_t mysqlpass = {
+       .next = &mysqluser,
+       .key = "mysqlpass",
+       .type = CONFIG_TYPE_STRING,
+       .options = CONFIG_OPT_NONE,
+       .u = { .string = "" },
+};
+
+/* MySQL database host */
+static config_entry_t mysqlhost = {
+       .next = &mysqlpass,
+       .key = "mysqlhost",
+       .type = CONFIG_TYPE_STRING,
+       .options = CONFIG_OPT_NONE,
+       .u = { .string = "" },
+};
+
+/* Latest flows in CSV format */
+static config_entry_t csv = {
+       .next = &mysqlhost,
+       .key = "csv",
+       .type = CONFIG_TYPE_STRING,
+       .options = CONFIG_OPT_NONE,
+       .u = { .string = "/var/www/html/flows.csv" },
+};
+
+#define config_entries (&csv)
+
+/*
+ * Debug functionality
+ */
+
+#ifdef DMALLOC
+#include <dmalloc.h>
+#endif
+
+#define NIPQUAD(addr) \
+       ((unsigned char *)&addr)[0], \
+       ((unsigned char *)&addr)[1], \
+       ((unsigned char *)&addr)[2], \
+       ((unsigned char *)&addr)[3]
+
+#define IPQUAD(addr) \
+       ((unsigned char *)&addr)[3], \
+       ((unsigned char *)&addr)[2], \
+       ((unsigned char *)&addr)[1], \
+       ((unsigned char *)&addr)[0]
+
+/*
+ * MySQL support
+ */
+
+#include <mysql/mysql.h>
+
+/* mysql_query() with printf() syntax */
+static int
+mysql_queryf(MYSQL *dbh, const char *fmt, ...)
+{
+       char *buf = NULL;
+       size_t len;
+       va_list ap;
+       int ret = 2 * strlen(fmt);
+
+       do {
+               len = ret + 1;
+               if (!(buf = realloc(buf, len))) {
+                       ulogd_log(ULOGD_ERROR, "realloc: %s\n", strerror(errno));
+                       ret = -errno;
+                       goto done;
+               }
+               va_start(ap, fmt);
+               ret = vsnprintf(buf, len, fmt, ap);
+               va_end(ap);
+               if (ret < 0) {
+                       ulogd_log(ULOGD_ERROR, "vsnprintf: %s\n", strerror(errno));
+                       goto done;
+               }
+       } while (ret >= len);
+
+       ret = mysql_query(dbh, buf);
+       if (ret)
+               ulogd_log(ULOGD_ERROR, "%s: %s\n", buf, mysql_error(dbh));
+
+ done:
+       if (buf)
+               free(buf);
+       return ret;
+}
+
+/*
+ * Jenkins hash support
+ */
+
+typedef u_int8_t u8;
+typedef u_int16_t u16;
+typedef u_int32_t u32;
+#include <linux/jhash.h>
+
+/* Salt for the hash functions */
+static int salt;
+
+/*
+ * Hash slice name lookups on context ID.
+ */
+
+/* Special context IDs */
+#define UNKNOWN_XID -1
+#define ROOT_XID 0
+
+enum {
+       CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
+       ICMP_ECHOREPLY_XID,
+       ICMP_UNREACH_XID,
+};
+
+/* A slice */
+struct slice {
+       struct slice *next;
+       int xid;
+       char *name;
+};
+
+/* Must be a power of 2 */
+#define SLICE_HASH_SIZE 128
+struct slice *slice_hash[SLICE_HASH_SIZE];
+
+static inline int
+hash_slice(int xid)
+{
+       return jhash_1word(xid, salt) & (SLICE_HASH_SIZE - 1);
+}
+
+static struct slice *
+get_slice(int xid)
+{
+       struct slice *slice;
+       int i;
+
+       i = hash_slice(xid);
+       for (slice = slice_hash[i]; slice; slice = slice->next) {
+               if (slice->xid == xid)
+                       break;
+       }
+
+       return slice;
+}
+
+static struct slice *
+add_slice(int xid, char *name)
+{
+       struct slice *slice;
+       int i;
+
+       slice = malloc(sizeof(struct slice));
+       if (!slice)
+               return NULL;
+       memset(slice, 0, sizeof(struct slice));
+
+       slice->xid = xid;
+       slice->name = strdup(name);
+       if (!slice->name) {
+               free(slice);
+               return NULL;
+       }
+
+       /* Add to hashed list */
+       i = hash_slice(xid);
+       slice->next = slice_hash[i];
+       slice_hash[i] = slice;
+
+       return slice;
+}
+
+static int
+hash_passwd(void)
+{
+       int fd;
+       FILE *fp;
+       struct passwd *pw;
+       struct slice *slice, *next;
+       int i;
+
+#if !defined(STANDALONE) && HAVE_LIBPROPER
+       if ((fd = prop_open_file(slicemap.u.string, PROP_OPEN_READ)) < 0) {
+               /* prop_open_file() returns -errno */
+               errno = -fd;
+       }
+#else
+       fd = open(slicemap.u.string, O_RDONLY);
+#endif
+
+       if (fd < 0 || !(fp = fdopen(fd, "r"))) {
+               ulogd_log(ULOGD_ERROR, "%s: %s\n", slicemap.u.string, strerror(errno));
+               if (fd >= 0)
+                       close(fd);
+               return errno;
+       }
+
+       /* Clean hash */
+       for (i = 0; i < SLICE_HASH_SIZE; i++) {
+               for (slice = slice_hash[i]; slice; slice = next) {
+                       next = slice->next;
+                       assert(slice->name);
+                       free(slice->name);
+                       free(slice);
+               }
+               slice_hash[i] = NULL;
+       }
+
+       /* (Re)build slice hash */
+       while ((pw = fgetpwent(fp)))
+               add_slice(pw->pw_uid, pw->pw_name);
+
+       fclose(fp);
+       close(fd);
+       return 0;
+}
+
+/*
+ * Maintain flows in a two level hash table. The first level hashes
+ * on (src_ip, slice). The second level hashes on (protocol, dst_ip,
+ * src_port, dst_port). This structure mirrors how we store flows in
+ * the database (in many tables where each table contains flows with
+ * the same (src_ip, slice) parameters and unique (protocol,
+ * src_port, dst_ip, dst_port) rows).
+ */
+
+/* Must be powers of 2 */
+#ifdef STANDALONE
+#define TABLE_HASH_SIZE 16
+#define FLOW_HASH_SIZE 65536
+#else
+#define TABLE_HASH_SIZE 8
+#define FLOW_HASH_SIZE 1024
+#endif
+
+struct flow {
+       struct flow *nexth;             /* Next in hash */
+       struct flow *next;              /* Next in ordered list */
+       struct flow_table *table;       /* Back pointer */
+       u_int8_t protocol;              /* IP protocol number */
+       u_int16_t src_port;             /* IP source port (host order) */
+       u_int16_t dst_port;             /* IP destination port (host order) */
+       u_int32_t dst_ip;               /* IP destination address (host order) */
+       time_t start_time;              /* Timestamp of the first packet in the flow */
+       time_t end_time;                /* Timestamp of the first packet in the flow */
+       unsigned long long packets;     /* Number of IP packets */
+       unsigned long long bytes;       /* Number of IP bytes (including headers) */
+};
+
+struct flow_table {
+       struct flow_table *nexth;       /* Next in hash */
+       struct flow_table *next;        /* Next in ordered list */
+       u_int32_t src_ip;               /* IP source address (host order) */
+       int xid;                        /* Context ID */
+       char slice[32];                 /* Slice name */
+       unsigned long rows;             /* Total number of rows to insert/update */
+       unsigned long collisions;       /* Total number of row hash collisions */
+       unsigned long long packets;     /* Total number of packets sent in all flows in this table */
+       unsigned long long bytes;       /* Total number of bytes sent in all flows in this table */
+       time_t start_time;              /* Start time of the earliest flow in the table */
+       time_t end_time;                /* End time of the latest flow in the table */
+       /* Hashed list of flows */
+       struct flow *flows[FLOW_HASH_SIZE];
+       /* Ordered list of flows */
+       struct flow *flows_head, *flows_tail;
+};
+
+/* Hashed list of flow tables */
+static struct flow_table *flow_tables[TABLE_HASH_SIZE];
+/* Ordered list of flow tables */
+static struct flow_table *flow_tables_head, *flow_tables_tail;
+
+/* Maximum total number of outstanding allocated flows */
+#define MAX_FLOWS 65536                                /* ip_conntrack_max on a 1 GB machine */
+static int flows;
+
+/* Double buffer the ordered list and dump it in another thread */
+static struct flow_table *dump_head, *dump_tail;
+static int dump_flows;
+static pthread_t dump_thread;
+
+static inline int
+hash_flow_table(u_int32_t src_ip, int xid, char *slice)
+{
+       if (slice)
+               return jhash(slice, strlen(slice), salt) & (TABLE_HASH_SIZE - 1);
+       else
+               return jhash_2words(src_ip, xid, salt) & (TABLE_HASH_SIZE - 1);
+}
+
+static struct flow_table *
+get_flow_table(u_int32_t src_ip, int xid, char *slice)
+{
+       struct flow_table *flow_table;
+       int i;
+
+       /* See if it already exists */
+       i = hash_flow_table(src_ip, xid, slice);
+       for (flow_table = flow_tables[i]; flow_table; flow_table = flow_table->nexth) {
+               if (flow_table->src_ip == src_ip &&
+                   flow_table->xid == xid &&
+                   (!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1)))
+                       break;
+       }
+
+       if (!flow_table) {
+               /* Allocate a new flow table */
+               flow_table = malloc(sizeof(struct flow_table));
+               if (!flow_table) {
+                       ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
+                       return NULL;
+               }
+
+               /* Initialize */
+               memset(flow_table, 0, sizeof(struct flow_table));
+               flow_table->src_ip = src_ip;
+               flow_table->xid = xid;
+               if (slice)
+                       strncpy(flow_table->slice, slice, sizeof(flow_table->slice) - 1);
+
+               /* Add to hashed list */
+               i = hash_flow_table(src_ip, xid, slice);
+               flow_table->nexth = flow_tables[i];
+               flow_tables[i] = flow_table;
+
+               /* Add to ordered list */
+               if (flow_tables_tail) {
+                       assert(flow_tables_head);
+                       flow_tables_tail->next = flow_table;
+                       flow_tables_tail = flow_table;
+               } else {
+                       assert(!flow_tables_head);
+                       flow_tables_head = flow_tables_tail = flow_table;
+               }
+       }
+
+       assert(flow_table);
+       assert(flow_table->src_ip == src_ip);
+       assert(flow_table->xid == xid);
+       assert(!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1));
+
+       return flow_table;
+}
+
+static inline int
+hash_flow(u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
+{
+       return jhash_3words(protocol, dst_ip, (src_port << 16) | dst_port, salt) & (FLOW_HASH_SIZE - 1);
+}
+
+static struct flow *
+get_flow(u_int32_t src_ip, int xid, char *slice,
+        u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
+{
+       struct flow_table *flow_table;
+       struct flow *flow;
+       int i;
+
+       if (xid != UNKNOWN_XID)
+               flow_table = get_flow_table(src_ip, xid, slice);
+       else {
+               /* Support searching for flows without specifying a context ID */
+               flow_table = flow_tables_head;
+       }
+
+       for (; flow_table; flow_table = flow_table->next) {
+               i = hash_flow(protocol, src_port, dst_ip, dst_port);
+               for (flow = flow_table->flows[i]; flow; flow = flow->nexth) {
+                       if (flow->protocol == protocol &&
+                           flow->src_port == src_port &&
+                           flow->dst_ip == dst_ip &&
+                           flow->dst_port == dst_port)
+                               break;
+               }
+               if (xid != UNKNOWN_XID)
+                       break;
+       }
+
+       if (!flow_table)
+               return NULL;
+
+       if (!flow) {
+               /* Allocate a new flow */
+               flow = malloc(sizeof(struct flow));
+               if (!flow) {
+                       ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
+                       return NULL;
+               }
+
+               /* Initialize */
+               memset(flow, 0, sizeof(struct flow));
+               flow->table = flow_table;
+               flow->protocol = protocol;
+               flow->src_port = src_port;
+               flow->dst_ip = dst_ip;
+               flow->dst_port = dst_port;
+
+               /* Add to hashed list */
+               i = hash_flow(protocol, src_port, dst_ip, dst_port);
+               flow->nexth = flow_table->flows[i];
+               flow_table->flows[i] = flow;
+
+               /* Add to ordered list */
+               if (flow_table->flows_tail) {
+                       assert(flow_table->flows_head);
+                       flow_table->flows_tail->next = flow;
+                       flow_table->flows_tail = flow;
+               } else {
+                       assert(!flow_table->flows_head);
+                       flow_table->flows_head = flow_table->flows_tail = flow;
+               }
+
+               /* Update table statistics */
+               flow_table->collisions += flow->nexth ? 1 : 0;
+               flow_table->rows++;
+
+               /* Update total number of outstanding flows */
+               flows++;
+       }
+
+       assert(flow);
+       assert(flow->table == flow_table);
+       assert(flow->protocol == protocol);
+       assert(flow->src_port == src_port);
+       assert(flow->dst_ip == dst_ip);
+       assert(flow->dst_port == dst_port);
+
+       return flow;
+}
+
+/*
+ * The dump thread imports one set of flows while the collection
+ * thread continues to record new flows.
+ */
+
+void *
+dump_interval(void *unused)
+{
+       MYSQL *dbh;
+       struct flow_table *flow_table, *next_flow_table;
+       struct flow *flow, *next_flow, bind_flow;
+       struct slice *slice;
+       char slice_name[64];
+       char table_name[NAME_LEN + 1], *c;
+       struct tm tm;
+       MYSQL_STMT *stmt;
+       char query[512];
+       unsigned int tables, table_collisions, row_collisions, rows;
+       unsigned long long packets, bytes;
+       struct timeval start, end, interval_start;
+       int fd;
+       FILE *fp;
+
+       /* Time this interval */
+       gettimeofday(&interval_start, NULL);
+
+       /* Hash slice name lookups */
+       hash_passwd();
+
+       /* Open, lock, and truncate CSV file */
+       if ((fd = open(csv.u.string, O_CREAT | O_WRONLY, 0644)) >= 0) {
+               int tries = 10;
+
+               while (flock(fd, LOCK_EX | LOCK_NB)) {
+                       sleep(1);
+                       tries--;
+               }
+               if (tries == 0)
+                       ulogd_log(ULOGD_ERROR, "Could not acquire lock on %s\n", csv.u.string);
+               if (!(fp = fdopen(fd, "w")))
+                       close(fd);
+               else
+                       ftruncate(fd, 0);
+       } else
+               fp = NULL;
+
+       /* Connect to DB */
+       if (!(dbh = mysql_init(NULL)))
+               ulogd_log(ULOGD_ERROR, "mysql_init: failed\n");
+       if (dbh && !mysql_real_connect(dbh,
+                                      mysqlhost.u.string[0] ? mysqlhost.u.string : NULL,
+                                      mysqluser.u.string,
+                                      mysqlpass.u.string[0] ? mysqlpass.u.string : NULL,
+                                      mysqldb.u.string,
+                                      0, NULL, 0)) {
+               ulogd_log(ULOGD_ERROR,
+                         "%s@%s:%s: %s\n",
+                         mysqluser.u.string,
+                         mysqlhost.u.string[0] ? mysqlhost.u.string : "localhost",
+                         mysqldb.u.string,
+                         mysql_error(dbh));
+               mysql_close(dbh);
+               dbh = NULL;
+       }
+
+       if (dbh) {
+               /* All times in the database are in GMT */
+               mysql_query(dbh, "SET time_zone='+00:00'");
+       }
+
+       /* Initialize statistics */
+       tables = table_collisions = row_collisions = rows = 0;
+       packets = bytes = 0;
+
+       assert(dump_head);
+
+       for (flow_table = dump_head; flow_table; flow_table = next_flow_table) {
+               /* Keep track of some basic statistics */
+               tables++;
+               table_collisions += flow_table->nexth ? 1 : 0;
+               row_collisions += flow_table->collisions;
+               rows += flow_table->rows;
+               packets += flow_table->packets;
+               bytes += flow_table->bytes;
+
+               /* Get slice name */
+               slice_name[sizeof(slice_name) - 1] = '\0';
+               switch (flow_table->xid) {
+               case CONNECTION_REFUSED_XID:
+                       strncpy(slice_name, "connection-refused", sizeof(slice_name));
+                       break;
+               case ICMP_ECHOREPLY_XID:
+                       strncpy(slice_name, "icmp-reply", sizeof(slice_name));
+                       break;
+               case ICMP_UNREACH_XID:
+                       strncpy(slice_name, "icmp-unreachable", sizeof(slice_name));
+                       break;
+               default:
+                       if ((slice = get_slice(flow_table->xid)))
+                               strncpy(slice_name, slice->name, sizeof(slice_name));
+                       else if (flow_table->slice[0])
+                               strncpy(slice_name, flow_table->slice, sizeof(slice_name));
+                       else
+                               snprintf(slice_name, sizeof(slice_name), "%u", flow_table->xid);
+                       break;
+               }
+
+               if (dbh) {
+                       /* Get day */
+                       gmtime_r(&flow_table->start_time, &tm);
+
+                       /* Form a human readable table name */
+                       snprintf(table_name, sizeof(table_name),
+                                "%u_%u_%u_%u_%s_%u_%02u_%02u",
+                                IPQUAD(flow_table->src_ip),
+                                slice_name,
+                                1900 + tm.tm_year, tm.tm_mon + 1, tm.tm_mday);
+                       assert(strlen(table_name) <= NAME_LEN);
+
+                       /* Replace punctation with underscores */
+                       for (c = table_name; *c; c++) {
+                               if (ispunct(*c))
+                                       *c = '_';
+                       }
+
+                       /* Time database operations on each table */
+                       gettimeofday(&start, NULL);
+
+                       /* Insert/update table summary */
+                       if (mysql_queryf(dbh,
+                                        "INSERT INTO flow_tables "
+                                        "(table_name,src_ip,slice,"
+                                        "start_time,end_time,packets,bytes) "
+                                        "VALUES ('%s',%u,'%s',"
+                                        "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
+                                        "ON DUPLICATE KEY UPDATE "
+                                        "end_time=FROM_UNIXTIME(%u),"
+                                        "packets=packets+%llu,"
+                                        "bytes=bytes+%llu",
+                                        table_name,
+                                        flow_table->src_ip,
+                                        slice_name,
+                                        (unsigned) flow_table->start_time,
+                                        (unsigned) flow_table->end_time,
+                                        flow_table->packets,
+                                        flow_table->bytes,
+                                        (unsigned) flow_table->end_time,
+                                        flow_table->packets,
+                                        flow_table->bytes))
+                               goto free_flow_table;
+
+                       /* Create table */
+                       if (mysql_queryf(dbh,
+                                        "CREATE TABLE IF NOT EXISTS %s LIKE flow_template",
+                                        table_name))
+                               goto free_flow_table;
+
+                       /* Lock for speed */
+                       mysql_queryf(dbh, "ALTER TABLE %s DISABLE KEYS", table_name);
+                       mysql_queryf(dbh, "LOCK TABLES %s WRITE", table_name);
+
+                       /* Prepare the insert statement */
+                       if (!(stmt = mysql_stmt_init(dbh)))
+                               ulogd_log(ULOGD_ERROR, "mysql_stmt_init: %s\n", mysql_error(dbh));
+                       else {
+                               my_bool is_null = 0;
+                               MYSQL_BIND bind[] = {
+                                       { .buffer_type = MYSQL_TYPE_TINY,
+                                         .buffer = &bind_flow.protocol,
+                                         .buffer_length = sizeof(bind_flow.protocol),
+                                         .length = &bind[0].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_SHORT,
+                                         .buffer = &bind_flow.src_port,
+                                         .buffer_length = sizeof(bind_flow.src_port),
+                                         .length = &bind[1].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONG,
+                                         .buffer = &bind_flow.dst_ip,
+                                         .buffer_length = sizeof(bind_flow.dst_ip),
+                                         .length = &bind[2].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_SHORT,
+                                         .buffer = &bind_flow.dst_port,
+                                         .buffer_length = sizeof(bind_flow.dst_port),
+                                         .length = &bind[3].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONG,
+                                         .buffer = &bind_flow.start_time,
+                                         .buffer_length = sizeof(bind_flow.start_time),
+                                         .length = &bind[4].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONG,
+                                         .buffer = &bind_flow.end_time,
+                                         .buffer_length = sizeof(bind_flow.end_time),
+                                         .length = &bind[5].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONGLONG,
+                                         .buffer = &bind_flow.packets,
+                                         .buffer_length = sizeof(bind_flow.packets),
+                                         .length = &bind[6].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONGLONG,
+                                         .buffer = &bind_flow.bytes,
+                                         .buffer_length = sizeof(bind_flow.bytes),
+                                         .length = &bind[7].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONG,
+                                         .buffer = &bind_flow.end_time,
+                                         .buffer_length = sizeof(bind_flow.end_time),
+                                         .length = &bind[8].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONGLONG,
+                                         .buffer = &bind_flow.packets,
+                                         .buffer_length = sizeof(bind_flow.packets),
+                                         .length = &bind[9].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                                       { .buffer_type = MYSQL_TYPE_LONGLONG,
+                                         .buffer = &bind_flow.bytes,
+                                         .buffer_length = sizeof(bind_flow.bytes),
+                                         .length = &bind[10].buffer_length,
+                                         .is_null = &is_null,
+                                         .is_unsigned = 1, },
+                               };
+
+                               snprintf(query, sizeof(query),
+                                        "INSERT INTO %s "
+                                        "(protocol,src_port,dst_ip,dst_port,"
+                                        "start_time,end_time,packets,bytes) "
+                                        "VALUES (?,?,?,?,"
+                                        "FROM_UNIXTIME(?),FROM_UNIXTIME(?),?,?) "
+                                        "ON DUPLICATE KEY UPDATE "
+                                        "end_time=FROM_UNIXTIME(?),"
+                                        "packets=packets+?,"
+                                        "bytes=bytes+?",
+                                        table_name);
+
+                               if (mysql_stmt_prepare(stmt, query, strlen(query)) ||
+                                   mysql_stmt_bind_param(stmt, bind)) {
+                                       ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
+                                       mysql_stmt_close(stmt);
+                                       stmt = NULL;
+                               }
+                       }
+               } else
+                       stmt = NULL;
+
+               for (flow = flow_table->flows_head; flow; flow = flow->next) {
+                       /* Dump to CSV */
+                       if (fp) {
+                               fprintf(fp,
+                                       "%s,%u,"
+                                       "%u.%u.%u.%u,%u,"
+                                       "%u.%u.%u.%u,%u,"
+                                       "%u,%u,%llu,%llu\n",
+                                       slice_name, flow->protocol,
+                                       IPQUAD(flow_table->src_ip), flow->src_port,
+                                       IPQUAD(flow->dst_ip), flow->dst_port,
+                                       (unsigned) flow->start_time,
+                                       (unsigned) flow->end_time,
+                                       flow->packets,
+                                       flow->bytes);
+                       }
+
+                       /* Insert/update flow record */
+                       if (dbh) {
+                               if (stmt) {
+                                       bind_flow = *flow;
+                                       if (mysql_stmt_execute(stmt))
+                                               ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
+                               } else {
+                                       mysql_queryf(dbh,
+                                                    "INSERT INTO %s "
+                                                    "(protocol,src_port,dst_ip,dst_port,"
+                                                    "start_time,end_time,packets,bytes) "
+                                                    "VALUES (%u,%u,%u,%u,"
+                                                    "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
+                                                    "ON DUPLICATE KEY UPDATE "
+                                                    "end_time=FROM_UNIXTIME(%u),"
+                                                    "packets=packets+%llu,"
+                                                    "bytes=bytes+%llu",
+                                                    table_name,
+                                                    flow->protocol,
+                                                    flow->src_port,
+                                                    flow->dst_ip,
+                                                    flow->dst_port,
+                                                    (unsigned) flow->start_time,
+                                                    (unsigned) flow->end_time,
+                                                    flow->packets,
+                                                    flow->bytes,
+                                                    (unsigned) flow->end_time,
+                                                    flow->packets,
+                                                    flow->bytes);
+                               }
+                       }
+               }
+
+               if (dbh) {
+                       if (stmt)
+                               mysql_stmt_close(stmt);
+
+                       /* Unlock */
+                       mysql_query(dbh, "UNLOCK TABLES");
+                       mysql_queryf(dbh, "ALTER TABLE %s ENABLE KEYS", table_name);
+
+                       /* Update table summary */
+                       mysql_queryf(dbh,
+                                    "UPDATE flow_tables "
+                                    "SET flows=(SELECT COUNT(protocol) FROM %s) "
+                                    "WHERE table_name='%s'",
+                                    table_name,
+                                    table_name);
+
+                       gettimeofday(&end, NULL);
+                       timersub(&end, &start, &end);
+               }
+
+               ulogd_log(ULOGD_DEBUG,
+                         "Updated %u rows in %s in %u.%06u s (%u collisions, %llu packets, %llu bytes)\n",
+                         flow_table->rows, table_name,
+                         (unsigned) end.tv_sec, (unsigned) end.tv_usec,
+                         flow_table->collisions, flow_table->packets, flow_table->bytes);
+
+       free_flow_table:
+               for (flow = flow_table->flows_head; flow; flow = next_flow) {
+                       flow_table->rows--;
+                       flow_table->packets -= flow->packets;
+                       flow_table->bytes -= flow->bytes;
+                       dump_flows--;
+                       next_flow = flow->next;
+                       assert(next_flow || flow == flow_table->flows_tail);
+                       free(flow);
+               }
+
+               assert(!flow_table->rows);
+               assert(!flow_table->packets);
+               assert(!flow_table->bytes);
+
+               next_flow_table = flow_table->next;
+               assert(next_flow_table || flow_table == dump_tail);
+               free(flow_table);
+       }
+
+       assert(!dump_flows);
+       dump_head = dump_tail = NULL;
+
+       if (dbh) {
+               mysql_close(dbh);
+
+               gettimeofday(&end, NULL);
+               timersub(&end, &interval_start, &end);
+
+               ulogd_log(ULOGD_NOTICE,
+                         "Updated %u rows in %u tables (%u bytes) in %u.%06u s "
+                         "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
+                         rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
+                         (unsigned) end.tv_sec, (unsigned) end.tv_usec,
+                         table_collisions, row_collisions, packets, bytes);
+       } else {
+               /* Could not connect to database */
+               ulogd_log(ULOGD_ERROR,
+                         "Lost %u rows in %u tables (%u bytes) "
+                         "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
+                         rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
+                         table_collisions, row_collisions, packets, bytes);
+       }
+
+       if (fp) {
+               fclose(fp);
+               assert(fd >= 0);
+               close(fd);
+       }
+
+       pthread_exit(NULL);
+       return NULL;
+}
+
+static void
+start_dump(void)
+{
+       /* Wait for previous dump thread to complete */
+       if (dump_thread) {
+               pthread_join(dump_thread, NULL);
+               dump_thread = 0;
+       }
+
+       /* Switch buffers */
+       assert(!dump_head);
+       assert(!dump_tail);
+       assert(!dump_flows);
+       dump_head = flow_tables_head;
+       dump_tail = flow_tables_tail;
+       dump_flows = flows;
+
+       /* Start up the dump thread if necessary */
+       if (pthread_create(&dump_thread, NULL, dump_interval, NULL)) {
+               ulogd_log(ULOGD_ERROR, "pthread_create: %s\n", strerror(errno));
+               /* Try again later */
+               dump_thread = 0;
+               dump_head = dump_tail = NULL;
+               dump_flows = 0;
+       } else {
+               /* Clear hash */
+               memset(flow_tables, 0, sizeof(flow_tables));
+               flow_tables_head = flow_tables_tail = NULL;
+               flows = 0;
+       }
+}
+
+static void
+update_flow(struct flow *flow, time_t now, unsigned int packets, unsigned int bytes)
+{
+       /* Update flow */
+       if (now > flow->end_time)
+               flow->end_time = now;
+       if (!flow->start_time || now < flow->start_time)
+               flow->start_time = now;
+       flow->packets += packets;
+       flow->bytes += bytes;
+
+       /* Update table summary */
+       if (now > flow->table->end_time)
+               flow->table->end_time = now;
+       if (!flow->table->start_time || now < flow->table->start_time)
+               flow->table->start_time = now;
+       flow->table->packets += packets;
+       flow->table->bytes += bytes;
+}
+
+#ifndef STANDALONE
+
+struct intr_id {
+       char* name;
+       ulog_iret_t *res;
+};
+
+/* Interesting keys */
+enum {
+       OOB_TIME_SEC = 0,
+       OOB_MARK,
+       IP_SADDR,
+       IP_DADDR,
+       IP_TOTLEN,
+       IP_PROTOCOL,
+       TCP_SPORT,
+       TCP_DPORT,
+       TCP_ACK,
+       TCP_RST,
+       UDP_SPORT,
+       UDP_DPORT,
+       ICMP_TYPE,
+       ICMP_CODE,
+       GRE_FLAG_KEY,
+       GRE_VERSION,
+       GRE_KEY,
+       PPTP_CALLID,
+};
+
+#define INTR_IDS       (sizeof(intr_ids)/sizeof(intr_ids[0]))
+static struct intr_id intr_ids[] = {
+       [OOB_TIME_SEC] = { "oob.time.sec", 0 },
+       [OOB_MARK] = { "oob.mark", 0 },
+       [IP_SADDR] = { "ip.saddr", 0 },
+       [IP_DADDR] = { "ip.daddr", 0 },
+       [IP_TOTLEN] = { "ip.totlen", 0 },
+       [IP_PROTOCOL] = { "ip.protocol", 0 },
+       [TCP_SPORT] = { "tcp.sport", 0 },
+       [TCP_DPORT] { "tcp.dport", 0 },
+       [TCP_ACK] = { "tcp.ack", 0 },
+       [TCP_RST] = { "tcp.rst", 0 },
+       [UDP_SPORT] = { "udp.sport", 0 },
+       [UDP_DPORT] = { "udp.dport", 0 },
+       [ICMP_TYPE] = { "icmp.type", 0 },
+       [ICMP_CODE] = { "icmp.code", 0 },
+       [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
+       [GRE_VERSION] = { "gre.version", 0 },
+       [GRE_KEY] = { "gre.key", 0 },
+       [PPTP_CALLID] = { "pptp.callid", 0 },
+};
+
+#define GET_VALUE(x)   intr_ids[x].res->value
+
+#define DATE(t)                ((t) / (24*60*60) * (24*60*60))
+
+static int _output_netflow(ulog_iret_t *res)
+{
+       u_int8_t protocol;
+       u_int32_t src_ip, dst_ip;
+       u_int16_t src_port, dst_port;
+       int xid;
+       struct flow *flow = NULL;
+       time_t now;
+
+       now = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
+
+       if (flow_tables_head) {
+               /* If we have collected for at least 5 minutes, or
+                * collected the maximum number of flows, or it is now
+                * the next day, dump this interval.
+                */
+               if ((now - flow_tables_head->start_time) >= (interval.u.value * 60) ||
+                   flows >= MAX_FLOWS ||
+                   DATE(flow_tables_head->start_time) != DATE(now)) {
+                       /* Out of memory */
+                       if (flows >= MAX_FLOWS)
+                               ulogd_log(ULOGD_ERROR, "dumping %d flows early\n", flows);
+
+                       start_dump();
+               }
+       }
+
+       protocol = GET_VALUE(IP_PROTOCOL).ui8;
+       src_ip = GET_VALUE(IP_SADDR).ui32;
+       dst_ip = GET_VALUE(IP_DADDR).ui32;
+       xid = GET_VALUE(OOB_MARK).ui32;
+
+       switch (protocol) {
+
+       case IPPROTO_TCP:
+               src_port = GET_VALUE(TCP_SPORT).ui16;
+               dst_port = GET_VALUE(TCP_DPORT).ui16;
+
+               /* check for root termination */
+               if (xid == ROOT_XID) {
+                       if ((flow = get_flow(src_ip, UNKNOWN_XID, NULL, protocol, src_port, dst_ip, dst_port))) {
+                               /*
+                                * this is supposed to catch some of the cases where the
+                                * network stack responds on behalf of the user but the
+                                * slice is incorrectly accounted for, e.g. on socket
+                                * shutdown
+                                */
+                               assert(flow->table);
+                               xid = flow->table->xid;
+                       } else {
+                               /*
+                                * we have not seen any packets on this flow during the
+                                * current interval, check for the connection refused
+                                */
+                               if (GET_VALUE(TCP_RST).b && GET_VALUE(TCP_ACK).b)
+                                       xid = CONNECTION_REFUSED_XID;
+                       }
+               }
+               break;
+
+       case IPPROTO_UDP:
+               /*
+                * we could record the source port, however this pretty much
+                * kills any notion of UDP flows and therefore consume large
+                * quantities of space, so we set the source port to 0
+                * tuple.sport = GET_VALUE(UDP_SPORT).ui16;
+                */
+               src_port = 0;
+
+               /*
+                * traceroutes create a large number of flows in the db
+                * this is a quick hack to catch the most common form
+                * of traceroute (basically we're mapping any UDP packet
+                * in the 33435-33524 range to the "trace" port, 33524 is
+                * 3 packets * nhops (30).
+                */
+               dst_port = GET_VALUE(UDP_DPORT).ui16;
+               if (dst_port >= 33435 && dst_port <= 33524)
+                       dst_port = 33435;
+               break;
+
+       case IPPROTO_ICMP:
+               src_port = GET_VALUE(ICMP_TYPE).ui8;
+               dst_port = GET_VALUE(ICMP_CODE).ui8;
+
+               /*
+                * We special case some of the ICMP traffic that the kernel
+                * always generates. Since this is attributed to root, it 
+                * creates significant "noise" in the output. We want to be
+                * able to quickly see that root is generating traffic.
+                */
+               if (xid == ROOT_XID) {
+                       if (src_port == ICMP_ECHOREPLY)
+                               xid = ICMP_ECHOREPLY_XID;
+                       else if (src_port == ICMP_UNREACH)
+                               xid = ICMP_UNREACH_XID;
+               }
+               break;
+
+       case IPPROTO_GRE:
+               if (GET_VALUE(GRE_FLAG_KEY).b) {
+                       if (GET_VALUE(GRE_VERSION).ui8 == 1) {
+                               /* Get PPTP call ID */
+                               src_port = GET_VALUE(PPTP_CALLID).ui16;
+                       } else {
+                               /* XXX Truncate GRE keys to 16 bits */
+                               src_port = (u_int16_t) GET_VALUE(GRE_KEY).ui32;
+                       }
+               } else {
+                       /* No key available */
+                       src_port = 0;
+               }
+               dst_port = 0;
+               break;
+
+       default:
+               /* This is the default key for packets from unsupported protocols */
+               src_port = 0;
+               dst_port = 0;
+               break;
+       }
+
+       /* Record the flow */
+       if (!flow) {
+               flow = get_flow(src_ip, xid, NULL, protocol, src_port, dst_ip, dst_port);
+               if (!flow)
+                       return -ENOMEM;
+       }
+
+       /* Update the flow */
+       update_flow(flow, now, 1, GET_VALUE(IP_TOTLEN).ui16);
+
+       return 0;
+}
+
+/* get all key id's for the keys we are intrested in */
+static int get_ids(void)
+{
+       int i;
+       struct intr_id *cur_id;
+
+       for (i = 0; i < INTR_IDS; i++) {
+               cur_id = &intr_ids[i];
+               cur_id->res = keyh_getres(keyh_getid(cur_id->name));
+               if (!cur_id->res) {
+                       ulogd_log(ULOGD_ERROR, 
+                               "Cannot resolve keyhash id for %s\n", 
+                               cur_id->name);
+                       return 1;
+               }
+       }       
+       return 0;
+}
+
+
+static int _netflow_init(void)
+{
+       /* have the opts parsed */
+       config_parse_file("NETFLOW",config_entries);
+
+       if (get_ids()) {
+               ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
+               exit(2);
+       }
+
+       /* Seed the hash function */
+       salt = getpid() ^ time(NULL);
+
+       return 0;
+}
+
+
+static void _netflow_fini(void)
+{
+  /* should probably stop the dump thread?! */
+
+  /* do nothing */
+}
+
+
+static ulog_output_t netflow_op = {
+  .name = "netflow",
+  .output = &_output_netflow,
+  .init = _netflow_init,
+  .fini = _netflow_fini,
+};
+
+void _init(void)
+{
+  register_output(&netflow_op);
+}
+
+#else
+
+static FILE *logfile = NULL;           /* logfile pointer */
+static int loglevel = 5;               /* current loglevel */
+
+/* log message to the logfile */
+void __ulogd_log(int level, char *file, int line, const char *format, ...)
+{
+       char *timestr;
+       va_list ap;
+       time_t tm;
+       FILE *outfd;
+
+       /* log only messages which have level at least as high as loglevel */
+       if (level < loglevel)
+               return;
+
+       if (logfile)
+               outfd = logfile;
+       else
+               outfd = stderr;
+
+       va_start(ap, format);
+
+       tm = time(NULL);
+       timestr = ctime(&tm);
+       timestr[strlen(timestr)-1] = '\0';
+       fprintf(outfd, "%s <%1.1d> %s:%d ", timestr, level, file, line);
+       
+       vfprintf(outfd, format, ap);
+       va_end(ap);
+
+       /* flush glibc's buffer */
+       fflush(outfd);
+}
+
+int
+main(int argc, char **argv)
+{
+       /* Get options */
+       while (1) {
+               int option_index = 0;
+               static struct option long_options[] = {
+                       { "user", required_argument, NULL, 'u' },
+                       { "database", required_argument, NULL, 'd' },
+                       { "password", required_argument, NULL, 'p' },
+                       { "verbose", required_argument, NULL, 'v' },
+                       { "host", required_argument, NULL, 'h' },
+                       { "help", required_argument, NULL, '?' },
+                       { 0, 0, 0, 0 }
+               };
+               struct option *opt;
+               int c;
+
+               c = getopt_long(argc, argv, "u:d:p:v:h:", long_options, &option_index);
+               if (c == -1)
+                       break;
+
+               switch (c) {
+               case 'u':
+                       strncpy(mysqluser.u.string, optarg, sizeof(mysqluser.u.string));
+                       break;
+               case 'd':
+                       strncpy(mysqldb.u.string, optarg, sizeof(mysqldb.u.string));
+                       break;
+               case 'p':
+                       strncpy(mysqlpass.u.string, optarg, sizeof(mysqlpass.u.string));
+                       break;
+               case 'v':
+                       loglevel = atoi(optarg);
+                       break;
+               case 'h':
+                       strncpy(mysqlhost.u.string, optarg, sizeof(mysqlhost.u.string));
+                       break;
+               default:
+                       fprintf(stderr, "usage: %s [OPTION]... YY/mm-dd.log[.bz2|.gz]...\n", argv[0]);
+                       for (opt = long_options; opt->name; opt++)
+                               fprintf(stderr, "\t-%c, --%s%s\n", opt->val, opt->name, required_argument ? "=ARGUMENT" : "");
+                       return 1;
+               }
+       }
+
+       /* Seed the hash function */
+       salt = getpid() ^ time(NULL);
+
+       /* Don't lookup slice names in /etc/passwd */
+       strcpy(slicemap.u.string, "/dev/null");
+
+       /* All times in the log files are in GMT */
+       putenv("TZ=GMT");
+
+       /* Parse the rest of the non-option arguments (files to import) */
+       for (argv = &argv[optind]; *argv; argv++) {
+               char pathname[PATH_MAX], *s, *next;
+               const char *cmd = NULL, *opts = NULL;
+               pid_t pid = 0;
+               FILE *fp = NULL;
+               char *line = NULL;
+               size_t len = 0;
+               int fds[2] = { -1, -1 };
+               time_t now;
+               struct tm tm;
+
+               if (!realpath(*argv, pathname)) {
+                       ulogd_log(ULOGD_ERROR, "%s: %s\n", *argv, strerror(errno));
+                       goto next_file;
+               }
+
+               /* We may need to fork a child to decompress the log file */
+               if (strstr(pathname, ".bz2")) {
+                       cmd = "bzip2";
+                       opts = "-cdfq";
+               } else if (strstr(pathname, ".gz")) {
+                       cmd = "gzip";
+                       opts = "-cdfq";
+               } else if (strstr(pathname, ".zip")) {
+                       cmd = "unzip";
+                       opts = "-p";
+               }
+
+               /* Fork a child to decompress the log file */
+               if (cmd) {
+                       /* Open a pipe */
+                       if (pipe(fds)) {
+                               ulogd_log(ULOGD_ERROR, "pipe: %s\n", strerror(errno));
+                               goto next_file;
+                       }
+                       switch ((pid = fork())) {
+                       case -1:
+                               ulogd_log(ULOGD_ERROR, "fork: %s\n", strerror(errno));
+                               goto next_file;
+                       case 0:
+                               close(fds[0]);
+                               fds[0] = -1;
+                               /* Redirect stdout to the write end of the pipe */
+                               if (dup2(fds[1], fileno(stdout)) < 0) {
+                                       ulogd_log(ULOGD_ERROR, "dup2: %s\n", strerror(errno));
+                                       exit(errno);
+                               }
+                               execlp(cmd, cmd, opts, pathname, NULL);
+                               ulogd_log(ULOGD_ERROR, "execlp: %s\n", strerror(errno));
+                               goto next_file;
+                       default:
+                               close(fds[1]);
+                               fds[1] = -1;
+                               /* Open the read end of the pipe */
+                               if (!(fp = fdopen(fds[0], "r"))) {
+                                       ulogd_log(ULOGD_ERROR, "fdopen: %s\n", strerror(errno));
+                                       goto next_file;
+                               }
+                               break;
+                       }
+               }
+
+               /* Just open the file */
+               else if (!(fp = fopen(pathname, "r"))) {
+                       ulogd_log(ULOGD_ERROR, "%s: %s\n", pathname, strerror(errno));
+                       goto next_file;
+               }
+
+               /* Parse date from the pathname (e.g. [.*]/05/01-25.log[.bz2|gz]) */
+               now = time(NULL);
+               gmtime_r(&now, &tm);
+
+               next = pathname;
+               while ((s = strsep(&next, "/"))) {
+                       int mon, mday, year;
+
+                       if (sscanf(s, "%02u-%02u.log%*s", &mon, &mday) == 2) {
+                               tm.tm_mon = mon - 1;
+                               tm.tm_mday = mday;
+                       } else if (strlen(s) == 2 && sscanf(s, "%02u", &year) == 1) {
+                               /* Use strptime(3) strategy: ...values in the range
+                                * 69-99 refer to years in the twentieth century
+                                * (1969-1999); values in the range 00-68 refer to
+                                * years in the twenty-first century (2000-2068).
+                                */
+                               if (year < 69)
+                                       year += 100;
+                               tm.tm_year = year;
+                       }
+               }
+
+               /* Reset to midnight and recalculate derived fields with mktime() */
+               tm.tm_hour = tm.tm_min = tm.tm_sec = 0;
+               mktime(&tm);
+
+               while (getline(&line, &len, fp) >= 0) {
+                       char *s, *next;
+                       char *slice;
+                       u_int8_t protocol;
+                       struct in_addr src_ip, dst_ip;
+                       unsigned int src_port, dst_port;
+                       unsigned int packets, bytes;
+                       struct flow *flow;
+
+                       /* Parse the flow record */
+                       next = line;
+
+                       if (!(s = strsep(&next, ",")) ||
+                           sscanf(s, "%02u-%02u-%02u", &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3)
+                               continue;
+                       now = mktime(&tm);
+
+                       if (!(s = strsep(&next, ",")))
+                               continue;
+                       if (!strcasecmp(s, "TCP"))
+                               protocol = IPPROTO_TCP;
+                       else if (!strcasecmp(s, "UDP"))
+                               protocol = IPPROTO_UDP;
+                       else if (!strcasecmp(s, "ICMP"))
+                               protocol = IPPROTO_ICMP;
+                       else if (!strcasecmp(s, "GRE"))
+                               protocol = IPPROTO_GRE;
+                       else
+                               protocol = atoi(s);
+
+                       if (!(slice = strsep(&next, ",")))
+                               continue;
+
+                       if (!(s = strsep(&next, ",")) ||
+                           !inet_aton(s, &src_ip))
+                               continue;
+
+                       if (!(s = strsep(&next, ",")) ||
+                           sscanf(s, "%u", &src_port) != 1)
+                               continue;
+
+                       if (!(s = strsep(&next, ",")) ||
+                           !inet_aton(s, &dst_ip))
+                               continue;
+
+                       if (!(s = strsep(&next, ",")) ||
+                           sscanf(s, "%u", &dst_port) != 1)
+                               continue;
+
+                       if (!(s = strsep(&next, ",")) ||
+                           sscanf(s, "%u", &packets) != 1)
+                               continue;
+
+                       if (!(s = strsep(&next, ",")) ||
+                           sscanf(s, "%u", &bytes) != 1)
+                               continue;
+
+                       /* Record the flow */
+                       flow = get_flow(ntohl(src_ip.s_addr), 0, slice,
+                                       protocol, src_port, ntohl(dst_ip.s_addr), dst_port);
+                       if (!flow)
+                               continue;
+
+                       /* Update flow */
+                       update_flow(flow, now, packets, bytes);
+               }
+               if (line)
+                       free(line);
+
+               start_dump();
+
+       next_file:
+               if (pid && kill(pid, SIGTERM))
+                       ulogd_log(ULOGD_ERROR, "kill: %s\n", strerror(errno));
+               wait(NULL);
+               if (fp)
+                       fclose(fp);
+               if (fds[0] >= 0)
+                       close(fds[0]);
+               if (fds[1] >= 0)
+                       close(fds[1]);
+       }
+
+       /* Wait for previous dump thread to complete */
+       if (dump_thread) {
+               pthread_join(dump_thread, NULL);
+               dump_thread = 0;
+       }
+
+       return 0;
+}
+
+#endif