--- /dev/null
+/*
+ * ulogd output target for IP flow analysis
+ *
+ * Mark Huang <mlhuang@cs.princeton.edu>
+ * Copyright (C) 2004-2005 The Trustees of Princeton University
+ *
+ * Based on admindump.pl by Mic Bowman and Paul Brett
+ * Copyright (c) 2002 Intel Corporation
+ *
+ * $Id: ulogd_NETFLOW.c,v 1.19 2005/04/20 21:10:05 mlhuang Exp $
+ */
+
+/* Enable GNU glibc extensions */
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+
+/* va_start() and friends */
+#include <stdarg.h>
+
+/* ispunct() */
+#include <ctype.h>
+
+/* strstr() and friends */
+#include <string.h>
+
+/* dirname() and basename() */
+#include <libgen.h>
+
+/* fork() and wait() */
+#include <sys/types.h>
+#include <unistd.h>
+#include <sys/wait.h>
+
+/* fgetpwent() */
+#include <pwd.h>
+
+/* errno and assert() */
+#include <errno.h>
+#include <assert.h>
+
+/* getopt_long() */
+#include <getopt.h>
+
+/* time() and friends */
+#include <time.h>
+#include <sys/time.h>
+
+/* inet_aton() */
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+/* ICMP definitions */
+#include <netinet/ip.h>
+#include <netinet/ip_icmp.h>
+
+/* stat() */
+#include <sys/stat.h>
+
+/* pthread_create() */
+#include <pthread.h>
+
+/* flock() */
+#include <sys/file.h>
+
+#include <ulogd/ulogd.h>
+#include <ulogd/conffile.h>
+
+#if !defined(STANDALONE) && HAVE_LIBPROPER
+#include <proper/prop.h>
+#endif
+
+/*
+ * /etc/ulogd.conf configuration options
+ */
+
+/* Dump interval in minutes */
+static config_entry_t interval = {
+ .next = NULL,
+ .key = "interval",
+ .type = CONFIG_TYPE_INT,
+ .options = CONFIG_OPT_NONE,
+ .u = { .value = 5 },
+};
+
+/* Slice map (in /etc/passwd format) */
+static config_entry_t slicemap = {
+ .next = &interval,
+ .key = "slicemap",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "/etc/passwd" },
+};
+
+/* MySQL database name */
+static config_entry_t mysqldb = {
+ .next = &slicemap,
+ .key = "mysqldb",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "netflow" },
+};
+
+/* MySQL database user */
+static config_entry_t mysqluser = {
+ .next = &mysqldb,
+ .key = "mysqluser",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "netflow" },
+};
+
+/* MySQL database password */
+static config_entry_t mysqlpass = {
+ .next = &mysqluser,
+ .key = "mysqlpass",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "" },
+};
+
+/* MySQL database host */
+static config_entry_t mysqlhost = {
+ .next = &mysqlpass,
+ .key = "mysqlhost",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "" },
+};
+
+/* Latest flows in CSV format */
+static config_entry_t csv = {
+ .next = &mysqlhost,
+ .key = "csv",
+ .type = CONFIG_TYPE_STRING,
+ .options = CONFIG_OPT_NONE,
+ .u = { .string = "/var/www/html/flows.csv" },
+};
+
+#define config_entries (&csv)
+
+/*
+ * Debug functionality
+ */
+
+#ifdef DMALLOC
+#include <dmalloc.h>
+#endif
+
+#define NIPQUAD(addr) \
+ ((unsigned char *)&addr)[0], \
+ ((unsigned char *)&addr)[1], \
+ ((unsigned char *)&addr)[2], \
+ ((unsigned char *)&addr)[3]
+
+#define IPQUAD(addr) \
+ ((unsigned char *)&addr)[3], \
+ ((unsigned char *)&addr)[2], \
+ ((unsigned char *)&addr)[1], \
+ ((unsigned char *)&addr)[0]
+
+/*
+ * MySQL support
+ */
+
+#include <mysql/mysql.h>
+
+/* mysql_query() with printf() syntax */
+static int
+mysql_queryf(MYSQL *dbh, const char *fmt, ...)
+{
+ char *buf = NULL;
+ size_t len;
+ va_list ap;
+ int ret = 2 * strlen(fmt);
+
+ do {
+ len = ret + 1;
+ if (!(buf = realloc(buf, len))) {
+ ulogd_log(ULOGD_ERROR, "realloc: %s\n", strerror(errno));
+ ret = -errno;
+ goto done;
+ }
+ va_start(ap, fmt);
+ ret = vsnprintf(buf, len, fmt, ap);
+ va_end(ap);
+ if (ret < 0) {
+ ulogd_log(ULOGD_ERROR, "vsnprintf: %s\n", strerror(errno));
+ goto done;
+ }
+ } while (ret >= len);
+
+ ret = mysql_query(dbh, buf);
+ if (ret)
+ ulogd_log(ULOGD_ERROR, "%s: %s\n", buf, mysql_error(dbh));
+
+ done:
+ if (buf)
+ free(buf);
+ return ret;
+}
+
+/*
+ * Jenkins hash support
+ */
+
+typedef u_int8_t u8;
+typedef u_int16_t u16;
+typedef u_int32_t u32;
+#include <linux/jhash.h>
+
+/* Salt for the hash functions */
+static int salt;
+
+/*
+ * Hash slice name lookups on context ID.
+ */
+
+/* Special context IDs */
+#define UNKNOWN_XID -1
+#define ROOT_XID 0
+
+enum {
+ CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
+ ICMP_ECHOREPLY_XID,
+ ICMP_UNREACH_XID,
+};
+
+/* A slice */
+struct slice {
+ struct slice *next;
+ int xid;
+ char *name;
+};
+
+/* Must be a power of 2 */
+#define SLICE_HASH_SIZE 128
+struct slice *slice_hash[SLICE_HASH_SIZE];
+
+static inline int
+hash_slice(int xid)
+{
+ return jhash_1word(xid, salt) & (SLICE_HASH_SIZE - 1);
+}
+
+static struct slice *
+get_slice(int xid)
+{
+ struct slice *slice;
+ int i;
+
+ i = hash_slice(xid);
+ for (slice = slice_hash[i]; slice; slice = slice->next) {
+ if (slice->xid == xid)
+ break;
+ }
+
+ return slice;
+}
+
+static struct slice *
+add_slice(int xid, char *name)
+{
+ struct slice *slice;
+ int i;
+
+ slice = malloc(sizeof(struct slice));
+ if (!slice)
+ return NULL;
+ memset(slice, 0, sizeof(struct slice));
+
+ slice->xid = xid;
+ slice->name = strdup(name);
+ if (!slice->name) {
+ free(slice);
+ return NULL;
+ }
+
+ /* Add to hashed list */
+ i = hash_slice(xid);
+ slice->next = slice_hash[i];
+ slice_hash[i] = slice;
+
+ return slice;
+}
+
+static int
+hash_passwd(void)
+{
+ int fd;
+ FILE *fp;
+ struct passwd *pw;
+ struct slice *slice, *next;
+ int i;
+
+#if !defined(STANDALONE) && HAVE_LIBPROPER
+ if ((fd = prop_open_file(slicemap.u.string, PROP_OPEN_READ)) < 0) {
+ /* prop_open_file() returns -errno */
+ errno = -fd;
+ }
+#else
+ fd = open(slicemap.u.string, O_RDONLY);
+#endif
+
+ if (fd < 0 || !(fp = fdopen(fd, "r"))) {
+ ulogd_log(ULOGD_ERROR, "%s: %s\n", slicemap.u.string, strerror(errno));
+ if (fd >= 0)
+ close(fd);
+ return errno;
+ }
+
+ /* Clean hash */
+ for (i = 0; i < SLICE_HASH_SIZE; i++) {
+ for (slice = slice_hash[i]; slice; slice = next) {
+ next = slice->next;
+ assert(slice->name);
+ free(slice->name);
+ free(slice);
+ }
+ slice_hash[i] = NULL;
+ }
+
+ /* (Re)build slice hash */
+ while ((pw = fgetpwent(fp)))
+ add_slice(pw->pw_uid, pw->pw_name);
+
+ fclose(fp);
+ close(fd);
+ return 0;
+}
+
+/*
+ * Maintain flows in a two level hash table. The first level hashes
+ * on (src_ip, slice). The second level hashes on (protocol, dst_ip,
+ * src_port, dst_port). This structure mirrors how we store flows in
+ * the database (in many tables where each table contains flows with
+ * the same (src_ip, slice) parameters and unique (protocol,
+ * src_port, dst_ip, dst_port) rows).
+ */
+
+/* Must be powers of 2 */
+#ifdef STANDALONE
+#define TABLE_HASH_SIZE 16
+#define FLOW_HASH_SIZE 65536
+#else
+#define TABLE_HASH_SIZE 8
+#define FLOW_HASH_SIZE 1024
+#endif
+
+struct flow {
+ struct flow *nexth; /* Next in hash */
+ struct flow *next; /* Next in ordered list */
+ struct flow_table *table; /* Back pointer */
+ u_int8_t protocol; /* IP protocol number */
+ u_int16_t src_port; /* IP source port (host order) */
+ u_int16_t dst_port; /* IP destination port (host order) */
+ u_int32_t dst_ip; /* IP destination address (host order) */
+ time_t start_time; /* Timestamp of the first packet in the flow */
+ time_t end_time; /* Timestamp of the first packet in the flow */
+ unsigned long long packets; /* Number of IP packets */
+ unsigned long long bytes; /* Number of IP bytes (including headers) */
+};
+
+struct flow_table {
+ struct flow_table *nexth; /* Next in hash */
+ struct flow_table *next; /* Next in ordered list */
+ u_int32_t src_ip; /* IP source address (host order) */
+ int xid; /* Context ID */
+ char slice[32]; /* Slice name */
+ unsigned long rows; /* Total number of rows to insert/update */
+ unsigned long collisions; /* Total number of row hash collisions */
+ unsigned long long packets; /* Total number of packets sent in all flows in this table */
+ unsigned long long bytes; /* Total number of bytes sent in all flows in this table */
+ time_t start_time; /* Start time of the earliest flow in the table */
+ time_t end_time; /* End time of the latest flow in the table */
+ /* Hashed list of flows */
+ struct flow *flows[FLOW_HASH_SIZE];
+ /* Ordered list of flows */
+ struct flow *flows_head, *flows_tail;
+};
+
+/* Hashed list of flow tables */
+static struct flow_table *flow_tables[TABLE_HASH_SIZE];
+/* Ordered list of flow tables */
+static struct flow_table *flow_tables_head, *flow_tables_tail;
+
+/* Maximum total number of outstanding allocated flows */
+#define MAX_FLOWS 65536 /* ip_conntrack_max on a 1 GB machine */
+static int flows;
+
+/* Double buffer the ordered list and dump it in another thread */
+static struct flow_table *dump_head, *dump_tail;
+static int dump_flows;
+static pthread_t dump_thread;
+
+static inline int
+hash_flow_table(u_int32_t src_ip, int xid, char *slice)
+{
+ if (slice)
+ return jhash(slice, strlen(slice), salt) & (TABLE_HASH_SIZE - 1);
+ else
+ return jhash_2words(src_ip, xid, salt) & (TABLE_HASH_SIZE - 1);
+}
+
+static struct flow_table *
+get_flow_table(u_int32_t src_ip, int xid, char *slice)
+{
+ struct flow_table *flow_table;
+ int i;
+
+ /* See if it already exists */
+ i = hash_flow_table(src_ip, xid, slice);
+ for (flow_table = flow_tables[i]; flow_table; flow_table = flow_table->nexth) {
+ if (flow_table->src_ip == src_ip &&
+ flow_table->xid == xid &&
+ (!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1)))
+ break;
+ }
+
+ if (!flow_table) {
+ /* Allocate a new flow table */
+ flow_table = malloc(sizeof(struct flow_table));
+ if (!flow_table) {
+ ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
+ return NULL;
+ }
+
+ /* Initialize */
+ memset(flow_table, 0, sizeof(struct flow_table));
+ flow_table->src_ip = src_ip;
+ flow_table->xid = xid;
+ if (slice)
+ strncpy(flow_table->slice, slice, sizeof(flow_table->slice) - 1);
+
+ /* Add to hashed list */
+ i = hash_flow_table(src_ip, xid, slice);
+ flow_table->nexth = flow_tables[i];
+ flow_tables[i] = flow_table;
+
+ /* Add to ordered list */
+ if (flow_tables_tail) {
+ assert(flow_tables_head);
+ flow_tables_tail->next = flow_table;
+ flow_tables_tail = flow_table;
+ } else {
+ assert(!flow_tables_head);
+ flow_tables_head = flow_tables_tail = flow_table;
+ }
+ }
+
+ assert(flow_table);
+ assert(flow_table->src_ip == src_ip);
+ assert(flow_table->xid == xid);
+ assert(!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1));
+
+ return flow_table;
+}
+
+static inline int
+hash_flow(u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
+{
+ return jhash_3words(protocol, dst_ip, (src_port << 16) | dst_port, salt) & (FLOW_HASH_SIZE - 1);
+}
+
+static struct flow *
+get_flow(u_int32_t src_ip, int xid, char *slice,
+ u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
+{
+ struct flow_table *flow_table;
+ struct flow *flow;
+ int i;
+
+ if (xid != UNKNOWN_XID)
+ flow_table = get_flow_table(src_ip, xid, slice);
+ else {
+ /* Support searching for flows without specifying a context ID */
+ flow_table = flow_tables_head;
+ }
+
+ for (; flow_table; flow_table = flow_table->next) {
+ i = hash_flow(protocol, src_port, dst_ip, dst_port);
+ for (flow = flow_table->flows[i]; flow; flow = flow->nexth) {
+ if (flow->protocol == protocol &&
+ flow->src_port == src_port &&
+ flow->dst_ip == dst_ip &&
+ flow->dst_port == dst_port)
+ break;
+ }
+ if (xid != UNKNOWN_XID)
+ break;
+ }
+
+ if (!flow_table)
+ return NULL;
+
+ if (!flow) {
+ /* Allocate a new flow */
+ flow = malloc(sizeof(struct flow));
+ if (!flow) {
+ ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
+ return NULL;
+ }
+
+ /* Initialize */
+ memset(flow, 0, sizeof(struct flow));
+ flow->table = flow_table;
+ flow->protocol = protocol;
+ flow->src_port = src_port;
+ flow->dst_ip = dst_ip;
+ flow->dst_port = dst_port;
+
+ /* Add to hashed list */
+ i = hash_flow(protocol, src_port, dst_ip, dst_port);
+ flow->nexth = flow_table->flows[i];
+ flow_table->flows[i] = flow;
+
+ /* Add to ordered list */
+ if (flow_table->flows_tail) {
+ assert(flow_table->flows_head);
+ flow_table->flows_tail->next = flow;
+ flow_table->flows_tail = flow;
+ } else {
+ assert(!flow_table->flows_head);
+ flow_table->flows_head = flow_table->flows_tail = flow;
+ }
+
+ /* Update table statistics */
+ flow_table->collisions += flow->nexth ? 1 : 0;
+ flow_table->rows++;
+
+ /* Update total number of outstanding flows */
+ flows++;
+ }
+
+ assert(flow);
+ assert(flow->table == flow_table);
+ assert(flow->protocol == protocol);
+ assert(flow->src_port == src_port);
+ assert(flow->dst_ip == dst_ip);
+ assert(flow->dst_port == dst_port);
+
+ return flow;
+}
+
+/*
+ * The dump thread imports one set of flows while the collection
+ * thread continues to record new flows.
+ */
+
+void *
+dump_interval(void *unused)
+{
+ MYSQL *dbh;
+ struct flow_table *flow_table, *next_flow_table;
+ struct flow *flow, *next_flow, bind_flow;
+ struct slice *slice;
+ char slice_name[64];
+ char table_name[NAME_LEN + 1], *c;
+ struct tm tm;
+ MYSQL_STMT *stmt;
+ char query[512];
+ unsigned int tables, table_collisions, row_collisions, rows;
+ unsigned long long packets, bytes;
+ struct timeval start, end, interval_start;
+ int fd;
+ FILE *fp;
+
+ /* Time this interval */
+ gettimeofday(&interval_start, NULL);
+
+ /* Hash slice name lookups */
+ hash_passwd();
+
+ /* Open, lock, and truncate CSV file */
+ if ((fd = open(csv.u.string, O_CREAT | O_WRONLY, 0644)) >= 0) {
+ int tries = 10;
+
+ while (flock(fd, LOCK_EX | LOCK_NB)) {
+ sleep(1);
+ tries--;
+ }
+ if (tries == 0)
+ ulogd_log(ULOGD_ERROR, "Could not acquire lock on %s\n", csv.u.string);
+ if (!(fp = fdopen(fd, "w")))
+ close(fd);
+ else
+ ftruncate(fd, 0);
+ } else
+ fp = NULL;
+
+ /* Connect to DB */
+ if (!(dbh = mysql_init(NULL)))
+ ulogd_log(ULOGD_ERROR, "mysql_init: failed\n");
+ if (dbh && !mysql_real_connect(dbh,
+ mysqlhost.u.string[0] ? mysqlhost.u.string : NULL,
+ mysqluser.u.string,
+ mysqlpass.u.string[0] ? mysqlpass.u.string : NULL,
+ mysqldb.u.string,
+ 0, NULL, 0)) {
+ ulogd_log(ULOGD_ERROR,
+ "%s@%s:%s: %s\n",
+ mysqluser.u.string,
+ mysqlhost.u.string[0] ? mysqlhost.u.string : "localhost",
+ mysqldb.u.string,
+ mysql_error(dbh));
+ mysql_close(dbh);
+ dbh = NULL;
+ }
+
+ if (dbh) {
+ /* All times in the database are in GMT */
+ mysql_query(dbh, "SET time_zone='+00:00'");
+ }
+
+ /* Initialize statistics */
+ tables = table_collisions = row_collisions = rows = 0;
+ packets = bytes = 0;
+
+ assert(dump_head);
+
+ for (flow_table = dump_head; flow_table; flow_table = next_flow_table) {
+ /* Keep track of some basic statistics */
+ tables++;
+ table_collisions += flow_table->nexth ? 1 : 0;
+ row_collisions += flow_table->collisions;
+ rows += flow_table->rows;
+ packets += flow_table->packets;
+ bytes += flow_table->bytes;
+
+ /* Get slice name */
+ slice_name[sizeof(slice_name) - 1] = '\0';
+ switch (flow_table->xid) {
+ case CONNECTION_REFUSED_XID:
+ strncpy(slice_name, "connection-refused", sizeof(slice_name));
+ break;
+ case ICMP_ECHOREPLY_XID:
+ strncpy(slice_name, "icmp-reply", sizeof(slice_name));
+ break;
+ case ICMP_UNREACH_XID:
+ strncpy(slice_name, "icmp-unreachable", sizeof(slice_name));
+ break;
+ default:
+ if ((slice = get_slice(flow_table->xid)))
+ strncpy(slice_name, slice->name, sizeof(slice_name));
+ else if (flow_table->slice[0])
+ strncpy(slice_name, flow_table->slice, sizeof(slice_name));
+ else
+ snprintf(slice_name, sizeof(slice_name), "%u", flow_table->xid);
+ break;
+ }
+
+ if (dbh) {
+ /* Get day */
+ gmtime_r(&flow_table->start_time, &tm);
+
+ /* Form a human readable table name */
+ snprintf(table_name, sizeof(table_name),
+ "%u_%u_%u_%u_%s_%u_%02u_%02u",
+ IPQUAD(flow_table->src_ip),
+ slice_name,
+ 1900 + tm.tm_year, tm.tm_mon + 1, tm.tm_mday);
+ assert(strlen(table_name) <= NAME_LEN);
+
+ /* Replace punctation with underscores */
+ for (c = table_name; *c; c++) {
+ if (ispunct(*c))
+ *c = '_';
+ }
+
+ /* Time database operations on each table */
+ gettimeofday(&start, NULL);
+
+ /* Insert/update table summary */
+ if (mysql_queryf(dbh,
+ "INSERT INTO flow_tables "
+ "(table_name,src_ip,slice,"
+ "start_time,end_time,packets,bytes) "
+ "VALUES ('%s',%u,'%s',"
+ "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
+ "ON DUPLICATE KEY UPDATE "
+ "end_time=FROM_UNIXTIME(%u),"
+ "packets=packets+%llu,"
+ "bytes=bytes+%llu",
+ table_name,
+ flow_table->src_ip,
+ slice_name,
+ (unsigned) flow_table->start_time,
+ (unsigned) flow_table->end_time,
+ flow_table->packets,
+ flow_table->bytes,
+ (unsigned) flow_table->end_time,
+ flow_table->packets,
+ flow_table->bytes))
+ goto free_flow_table;
+
+ /* Create table */
+ if (mysql_queryf(dbh,
+ "CREATE TABLE IF NOT EXISTS %s LIKE flow_template",
+ table_name))
+ goto free_flow_table;
+
+ /* Lock for speed */
+ mysql_queryf(dbh, "ALTER TABLE %s DISABLE KEYS", table_name);
+ mysql_queryf(dbh, "LOCK TABLES %s WRITE", table_name);
+
+ /* Prepare the insert statement */
+ if (!(stmt = mysql_stmt_init(dbh)))
+ ulogd_log(ULOGD_ERROR, "mysql_stmt_init: %s\n", mysql_error(dbh));
+ else {
+ my_bool is_null = 0;
+ MYSQL_BIND bind[] = {
+ { .buffer_type = MYSQL_TYPE_TINY,
+ .buffer = &bind_flow.protocol,
+ .buffer_length = sizeof(bind_flow.protocol),
+ .length = &bind[0].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_SHORT,
+ .buffer = &bind_flow.src_port,
+ .buffer_length = sizeof(bind_flow.src_port),
+ .length = &bind[1].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONG,
+ .buffer = &bind_flow.dst_ip,
+ .buffer_length = sizeof(bind_flow.dst_ip),
+ .length = &bind[2].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_SHORT,
+ .buffer = &bind_flow.dst_port,
+ .buffer_length = sizeof(bind_flow.dst_port),
+ .length = &bind[3].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONG,
+ .buffer = &bind_flow.start_time,
+ .buffer_length = sizeof(bind_flow.start_time),
+ .length = &bind[4].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONG,
+ .buffer = &bind_flow.end_time,
+ .buffer_length = sizeof(bind_flow.end_time),
+ .length = &bind[5].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONGLONG,
+ .buffer = &bind_flow.packets,
+ .buffer_length = sizeof(bind_flow.packets),
+ .length = &bind[6].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONGLONG,
+ .buffer = &bind_flow.bytes,
+ .buffer_length = sizeof(bind_flow.bytes),
+ .length = &bind[7].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONG,
+ .buffer = &bind_flow.end_time,
+ .buffer_length = sizeof(bind_flow.end_time),
+ .length = &bind[8].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONGLONG,
+ .buffer = &bind_flow.packets,
+ .buffer_length = sizeof(bind_flow.packets),
+ .length = &bind[9].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ { .buffer_type = MYSQL_TYPE_LONGLONG,
+ .buffer = &bind_flow.bytes,
+ .buffer_length = sizeof(bind_flow.bytes),
+ .length = &bind[10].buffer_length,
+ .is_null = &is_null,
+ .is_unsigned = 1, },
+ };
+
+ snprintf(query, sizeof(query),
+ "INSERT INTO %s "
+ "(protocol,src_port,dst_ip,dst_port,"
+ "start_time,end_time,packets,bytes) "
+ "VALUES (?,?,?,?,"
+ "FROM_UNIXTIME(?),FROM_UNIXTIME(?),?,?) "
+ "ON DUPLICATE KEY UPDATE "
+ "end_time=FROM_UNIXTIME(?),"
+ "packets=packets+?,"
+ "bytes=bytes+?",
+ table_name);
+
+ if (mysql_stmt_prepare(stmt, query, strlen(query)) ||
+ mysql_stmt_bind_param(stmt, bind)) {
+ ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
+ mysql_stmt_close(stmt);
+ stmt = NULL;
+ }
+ }
+ } else
+ stmt = NULL;
+
+ for (flow = flow_table->flows_head; flow; flow = flow->next) {
+ /* Dump to CSV */
+ if (fp) {
+ fprintf(fp,
+ "%s,%u,"
+ "%u.%u.%u.%u,%u,"
+ "%u.%u.%u.%u,%u,"
+ "%u,%u,%llu,%llu\n",
+ slice_name, flow->protocol,
+ IPQUAD(flow_table->src_ip), flow->src_port,
+ IPQUAD(flow->dst_ip), flow->dst_port,
+ (unsigned) flow->start_time,
+ (unsigned) flow->end_time,
+ flow->packets,
+ flow->bytes);
+ }
+
+ /* Insert/update flow record */
+ if (dbh) {
+ if (stmt) {
+ bind_flow = *flow;
+ if (mysql_stmt_execute(stmt))
+ ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
+ } else {
+ mysql_queryf(dbh,
+ "INSERT INTO %s "
+ "(protocol,src_port,dst_ip,dst_port,"
+ "start_time,end_time,packets,bytes) "
+ "VALUES (%u,%u,%u,%u,"
+ "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
+ "ON DUPLICATE KEY UPDATE "
+ "end_time=FROM_UNIXTIME(%u),"
+ "packets=packets+%llu,"
+ "bytes=bytes+%llu",
+ table_name,
+ flow->protocol,
+ flow->src_port,
+ flow->dst_ip,
+ flow->dst_port,
+ (unsigned) flow->start_time,
+ (unsigned) flow->end_time,
+ flow->packets,
+ flow->bytes,
+ (unsigned) flow->end_time,
+ flow->packets,
+ flow->bytes);
+ }
+ }
+ }
+
+ if (dbh) {
+ if (stmt)
+ mysql_stmt_close(stmt);
+
+ /* Unlock */
+ mysql_query(dbh, "UNLOCK TABLES");
+ mysql_queryf(dbh, "ALTER TABLE %s ENABLE KEYS", table_name);
+
+ /* Update table summary */
+ mysql_queryf(dbh,
+ "UPDATE flow_tables "
+ "SET flows=(SELECT COUNT(protocol) FROM %s) "
+ "WHERE table_name='%s'",
+ table_name,
+ table_name);
+
+ gettimeofday(&end, NULL);
+ timersub(&end, &start, &end);
+ }
+
+ ulogd_log(ULOGD_DEBUG,
+ "Updated %u rows in %s in %u.%06u s (%u collisions, %llu packets, %llu bytes)\n",
+ flow_table->rows, table_name,
+ (unsigned) end.tv_sec, (unsigned) end.tv_usec,
+ flow_table->collisions, flow_table->packets, flow_table->bytes);
+
+ free_flow_table:
+ for (flow = flow_table->flows_head; flow; flow = next_flow) {
+ flow_table->rows--;
+ flow_table->packets -= flow->packets;
+ flow_table->bytes -= flow->bytes;
+ dump_flows--;
+ next_flow = flow->next;
+ assert(next_flow || flow == flow_table->flows_tail);
+ free(flow);
+ }
+
+ assert(!flow_table->rows);
+ assert(!flow_table->packets);
+ assert(!flow_table->bytes);
+
+ next_flow_table = flow_table->next;
+ assert(next_flow_table || flow_table == dump_tail);
+ free(flow_table);
+ }
+
+ assert(!dump_flows);
+ dump_head = dump_tail = NULL;
+
+ if (dbh) {
+ mysql_close(dbh);
+
+ gettimeofday(&end, NULL);
+ timersub(&end, &interval_start, &end);
+
+ ulogd_log(ULOGD_NOTICE,
+ "Updated %u rows in %u tables (%u bytes) in %u.%06u s "
+ "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
+ rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
+ (unsigned) end.tv_sec, (unsigned) end.tv_usec,
+ table_collisions, row_collisions, packets, bytes);
+ } else {
+ /* Could not connect to database */
+ ulogd_log(ULOGD_ERROR,
+ "Lost %u rows in %u tables (%u bytes) "
+ "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
+ rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
+ table_collisions, row_collisions, packets, bytes);
+ }
+
+ if (fp) {
+ fclose(fp);
+ assert(fd >= 0);
+ close(fd);
+ }
+
+ pthread_exit(NULL);
+ return NULL;
+}
+
+static void
+start_dump(void)
+{
+ /* Wait for previous dump thread to complete */
+ if (dump_thread) {
+ pthread_join(dump_thread, NULL);
+ dump_thread = 0;
+ }
+
+ /* Switch buffers */
+ assert(!dump_head);
+ assert(!dump_tail);
+ assert(!dump_flows);
+ dump_head = flow_tables_head;
+ dump_tail = flow_tables_tail;
+ dump_flows = flows;
+
+ /* Start up the dump thread if necessary */
+ if (pthread_create(&dump_thread, NULL, dump_interval, NULL)) {
+ ulogd_log(ULOGD_ERROR, "pthread_create: %s\n", strerror(errno));
+ /* Try again later */
+ dump_thread = 0;
+ dump_head = dump_tail = NULL;
+ dump_flows = 0;
+ } else {
+ /* Clear hash */
+ memset(flow_tables, 0, sizeof(flow_tables));
+ flow_tables_head = flow_tables_tail = NULL;
+ flows = 0;
+ }
+}
+
+static void
+update_flow(struct flow *flow, time_t now, unsigned int packets, unsigned int bytes)
+{
+ /* Update flow */
+ if (now > flow->end_time)
+ flow->end_time = now;
+ if (!flow->start_time || now < flow->start_time)
+ flow->start_time = now;
+ flow->packets += packets;
+ flow->bytes += bytes;
+
+ /* Update table summary */
+ if (now > flow->table->end_time)
+ flow->table->end_time = now;
+ if (!flow->table->start_time || now < flow->table->start_time)
+ flow->table->start_time = now;
+ flow->table->packets += packets;
+ flow->table->bytes += bytes;
+}
+
+#ifndef STANDALONE
+
+struct intr_id {
+ char* name;
+ ulog_iret_t *res;
+};
+
+/* Interesting keys */
+enum {
+ OOB_TIME_SEC = 0,
+ OOB_MARK,
+ IP_SADDR,
+ IP_DADDR,
+ IP_TOTLEN,
+ IP_PROTOCOL,
+ TCP_SPORT,
+ TCP_DPORT,
+ TCP_ACK,
+ TCP_RST,
+ UDP_SPORT,
+ UDP_DPORT,
+ ICMP_TYPE,
+ ICMP_CODE,
+ GRE_FLAG_KEY,
+ GRE_VERSION,
+ GRE_KEY,
+ PPTP_CALLID,
+};
+
+#define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
+static struct intr_id intr_ids[] = {
+ [OOB_TIME_SEC] = { "oob.time.sec", 0 },
+ [OOB_MARK] = { "oob.mark", 0 },
+ [IP_SADDR] = { "ip.saddr", 0 },
+ [IP_DADDR] = { "ip.daddr", 0 },
+ [IP_TOTLEN] = { "ip.totlen", 0 },
+ [IP_PROTOCOL] = { "ip.protocol", 0 },
+ [TCP_SPORT] = { "tcp.sport", 0 },
+ [TCP_DPORT] { "tcp.dport", 0 },
+ [TCP_ACK] = { "tcp.ack", 0 },
+ [TCP_RST] = { "tcp.rst", 0 },
+ [UDP_SPORT] = { "udp.sport", 0 },
+ [UDP_DPORT] = { "udp.dport", 0 },
+ [ICMP_TYPE] = { "icmp.type", 0 },
+ [ICMP_CODE] = { "icmp.code", 0 },
+ [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
+ [GRE_VERSION] = { "gre.version", 0 },
+ [GRE_KEY] = { "gre.key", 0 },
+ [PPTP_CALLID] = { "pptp.callid", 0 },
+};
+
+#define GET_VALUE(x) intr_ids[x].res->value
+
+#define DATE(t) ((t) / (24*60*60) * (24*60*60))
+
+static int _output_netflow(ulog_iret_t *res)
+{
+ u_int8_t protocol;
+ u_int32_t src_ip, dst_ip;
+ u_int16_t src_port, dst_port;
+ int xid;
+ struct flow *flow = NULL;
+ time_t now;
+
+ now = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
+
+ if (flow_tables_head) {
+ /* If we have collected for at least 5 minutes, or
+ * collected the maximum number of flows, or it is now
+ * the next day, dump this interval.
+ */
+ if ((now - flow_tables_head->start_time) >= (interval.u.value * 60) ||
+ flows >= MAX_FLOWS ||
+ DATE(flow_tables_head->start_time) != DATE(now)) {
+ /* Out of memory */
+ if (flows >= MAX_FLOWS)
+ ulogd_log(ULOGD_ERROR, "dumping %d flows early\n", flows);
+
+ start_dump();
+ }
+ }
+
+ protocol = GET_VALUE(IP_PROTOCOL).ui8;
+ src_ip = GET_VALUE(IP_SADDR).ui32;
+ dst_ip = GET_VALUE(IP_DADDR).ui32;
+ xid = GET_VALUE(OOB_MARK).ui32;
+
+ switch (protocol) {
+
+ case IPPROTO_TCP:
+ src_port = GET_VALUE(TCP_SPORT).ui16;
+ dst_port = GET_VALUE(TCP_DPORT).ui16;
+
+ /* check for root termination */
+ if (xid == ROOT_XID) {
+ if ((flow = get_flow(src_ip, UNKNOWN_XID, NULL, protocol, src_port, dst_ip, dst_port))) {
+ /*
+ * this is supposed to catch some of the cases where the
+ * network stack responds on behalf of the user but the
+ * slice is incorrectly accounted for, e.g. on socket
+ * shutdown
+ */
+ assert(flow->table);
+ xid = flow->table->xid;
+ } else {
+ /*
+ * we have not seen any packets on this flow during the
+ * current interval, check for the connection refused
+ */
+ if (GET_VALUE(TCP_RST).b && GET_VALUE(TCP_ACK).b)
+ xid = CONNECTION_REFUSED_XID;
+ }
+ }
+ break;
+
+ case IPPROTO_UDP:
+ /*
+ * we could record the source port, however this pretty much
+ * kills any notion of UDP flows and therefore consume large
+ * quantities of space, so we set the source port to 0
+ * tuple.sport = GET_VALUE(UDP_SPORT).ui16;
+ */
+ src_port = 0;
+
+ /*
+ * traceroutes create a large number of flows in the db
+ * this is a quick hack to catch the most common form
+ * of traceroute (basically we're mapping any UDP packet
+ * in the 33435-33524 range to the "trace" port, 33524 is
+ * 3 packets * nhops (30).
+ */
+ dst_port = GET_VALUE(UDP_DPORT).ui16;
+ if (dst_port >= 33435 && dst_port <= 33524)
+ dst_port = 33435;
+ break;
+
+ case IPPROTO_ICMP:
+ src_port = GET_VALUE(ICMP_TYPE).ui8;
+ dst_port = GET_VALUE(ICMP_CODE).ui8;
+
+ /*
+ * We special case some of the ICMP traffic that the kernel
+ * always generates. Since this is attributed to root, it
+ * creates significant "noise" in the output. We want to be
+ * able to quickly see that root is generating traffic.
+ */
+ if (xid == ROOT_XID) {
+ if (src_port == ICMP_ECHOREPLY)
+ xid = ICMP_ECHOREPLY_XID;
+ else if (src_port == ICMP_UNREACH)
+ xid = ICMP_UNREACH_XID;
+ }
+ break;
+
+ case IPPROTO_GRE:
+ if (GET_VALUE(GRE_FLAG_KEY).b) {
+ if (GET_VALUE(GRE_VERSION).ui8 == 1) {
+ /* Get PPTP call ID */
+ src_port = GET_VALUE(PPTP_CALLID).ui16;
+ } else {
+ /* XXX Truncate GRE keys to 16 bits */
+ src_port = (u_int16_t) GET_VALUE(GRE_KEY).ui32;
+ }
+ } else {
+ /* No key available */
+ src_port = 0;
+ }
+ dst_port = 0;
+ break;
+
+ default:
+ /* This is the default key for packets from unsupported protocols */
+ src_port = 0;
+ dst_port = 0;
+ break;
+ }
+
+ /* Record the flow */
+ if (!flow) {
+ flow = get_flow(src_ip, xid, NULL, protocol, src_port, dst_ip, dst_port);
+ if (!flow)
+ return -ENOMEM;
+ }
+
+ /* Update the flow */
+ update_flow(flow, now, 1, GET_VALUE(IP_TOTLEN).ui16);
+
+ return 0;
+}
+
+/* get all key id's for the keys we are intrested in */
+static int get_ids(void)
+{
+ int i;
+ struct intr_id *cur_id;
+
+ for (i = 0; i < INTR_IDS; i++) {
+ cur_id = &intr_ids[i];
+ cur_id->res = keyh_getres(keyh_getid(cur_id->name));
+ if (!cur_id->res) {
+ ulogd_log(ULOGD_ERROR,
+ "Cannot resolve keyhash id for %s\n",
+ cur_id->name);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+
+static int _netflow_init(void)
+{
+ /* have the opts parsed */
+ config_parse_file("NETFLOW",config_entries);
+
+ if (get_ids()) {
+ ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
+ exit(2);
+ }
+
+ /* Seed the hash function */
+ salt = getpid() ^ time(NULL);
+
+ return 0;
+}
+
+
+static void _netflow_fini(void)
+{
+ /* should probably stop the dump thread?! */
+
+ /* do nothing */
+}
+
+
+static ulog_output_t netflow_op = {
+ .name = "netflow",
+ .output = &_output_netflow,
+ .init = _netflow_init,
+ .fini = _netflow_fini,
+};
+
+void _init(void)
+{
+ register_output(&netflow_op);
+}
+
+#else
+
+static FILE *logfile = NULL; /* logfile pointer */
+static int loglevel = 5; /* current loglevel */
+
+/* log message to the logfile */
+void __ulogd_log(int level, char *file, int line, const char *format, ...)
+{
+ char *timestr;
+ va_list ap;
+ time_t tm;
+ FILE *outfd;
+
+ /* log only messages which have level at least as high as loglevel */
+ if (level < loglevel)
+ return;
+
+ if (logfile)
+ outfd = logfile;
+ else
+ outfd = stderr;
+
+ va_start(ap, format);
+
+ tm = time(NULL);
+ timestr = ctime(&tm);
+ timestr[strlen(timestr)-1] = '\0';
+ fprintf(outfd, "%s <%1.1d> %s:%d ", timestr, level, file, line);
+
+ vfprintf(outfd, format, ap);
+ va_end(ap);
+
+ /* flush glibc's buffer */
+ fflush(outfd);
+}
+
+int
+main(int argc, char **argv)
+{
+ /* Get options */
+ while (1) {
+ int option_index = 0;
+ static struct option long_options[] = {
+ { "user", required_argument, NULL, 'u' },
+ { "database", required_argument, NULL, 'd' },
+ { "password", required_argument, NULL, 'p' },
+ { "verbose", required_argument, NULL, 'v' },
+ { "host", required_argument, NULL, 'h' },
+ { "help", required_argument, NULL, '?' },
+ { 0, 0, 0, 0 }
+ };
+ struct option *opt;
+ int c;
+
+ c = getopt_long(argc, argv, "u:d:p:v:h:", long_options, &option_index);
+ if (c == -1)
+ break;
+
+ switch (c) {
+ case 'u':
+ strncpy(mysqluser.u.string, optarg, sizeof(mysqluser.u.string));
+ break;
+ case 'd':
+ strncpy(mysqldb.u.string, optarg, sizeof(mysqldb.u.string));
+ break;
+ case 'p':
+ strncpy(mysqlpass.u.string, optarg, sizeof(mysqlpass.u.string));
+ break;
+ case 'v':
+ loglevel = atoi(optarg);
+ break;
+ case 'h':
+ strncpy(mysqlhost.u.string, optarg, sizeof(mysqlhost.u.string));
+ break;
+ default:
+ fprintf(stderr, "usage: %s [OPTION]... YY/mm-dd.log[.bz2|.gz]...\n", argv[0]);
+ for (opt = long_options; opt->name; opt++)
+ fprintf(stderr, "\t-%c, --%s%s\n", opt->val, opt->name, required_argument ? "=ARGUMENT" : "");
+ return 1;
+ }
+ }
+
+ /* Seed the hash function */
+ salt = getpid() ^ time(NULL);
+
+ /* Don't lookup slice names in /etc/passwd */
+ strcpy(slicemap.u.string, "/dev/null");
+
+ /* All times in the log files are in GMT */
+ putenv("TZ=GMT");
+
+ /* Parse the rest of the non-option arguments (files to import) */
+ for (argv = &argv[optind]; *argv; argv++) {
+ char pathname[PATH_MAX], *s, *next;
+ const char *cmd = NULL, *opts = NULL;
+ pid_t pid = 0;
+ FILE *fp = NULL;
+ char *line = NULL;
+ size_t len = 0;
+ int fds[2] = { -1, -1 };
+ time_t now;
+ struct tm tm;
+
+ if (!realpath(*argv, pathname)) {
+ ulogd_log(ULOGD_ERROR, "%s: %s\n", *argv, strerror(errno));
+ goto next_file;
+ }
+
+ /* We may need to fork a child to decompress the log file */
+ if (strstr(pathname, ".bz2")) {
+ cmd = "bzip2";
+ opts = "-cdfq";
+ } else if (strstr(pathname, ".gz")) {
+ cmd = "gzip";
+ opts = "-cdfq";
+ } else if (strstr(pathname, ".zip")) {
+ cmd = "unzip";
+ opts = "-p";
+ }
+
+ /* Fork a child to decompress the log file */
+ if (cmd) {
+ /* Open a pipe */
+ if (pipe(fds)) {
+ ulogd_log(ULOGD_ERROR, "pipe: %s\n", strerror(errno));
+ goto next_file;
+ }
+ switch ((pid = fork())) {
+ case -1:
+ ulogd_log(ULOGD_ERROR, "fork: %s\n", strerror(errno));
+ goto next_file;
+ case 0:
+ close(fds[0]);
+ fds[0] = -1;
+ /* Redirect stdout to the write end of the pipe */
+ if (dup2(fds[1], fileno(stdout)) < 0) {
+ ulogd_log(ULOGD_ERROR, "dup2: %s\n", strerror(errno));
+ exit(errno);
+ }
+ execlp(cmd, cmd, opts, pathname, NULL);
+ ulogd_log(ULOGD_ERROR, "execlp: %s\n", strerror(errno));
+ goto next_file;
+ default:
+ close(fds[1]);
+ fds[1] = -1;
+ /* Open the read end of the pipe */
+ if (!(fp = fdopen(fds[0], "r"))) {
+ ulogd_log(ULOGD_ERROR, "fdopen: %s\n", strerror(errno));
+ goto next_file;
+ }
+ break;
+ }
+ }
+
+ /* Just open the file */
+ else if (!(fp = fopen(pathname, "r"))) {
+ ulogd_log(ULOGD_ERROR, "%s: %s\n", pathname, strerror(errno));
+ goto next_file;
+ }
+
+ /* Parse date from the pathname (e.g. [.*]/05/01-25.log[.bz2|gz]) */
+ now = time(NULL);
+ gmtime_r(&now, &tm);
+
+ next = pathname;
+ while ((s = strsep(&next, "/"))) {
+ int mon, mday, year;
+
+ if (sscanf(s, "%02u-%02u.log%*s", &mon, &mday) == 2) {
+ tm.tm_mon = mon - 1;
+ tm.tm_mday = mday;
+ } else if (strlen(s) == 2 && sscanf(s, "%02u", &year) == 1) {
+ /* Use strptime(3) strategy: ...values in the range
+ * 69-99 refer to years in the twentieth century
+ * (1969-1999); values in the range 00-68 refer to
+ * years in the twenty-first century (2000-2068).
+ */
+ if (year < 69)
+ year += 100;
+ tm.tm_year = year;
+ }
+ }
+
+ /* Reset to midnight and recalculate derived fields with mktime() */
+ tm.tm_hour = tm.tm_min = tm.tm_sec = 0;
+ mktime(&tm);
+
+ while (getline(&line, &len, fp) >= 0) {
+ char *s, *next;
+ char *slice;
+ u_int8_t protocol;
+ struct in_addr src_ip, dst_ip;
+ unsigned int src_port, dst_port;
+ unsigned int packets, bytes;
+ struct flow *flow;
+
+ /* Parse the flow record */
+ next = line;
+
+ if (!(s = strsep(&next, ",")) ||
+ sscanf(s, "%02u-%02u-%02u", &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3)
+ continue;
+ now = mktime(&tm);
+
+ if (!(s = strsep(&next, ",")))
+ continue;
+ if (!strcasecmp(s, "TCP"))
+ protocol = IPPROTO_TCP;
+ else if (!strcasecmp(s, "UDP"))
+ protocol = IPPROTO_UDP;
+ else if (!strcasecmp(s, "ICMP"))
+ protocol = IPPROTO_ICMP;
+ else if (!strcasecmp(s, "GRE"))
+ protocol = IPPROTO_GRE;
+ else
+ protocol = atoi(s);
+
+ if (!(slice = strsep(&next, ",")))
+ continue;
+
+ if (!(s = strsep(&next, ",")) ||
+ !inet_aton(s, &src_ip))
+ continue;
+
+ if (!(s = strsep(&next, ",")) ||
+ sscanf(s, "%u", &src_port) != 1)
+ continue;
+
+ if (!(s = strsep(&next, ",")) ||
+ !inet_aton(s, &dst_ip))
+ continue;
+
+ if (!(s = strsep(&next, ",")) ||
+ sscanf(s, "%u", &dst_port) != 1)
+ continue;
+
+ if (!(s = strsep(&next, ",")) ||
+ sscanf(s, "%u", &packets) != 1)
+ continue;
+
+ if (!(s = strsep(&next, ",")) ||
+ sscanf(s, "%u", &bytes) != 1)
+ continue;
+
+ /* Record the flow */
+ flow = get_flow(ntohl(src_ip.s_addr), 0, slice,
+ protocol, src_port, ntohl(dst_ip.s_addr), dst_port);
+ if (!flow)
+ continue;
+
+ /* Update flow */
+ update_flow(flow, now, packets, bytes);
+ }
+ if (line)
+ free(line);
+
+ start_dump();
+
+ next_file:
+ if (pid && kill(pid, SIGTERM))
+ ulogd_log(ULOGD_ERROR, "kill: %s\n", strerror(errno));
+ wait(NULL);
+ if (fp)
+ fclose(fp);
+ if (fds[0] >= 0)
+ close(fds[0]);
+ if (fds[1] >= 0)
+ close(fds[1]);
+ }
+
+ /* Wait for previous dump thread to complete */
+ if (dump_thread) {
+ pthread_join(dump_thread, NULL);
+ dump_thread = 0;
+ }
+
+ return 0;
+}
+
+#endif