2 * ulogd output target for IP flow analysis
4 * Mark Huang <mlhuang@cs.princeton.edu>
5 * Copyright (C) 2004-2005 The Trustees of Princeton University
7 * Based on admindump.pl by Mic Bowman and Paul Brett
8 * Copyright (c) 2002 Intel Corporation
10 * $Id: ulogd_NETFLOW.c,v 1.19 2005/04/20 21:10:05 mlhuang Exp $
13 /* Enable GNU glibc extensions */
19 /* va_start() and friends */
25 /* strstr() and friends */
28 /* dirname() and basename() */
31 /* fork() and wait() */
32 #include <sys/types.h>
39 /* errno and assert() */
46 /* time() and friends */
51 #include <sys/socket.h>
52 #include <netinet/in.h>
53 #include <arpa/inet.h>
55 /* ICMP definitions */
56 #include <netinet/ip.h>
57 #include <netinet/ip_icmp.h>
62 /* pthread_create() */
68 #include <ulogd/ulogd.h>
69 #include <ulogd/conffile.h>
71 #if !defined(STANDALONE) && HAVE_LIBPROPER
72 #include <proper/prop.h>
76 * /etc/ulogd.conf configuration options
79 /* Dump interval in minutes */
80 static config_entry_t interval = {
83 .type = CONFIG_TYPE_INT,
84 .options = CONFIG_OPT_NONE,
88 /* Slice map (in /etc/passwd format) */
89 static config_entry_t slicemap = {
92 .type = CONFIG_TYPE_STRING,
93 .options = CONFIG_OPT_NONE,
94 .u = { .string = "/etc/passwd" },
97 /* MySQL database name */
98 static config_entry_t mysqldb = {
101 .type = CONFIG_TYPE_STRING,
102 .options = CONFIG_OPT_NONE,
103 .u = { .string = "netflow" },
106 /* MySQL database user */
107 static config_entry_t mysqluser = {
110 .type = CONFIG_TYPE_STRING,
111 .options = CONFIG_OPT_NONE,
112 .u = { .string = "netflow" },
115 /* MySQL database password */
116 static config_entry_t mysqlpass = {
119 .type = CONFIG_TYPE_STRING,
120 .options = CONFIG_OPT_NONE,
121 .u = { .string = "" },
124 /* MySQL database host */
125 static config_entry_t mysqlhost = {
128 .type = CONFIG_TYPE_STRING,
129 .options = CONFIG_OPT_NONE,
130 .u = { .string = "" },
133 /* Latest flows in CSV format */
134 static config_entry_t csv = {
137 .type = CONFIG_TYPE_STRING,
138 .options = CONFIG_OPT_NONE,
139 .u = { .string = "/var/www/html/flows.csv" },
142 #define config_entries (&csv)
145 * Debug functionality
152 #define NIPQUAD(addr) \
153 ((unsigned char *)&addr)[0], \
154 ((unsigned char *)&addr)[1], \
155 ((unsigned char *)&addr)[2], \
156 ((unsigned char *)&addr)[3]
158 #define IPQUAD(addr) \
159 ((unsigned char *)&addr)[3], \
160 ((unsigned char *)&addr)[2], \
161 ((unsigned char *)&addr)[1], \
162 ((unsigned char *)&addr)[0]
168 #include <mysql/mysql.h>
170 /* mysql_query() with printf() syntax */
172 mysql_queryf(MYSQL *dbh, const char *fmt, ...)
177 int ret = 2 * strlen(fmt);
181 if (!(buf = realloc(buf, len))) {
182 ulogd_log(ULOGD_ERROR, "realloc: %s\n", strerror(errno));
187 ret = vsnprintf(buf, len, fmt, ap);
190 ulogd_log(ULOGD_ERROR, "vsnprintf: %s\n", strerror(errno));
193 } while (ret >= len);
195 ret = mysql_query(dbh, buf);
197 ulogd_log(ULOGD_ERROR, "%s: %s\n", buf, mysql_error(dbh));
206 * Jenkins hash support
210 typedef u_int16_t u16;
211 typedef u_int32_t u32;
212 #include <linux/jhash.h>
214 /* Salt for the hash functions */
218 * Hash slice name lookups on context ID.
221 /* Special context IDs */
222 #define UNKNOWN_XID -1
226 CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */
238 /* Must be a power of 2 */
239 #define SLICE_HASH_SIZE 128
240 struct slice *slice_hash[SLICE_HASH_SIZE];
245 return jhash_1word(xid, salt) & (SLICE_HASH_SIZE - 1);
248 static struct slice *
255 for (slice = slice_hash[i]; slice; slice = slice->next) {
256 if (slice->xid == xid)
263 static struct slice *
264 add_slice(int xid, char *name)
269 slice = malloc(sizeof(struct slice));
272 memset(slice, 0, sizeof(struct slice));
275 slice->name = strdup(name);
281 /* Add to hashed list */
283 slice->next = slice_hash[i];
284 slice_hash[i] = slice;
295 struct slice *slice, *next;
298 #if !defined(STANDALONE) && HAVE_LIBPROPER
299 if ((fd = prop_open_file(slicemap.u.string, PROP_OPEN_READ)) < 0) {
300 /* prop_open_file() returns -errno */
304 fd = open(slicemap.u.string, O_RDONLY);
307 if (fd < 0 || !(fp = fdopen(fd, "r"))) {
308 ulogd_log(ULOGD_ERROR, "%s: %s\n", slicemap.u.string, strerror(errno));
315 for (i = 0; i < SLICE_HASH_SIZE; i++) {
316 for (slice = slice_hash[i]; slice; slice = next) {
322 slice_hash[i] = NULL;
325 /* (Re)build slice hash */
326 while ((pw = fgetpwent(fp)))
327 add_slice(pw->pw_uid, pw->pw_name);
335 * Maintain flows in a two level hash table. The first level hashes
336 * on (src_ip, slice). The second level hashes on (protocol, dst_ip,
337 * src_port, dst_port). This structure mirrors how we store flows in
338 * the database (in many tables where each table contains flows with
339 * the same (src_ip, slice) parameters and unique (protocol,
340 * src_port, dst_ip, dst_port) rows).
343 /* Must be powers of 2 */
345 #define TABLE_HASH_SIZE 16
346 #define FLOW_HASH_SIZE 65536
348 #define TABLE_HASH_SIZE 8
349 #define FLOW_HASH_SIZE 1024
353 struct flow *nexth; /* Next in hash */
354 struct flow *next; /* Next in ordered list */
355 struct flow_table *table; /* Back pointer */
356 u_int8_t protocol; /* IP protocol number */
357 u_int16_t src_port; /* IP source port (host order) */
358 u_int16_t dst_port; /* IP destination port (host order) */
359 u_int32_t dst_ip; /* IP destination address (host order) */
360 time_t start_time; /* Timestamp of the first packet in the flow */
361 time_t end_time; /* Timestamp of the last (most recent) packet in the flow */
362 unsigned long long packets; /* Number of IP packets */
363 unsigned long long bytes; /* Number of IP bytes (including headers) */
367 struct flow_table *nexth; /* Next in hash */
368 struct flow_table *next; /* Next in ordered list */
369 u_int32_t src_ip; /* IP source address (host order) */
370 int xid; /* Context ID */
371 char slice[32]; /* Slice name */
372 unsigned long rows; /* Total number of rows to insert/update */
373 unsigned long collisions; /* Total number of row hash collisions */
374 unsigned long long packets; /* Total number of packets sent in all flows in this table */
375 unsigned long long bytes; /* Total number of bytes sent in all flows in this table */
376 time_t start_time; /* Start time of the earliest flow in the table */
377 time_t end_time; /* End time of the latest flow in the table */
378 /* Hashed list of flows */
379 struct flow *flows[FLOW_HASH_SIZE];
380 /* Ordered list of flows */
381 struct flow *flows_head, *flows_tail;
384 /* Hashed list of flow tables */
385 static struct flow_table *flow_tables[TABLE_HASH_SIZE];
386 /* Ordered list of flow tables */
387 static struct flow_table *flow_tables_head, *flow_tables_tail;
389 /* Maximum total number of outstanding allocated flows */
390 #define MAX_FLOWS 65536 /* ip_conntrack_max on a 1 GB machine */
393 /* Double buffer the ordered list and dump it in another thread */
394 static struct flow_table *dump_head, *dump_tail;
395 static int dump_flows;
396 static pthread_t dump_thread;
399 hash_flow_table(u_int32_t src_ip, int xid, char *slice)
402 return jhash(slice, strlen(slice), salt) & (TABLE_HASH_SIZE - 1);
404 return jhash_2words(src_ip, xid, salt) & (TABLE_HASH_SIZE - 1);
407 static struct flow_table *
408 get_flow_table(u_int32_t src_ip, int xid, char *slice)
410 struct flow_table *flow_table;
413 /* See if it already exists */
414 i = hash_flow_table(src_ip, xid, slice);
415 for (flow_table = flow_tables[i]; flow_table; flow_table = flow_table->nexth) {
416 if (flow_table->src_ip == src_ip &&
417 flow_table->xid == xid &&
418 (!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1)))
423 /* Allocate a new flow table */
424 flow_table = malloc(sizeof(struct flow_table));
426 ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
431 memset(flow_table, 0, sizeof(struct flow_table));
432 flow_table->src_ip = src_ip;
433 flow_table->xid = xid;
435 strncpy(flow_table->slice, slice, sizeof(flow_table->slice) - 1);
437 /* Add to hashed list */
438 i = hash_flow_table(src_ip, xid, slice);
439 flow_table->nexth = flow_tables[i];
440 flow_tables[i] = flow_table;
442 /* Add to ordered list */
443 if (flow_tables_tail) {
444 assert(flow_tables_head);
445 flow_tables_tail->next = flow_table;
446 flow_tables_tail = flow_table;
448 assert(!flow_tables_head);
449 flow_tables_head = flow_tables_tail = flow_table;
454 assert(flow_table->src_ip == src_ip);
455 assert(flow_table->xid == xid);
456 assert(!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1));
462 hash_flow(u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
464 return jhash_3words(protocol, dst_ip, (src_port << 16) | dst_port, salt) & (FLOW_HASH_SIZE - 1);
468 get_flow(u_int32_t src_ip, int xid, char *slice,
469 u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port)
471 struct flow_table *flow_table;
475 if (xid != UNKNOWN_XID)
476 flow_table = get_flow_table(src_ip, xid, slice);
478 /* Support searching for flows without specifying a context ID */
479 flow_table = flow_tables_head;
482 for (; flow_table; flow_table = flow_table->next) {
483 i = hash_flow(protocol, src_port, dst_ip, dst_port);
484 for (flow = flow_table->flows[i]; flow; flow = flow->nexth) {
485 if (flow->protocol == protocol &&
486 flow->src_port == src_port &&
487 flow->dst_ip == dst_ip &&
488 flow->dst_port == dst_port)
491 if (xid != UNKNOWN_XID)
499 /* Allocate a new flow */
500 flow = malloc(sizeof(struct flow));
502 ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno));
507 memset(flow, 0, sizeof(struct flow));
508 flow->table = flow_table;
509 flow->protocol = protocol;
510 flow->src_port = src_port;
511 flow->dst_ip = dst_ip;
512 flow->dst_port = dst_port;
514 /* Add to hashed list */
515 i = hash_flow(protocol, src_port, dst_ip, dst_port);
516 flow->nexth = flow_table->flows[i];
517 flow_table->flows[i] = flow;
519 /* Add to ordered list */
520 if (flow_table->flows_tail) {
521 assert(flow_table->flows_head);
522 flow_table->flows_tail->next = flow;
523 flow_table->flows_tail = flow;
525 assert(!flow_table->flows_head);
526 flow_table->flows_head = flow_table->flows_tail = flow;
529 /* Update table statistics */
530 flow_table->collisions += flow->nexth ? 1 : 0;
533 /* Update total number of outstanding flows */
538 assert(flow->table == flow_table);
539 assert(flow->protocol == protocol);
540 assert(flow->src_port == src_port);
541 assert(flow->dst_ip == dst_ip);
542 assert(flow->dst_port == dst_port);
548 * The dump thread imports one set of flows while the collection
549 * thread continues to record new flows.
553 dump_interval(void *unused)
556 struct flow_table *flow_table, *next_flow_table;
557 struct flow *flow, *next_flow, bind_flow;
560 char table_name[NAME_LEN + 1], *c;
564 unsigned int tables, table_collisions, row_collisions, rows;
565 unsigned long long packets, bytes;
566 struct timeval start, end, interval_start;
570 /* Time this interval */
571 gettimeofday(&interval_start, NULL);
573 /* Hash slice name lookups */
576 /* Open, lock, and truncate CSV file */
577 if ((fd = open(csv.u.string, O_CREAT | O_WRONLY, 0644)) >= 0) {
580 while (flock(fd, LOCK_EX | LOCK_NB)) {
585 ulogd_log(ULOGD_ERROR, "Could not acquire lock on %s\n", csv.u.string);
586 if (!(fp = fdopen(fd, "w")))
594 if (!(dbh = mysql_init(NULL)))
595 ulogd_log(ULOGD_ERROR, "mysql_init: failed\n");
596 if (dbh && !mysql_real_connect(dbh,
597 mysqlhost.u.string[0] ? mysqlhost.u.string : NULL,
599 mysqlpass.u.string[0] ? mysqlpass.u.string : NULL,
602 ulogd_log(ULOGD_ERROR,
605 mysqlhost.u.string[0] ? mysqlhost.u.string : "localhost",
613 /* All times in the database are in GMT */
614 mysql_query(dbh, "SET time_zone='+00:00'");
617 /* Initialize statistics */
618 tables = table_collisions = row_collisions = rows = 0;
623 for (flow_table = dump_head; flow_table; flow_table = next_flow_table) {
624 /* Keep track of some basic statistics */
626 table_collisions += flow_table->nexth ? 1 : 0;
627 row_collisions += flow_table->collisions;
628 rows += flow_table->rows;
629 packets += flow_table->packets;
630 bytes += flow_table->bytes;
633 slice_name[sizeof(slice_name) - 1] = '\0';
634 switch (flow_table->xid) {
635 case CONNECTION_REFUSED_XID:
636 strncpy(slice_name, "connection-refused", sizeof(slice_name));
638 case ICMP_ECHOREPLY_XID:
639 strncpy(slice_name, "icmp-reply", sizeof(slice_name));
641 case ICMP_UNREACH_XID:
642 strncpy(slice_name, "icmp-unreachable", sizeof(slice_name));
645 if ((slice = get_slice(flow_table->xid)))
646 strncpy(slice_name, slice->name, sizeof(slice_name));
647 else if (flow_table->slice[0])
648 strncpy(slice_name, flow_table->slice, sizeof(slice_name));
650 snprintf(slice_name, sizeof(slice_name), "%u", flow_table->xid);
656 gmtime_r(&flow_table->start_time, &tm);
658 /* Form a human readable table name */
659 snprintf(table_name, sizeof(table_name),
660 "%u_%u_%u_%u_%s_%u_%02u_%02u",
661 IPQUAD(flow_table->src_ip),
663 1900 + tm.tm_year, tm.tm_mon + 1, tm.tm_mday);
664 assert(strlen(table_name) <= NAME_LEN);
666 /* Replace punctuation with underscores */
667 for (c = table_name; *c; c++) {
672 /* Time database operations on each table */
673 gettimeofday(&start, NULL);
675 /* Insert/update table summary */
676 if (mysql_queryf(dbh,
677 "INSERT INTO flow_tables "
678 "(table_name,src_ip,slice,"
679 "start_time,end_time,packets,bytes) "
680 "VALUES ('%s',%u,'%s',"
681 "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
682 "ON DUPLICATE KEY UPDATE "
683 "end_time=FROM_UNIXTIME(%u),"
684 "packets=packets+%llu,"
689 (unsigned) flow_table->start_time,
690 (unsigned) flow_table->end_time,
693 (unsigned) flow_table->end_time,
696 goto free_flow_table;
699 if (mysql_queryf(dbh,
700 "CREATE TABLE IF NOT EXISTS %s LIKE flow_template",
702 goto free_flow_table;
705 mysql_queryf(dbh, "ALTER TABLE %s DISABLE KEYS", table_name);
706 mysql_queryf(dbh, "LOCK TABLES %s WRITE", table_name);
708 /* Prepare the insert statement */
709 if (!(stmt = mysql_stmt_init(dbh)))
710 ulogd_log(ULOGD_ERROR, "mysql_stmt_init: %s\n", mysql_error(dbh));
713 MYSQL_BIND bind[] = {
714 { .buffer_type = MYSQL_TYPE_TINY,
715 .buffer = &bind_flow.protocol,
716 .buffer_length = sizeof(bind_flow.protocol),
717 .length = &bind[0].buffer_length,
720 { .buffer_type = MYSQL_TYPE_SHORT,
721 .buffer = &bind_flow.src_port,
722 .buffer_length = sizeof(bind_flow.src_port),
723 .length = &bind[1].buffer_length,
726 { .buffer_type = MYSQL_TYPE_LONG,
727 .buffer = &bind_flow.dst_ip,
728 .buffer_length = sizeof(bind_flow.dst_ip),
729 .length = &bind[2].buffer_length,
732 { .buffer_type = MYSQL_TYPE_SHORT,
733 .buffer = &bind_flow.dst_port,
734 .buffer_length = sizeof(bind_flow.dst_port),
735 .length = &bind[3].buffer_length,
738 { .buffer_type = MYSQL_TYPE_LONG,
739 .buffer = &bind_flow.start_time,
740 .buffer_length = sizeof(bind_flow.start_time),
741 .length = &bind[4].buffer_length,
744 { .buffer_type = MYSQL_TYPE_LONG,
745 .buffer = &bind_flow.end_time,
746 .buffer_length = sizeof(bind_flow.end_time),
747 .length = &bind[5].buffer_length,
750 { .buffer_type = MYSQL_TYPE_LONGLONG,
751 .buffer = &bind_flow.packets,
752 .buffer_length = sizeof(bind_flow.packets),
753 .length = &bind[6].buffer_length,
756 { .buffer_type = MYSQL_TYPE_LONGLONG,
757 .buffer = &bind_flow.bytes,
758 .buffer_length = sizeof(bind_flow.bytes),
759 .length = &bind[7].buffer_length,
762 { .buffer_type = MYSQL_TYPE_LONG,
763 .buffer = &bind_flow.end_time,
764 .buffer_length = sizeof(bind_flow.end_time),
765 .length = &bind[8].buffer_length,
768 { .buffer_type = MYSQL_TYPE_LONGLONG,
769 .buffer = &bind_flow.packets,
770 .buffer_length = sizeof(bind_flow.packets),
771 .length = &bind[9].buffer_length,
774 { .buffer_type = MYSQL_TYPE_LONGLONG,
775 .buffer = &bind_flow.bytes,
776 .buffer_length = sizeof(bind_flow.bytes),
777 .length = &bind[10].buffer_length,
782 snprintf(query, sizeof(query),
784 "(protocol,src_port,dst_ip,dst_port,"
785 "start_time,end_time,packets,bytes) "
787 "FROM_UNIXTIME(?),FROM_UNIXTIME(?),?,?) "
788 "ON DUPLICATE KEY UPDATE "
789 "end_time=FROM_UNIXTIME(?),"
794 if (mysql_stmt_prepare(stmt, query, strlen(query)) ||
795 mysql_stmt_bind_param(stmt, bind)) {
796 ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
797 mysql_stmt_close(stmt);
804 for (flow = flow_table->flows_head; flow; flow = flow->next) {
812 slice_name, flow->protocol,
813 IPQUAD(flow_table->src_ip), flow->src_port,
814 IPQUAD(flow->dst_ip), flow->dst_port,
815 (unsigned) flow->start_time,
816 (unsigned) flow->end_time,
821 /* Insert/update flow record */
825 if (mysql_stmt_execute(stmt))
826 ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt));
830 "(protocol,src_port,dst_ip,dst_port,"
831 "start_time,end_time,packets,bytes) "
832 "VALUES (%u,%u,%u,%u,"
833 "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) "
834 "ON DUPLICATE KEY UPDATE "
835 "end_time=FROM_UNIXTIME(%u),"
836 "packets=packets+%llu,"
843 (unsigned) flow->start_time,
844 (unsigned) flow->end_time,
847 (unsigned) flow->end_time,
856 mysql_stmt_close(stmt);
859 mysql_query(dbh, "UNLOCK TABLES");
860 mysql_queryf(dbh, "ALTER TABLE %s ENABLE KEYS", table_name);
862 /* Update table summary */
864 "UPDATE flow_tables "
865 "SET flows=(SELECT COUNT(protocol) FROM %s) "
866 "WHERE table_name='%s'",
870 gettimeofday(&end, NULL);
871 timersub(&end, &start, &end);
874 ulogd_log(ULOGD_DEBUG, /* rows and collisions are unsigned long, hence %lu */
875 "Updated %lu rows in %s in %u.%06u s (%lu collisions, %llu packets, %llu bytes)\n",
876 flow_table->rows, table_name,
877 (unsigned) end.tv_sec, (unsigned) end.tv_usec,
878 flow_table->collisions, flow_table->packets, flow_table->bytes);
881 for (flow = flow_table->flows_head; flow; flow = next_flow) {
883 flow_table->packets -= flow->packets;
884 flow_table->bytes -= flow->bytes;
886 next_flow = flow->next;
887 assert(next_flow || flow == flow_table->flows_tail);
891 assert(!flow_table->rows);
892 assert(!flow_table->packets);
893 assert(!flow_table->bytes);
895 next_flow_table = flow_table->next;
896 assert(next_flow_table || flow_table == dump_tail);
901 dump_head = dump_tail = NULL;
906 gettimeofday(&end, NULL);
907 timersub(&end, &interval_start, &end);
909 ulogd_log(ULOGD_NOTICE, /* the memory estimate below is a size_t, hence %zu */
910 "Updated %u rows in %u tables (%zu bytes) in %u.%06u s "
911 "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
912 rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
913 (unsigned) end.tv_sec, (unsigned) end.tv_usec,
914 table_collisions, row_collisions, packets, bytes);
916 /* Could not connect to database */
917 ulogd_log(ULOGD_ERROR,
918 "Lost %u rows in %u tables (%zu bytes) "
919 "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n",
920 rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow),
921 table_collisions, row_collisions, packets, bytes);
937 /* Wait for previous dump thread to complete */
939 pthread_join(dump_thread, NULL);
947 dump_head = flow_tables_head;
948 dump_tail = flow_tables_tail;
951 /* Start up the dump thread if necessary */
952 if (pthread_create(&dump_thread, NULL, dump_interval, NULL)) {
953 ulogd_log(ULOGD_ERROR, "pthread_create: %s\n", strerror(errno));
954 /* Try again later */
956 dump_head = dump_tail = NULL;
960 memset(flow_tables, 0, sizeof(flow_tables));
961 flow_tables_head = flow_tables_tail = NULL;
967 update_flow(struct flow *flow, time_t now, unsigned int packets, unsigned int bytes)
970 if (now > flow->end_time)
971 flow->end_time = now;
972 if (!flow->start_time || now < flow->start_time)
973 flow->start_time = now;
974 flow->packets += packets;
975 flow->bytes += bytes;
977 /* Update table summary */
978 if (now > flow->table->end_time)
979 flow->table->end_time = now;
980 if (!flow->table->start_time || now < flow->table->start_time)
981 flow->table->start_time = now;
982 flow->table->packets += packets;
983 flow->table->bytes += bytes;
993 /* Interesting keys */
1015 #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0]))
1016 static struct intr_id intr_ids[] = { /* ISO C99 designated initializers, one per key */
1017 [OOB_TIME_SEC] = { "oob.time.sec", 0 },
1018 [OOB_MARK] = { "oob.mark", 0 },
1019 [IP_SADDR] = { "ip.saddr", 0 },
1020 [IP_DADDR] = { "ip.daddr", 0 },
1021 [IP_TOTLEN] = { "ip.totlen", 0 },
1022 [IP_PROTOCOL] = { "ip.protocol", 0 },
1023 [TCP_SPORT] = { "tcp.sport", 0 },
1024 [TCP_DPORT] = { "tcp.dport", 0 },
1025 [TCP_ACK] = { "tcp.ack", 0 },
1026 [TCP_RST] = { "tcp.rst", 0 },
1027 [UDP_SPORT] = { "udp.sport", 0 },
1028 [UDP_DPORT] = { "udp.dport", 0 },
1029 [ICMP_TYPE] = { "icmp.type", 0 },
1030 [ICMP_CODE] = { "icmp.code", 0 },
1031 [GRE_FLAG_KEY] = { "gre.flag.key", 0 },
1032 [GRE_VERSION] = { "gre.version", 0 },
1033 [GRE_KEY] = { "gre.key", 0 },
1034 [PPTP_CALLID] = { "pptp.callid", 0 },
1037 #define GET_VALUE(x) intr_ids[x].res->value
1039 #define DATE(t) ((t) / (24*60*60) * (24*60*60))
1041 static int _output_netflow(ulog_iret_t *res)
1044 u_int32_t src_ip, dst_ip;
1045 u_int16_t src_port, dst_port;
1047 struct flow *flow = NULL;
1050 now = (time_t) GET_VALUE(OOB_TIME_SEC).ui32;
1052 if (flow_tables_head) {
1053 /* If we have collected for at least the configured interval (minutes), or
1054 * collected the maximum number of flows, or it is now
1055 * the next (UTC, per DATE()) day, dump this interval.
1057 if ((now - flow_tables_head->start_time) >= (interval.u.value * 60) ||
1058 flows >= MAX_FLOWS ||
1059 DATE(flow_tables_head->start_time) != DATE(now)) {
1061 if (flows >= MAX_FLOWS)
1062 ulogd_log(ULOGD_ERROR, "dumping %d flows early\n", flows);
1068 protocol = GET_VALUE(IP_PROTOCOL).ui8;
1069 src_ip = GET_VALUE(IP_SADDR).ui32;
1070 dst_ip = GET_VALUE(IP_DADDR).ui32;
1071 xid = GET_VALUE(OOB_MARK).ui32;
1076 src_port = GET_VALUE(TCP_SPORT).ui16;
1077 dst_port = GET_VALUE(TCP_DPORT).ui16;
1079 /* check for root termination */
1080 if (xid == ROOT_XID) {
1081 if ((flow = get_flow(src_ip, UNKNOWN_XID, NULL, protocol, src_port, dst_ip, dst_port))) {
1083 * this is supposed to catch some of the cases where the
1084 * network stack responds on behalf of the user but the
1085 * slice is incorrectly accounted for, e.g. on socket
1088 assert(flow->table);
1089 xid = flow->table->xid;
1092 * we have not seen any packets on this flow during the
1093 * current interval, check for the connection refused
1095 if (GET_VALUE(TCP_RST).b && GET_VALUE(TCP_ACK).b)
1096 xid = CONNECTION_REFUSED_XID;
1103 * we could record the source port, however this pretty much
1104 * kills any notion of UDP flows and therefore consume large
1105 * quantities of space, so we set the source port to 0
1106 * tuple.sport = GET_VALUE(UDP_SPORT).ui16;
1111 * traceroutes create a large number of flows in the db
1112 * this is a quick hack to catch the most common form
1113 * of traceroute (basically we're mapping any UDP packet
1114 * in the 33435-33524 range to the "trace" port, 33524 is
1115 * 3 packets * nhops (30).
1117 dst_port = GET_VALUE(UDP_DPORT).ui16;
1118 if (dst_port >= 33435 && dst_port <= 33524)
1123 src_port = GET_VALUE(ICMP_TYPE).ui8;
1124 dst_port = GET_VALUE(ICMP_CODE).ui8;
1127 * We special case some of the ICMP traffic that the kernel
1128 * always generates. Since this is attributed to root, it
1129 * creates significant "noise" in the output. We want to be
1130 * able to quickly see that root is generating traffic.
1132 if (xid == ROOT_XID) {
1133 if (src_port == ICMP_ECHOREPLY)
1134 xid = ICMP_ECHOREPLY_XID;
1135 else if (src_port == ICMP_UNREACH)
1136 xid = ICMP_UNREACH_XID;
1141 if (GET_VALUE(GRE_FLAG_KEY).b) {
1142 if (GET_VALUE(GRE_VERSION).ui8 == 1) {
1143 /* Get PPTP call ID */
1144 src_port = GET_VALUE(PPTP_CALLID).ui16;
1146 /* XXX Truncate GRE keys to 16 bits */
1147 src_port = (u_int16_t) GET_VALUE(GRE_KEY).ui32;
1150 /* No key available */
1157 /* This is the default key for packets from unsupported protocols */
1163 /* Record the flow */
1165 flow = get_flow(src_ip, xid, NULL, protocol, src_port, dst_ip, dst_port);
1170 /* Update the flow */
1171 update_flow(flow, now, 1, GET_VALUE(IP_TOTLEN).ui16);
1176 /* Resolve the keyhash result pointer (res) of every key we are interested in */
1177 static int get_ids(void)
1180 struct intr_id *cur_id;
1182 for (i = 0; i < INTR_IDS; i++) {
1183 cur_id = &intr_ids[i];
1184 cur_id->res = keyh_getres(keyh_getid(cur_id->name));
1186 ulogd_log(ULOGD_ERROR,
1187 "Cannot resolve keyhash id for %s\n",
1196 static int _netflow_init(void)
1198 /* have the opts parsed */
1199 config_parse_file("NETFLOW",config_entries);
1202 ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n");
1206 /* Seed the hash function */
1207 salt = getpid() ^ time(NULL);
1213 static void _netflow_fini(void)
1215 /* should probably stop the dump thread?! */
1221 static ulog_output_t netflow_op = {
1223 .output = &_output_netflow,
1224 .init = _netflow_init,
1225 .fini = _netflow_fini,
1230 register_output(&netflow_op);
1235 static FILE *logfile = NULL; /* logfile pointer */
1236 static int loglevel = 5; /* current loglevel */
1238 /* log message to the logfile */
1239 void __ulogd_log(int level, char *file, int line, const char *format, ...)
1246 /* log only messages which have level at least as high as loglevel */
1247 if (level < loglevel)
1255 va_start(ap, format);
1258 timestr = ctime(&tm);
1259 timestr[strlen(timestr)-1] = '\0';
1260 fprintf(outfd, "%s <%1.1d> %s:%d ", timestr, level, file, line);
1262 vfprintf(outfd, format, ap);
1265 /* flush glibc's buffer */
1270 main(int argc, char **argv)
1274 int option_index = 0;
1275 static struct option long_options[] = {
1276 { "user", required_argument, NULL, 'u' },
1277 { "database", required_argument, NULL, 'd' },
1278 { "password", required_argument, NULL, 'p' },
1279 { "verbose", required_argument, NULL, 'v' },
1280 { "host", required_argument, NULL, 'h' },
1281 { "help", no_argument, NULL, '?' }, /* --help takes no argument */
1287 c = getopt_long(argc, argv, "u:d:p:v:h:", long_options, &option_index);
1293 snprintf(mysqluser.u.string, sizeof(mysqluser.u.string), "%s", optarg); /* always NUL-terminated */
1296 snprintf(mysqldb.u.string, sizeof(mysqldb.u.string), "%s", optarg);
1299 snprintf(mysqlpass.u.string, sizeof(mysqlpass.u.string), "%s", optarg);
1302 loglevel = atoi(optarg);
1305 snprintf(mysqlhost.u.string, sizeof(mysqlhost.u.string), "%s", optarg);
1308 fprintf(stderr, "usage: %s [OPTION]... YY/mm-dd.log[.bz2|.gz]...\n", argv[0]);
1309 for (opt = long_options; opt->name; opt++)
1310 fprintf(stderr, "\t-%c, --%s%s\n", opt->val, opt->name, opt->has_arg == required_argument ? "=ARGUMENT" : ""); /* per-option, not the constant */
1315 /* Seed the hash function */
1316 salt = getpid() ^ time(NULL);
1318 /* Don't lookup slice names in /etc/passwd */
1319 strcpy(slicemap.u.string, "/dev/null");
1321 /* All times in the log files are in GMT */
1324 /* Parse the rest of the non-option arguments (files to import) */
1325 for (argv = &argv[optind]; *argv; argv++) {
1326 char pathname[PATH_MAX], *s, *next;
1327 const char *cmd = NULL, *opts = NULL;
1332 int fds[2] = { -1, -1 };
1336 if (!realpath(*argv, pathname)) {
1337 ulogd_log(ULOGD_ERROR, "%s: %s\n", *argv, strerror(errno));
1341 /* We may need to fork a child to decompress the log file */
1342 if (strstr(pathname, ".bz2")) {
1345 } else if (strstr(pathname, ".gz")) {
1348 } else if (strstr(pathname, ".zip")) {
1353 /* Fork a child to decompress the log file */
1357 ulogd_log(ULOGD_ERROR, "pipe: %s\n", strerror(errno));
1360 switch ((pid = fork())) {
1362 ulogd_log(ULOGD_ERROR, "fork: %s\n", strerror(errno));
1367 /* Redirect stdout to the write end of the pipe */
1368 if (dup2(fds[1], fileno(stdout)) < 0) {
1369 ulogd_log(ULOGD_ERROR, "dup2: %s\n", strerror(errno));
1372 execlp(cmd, cmd, opts, pathname, NULL);
1373 ulogd_log(ULOGD_ERROR, "execlp: %s\n", strerror(errno));
1378 /* Open the read end of the pipe */
1379 if (!(fp = fdopen(fds[0], "r"))) {
1380 ulogd_log(ULOGD_ERROR, "fdopen: %s\n", strerror(errno));
1387 /* Just open the file */
1388 else if (!(fp = fopen(pathname, "r"))) {
1389 ulogd_log(ULOGD_ERROR, "%s: %s\n", pathname, strerror(errno));
1393 /* Parse date from the pathname (e.g. [.*]/05/01-25.log[.bz2|.gz]) */
1395 gmtime_r(&now, &tm);
1398 while ((s = strsep(&next, "/"))) {
1399 int mon, mday, year;
1401 if (sscanf(s, "%2d-%2d.log%*s", &mon, &mday) == 2) { /* %d: targets are int */
1402 tm.tm_mon = mon - 1;
1404 } else if (strlen(s) == 2 && sscanf(s, "%2d", &year) == 1) {
1405 /* Use strptime(3) strategy: ...values in the range
1406 * 69-99 refer to years in the twentieth century
1407 * (1969-1999); values in the range 00-68 refer to
1408 * years in the twenty-first century (2000-2068).
1416 /* Reset to midnight and recalculate derived fields with mktime() */
1417 tm.tm_hour = tm.tm_min = tm.tm_sec = 0;
1420 while (getline(&line, &len, fp) >= 0) {
1424 struct in_addr src_ip, dst_ip;
1425 unsigned int src_port, dst_port;
1426 unsigned int packets, bytes;
1429 /* Parse the flow record */
1432 if (!(s = strsep(&next, ",")) ||
1433 sscanf(s, "%2d-%2d-%2d", &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3) /* struct tm fields are int */
1437 if (!(s = strsep(&next, ",")))
1439 if (!strcasecmp(s, "TCP"))
1440 protocol = IPPROTO_TCP;
1441 else if (!strcasecmp(s, "UDP"))
1442 protocol = IPPROTO_UDP;
1443 else if (!strcasecmp(s, "ICMP"))
1444 protocol = IPPROTO_ICMP;
1445 else if (!strcasecmp(s, "GRE"))
1446 protocol = IPPROTO_GRE;
1450 if (!(slice = strsep(&next, ",")))
1453 if (!(s = strsep(&next, ",")) ||
1454 !inet_aton(s, &src_ip))
1457 if (!(s = strsep(&next, ",")) ||
1458 sscanf(s, "%u", &src_port) != 1)
1461 if (!(s = strsep(&next, ",")) ||
1462 !inet_aton(s, &dst_ip))
1465 if (!(s = strsep(&next, ",")) ||
1466 sscanf(s, "%u", &dst_port) != 1)
1469 if (!(s = strsep(&next, ",")) ||
1470 sscanf(s, "%u", &packets) != 1)
1473 if (!(s = strsep(&next, ",")) ||
1474 sscanf(s, "%u", &bytes) != 1)
1477 /* Record the flow */
1478 flow = get_flow(ntohl(src_ip.s_addr), 0, slice,
1479 protocol, src_port, ntohl(dst_ip.s_addr), dst_port);
1484 update_flow(flow, now, packets, bytes);
1492 if (pid && kill(pid, SIGTERM))
1493 ulogd_log(ULOGD_ERROR, "kill: %s\n", strerror(errno));
1503 /* Wait for previous dump thread to complete */
1505 pthread_join(dump_thread, NULL);