/* * ulogd output target for IP flow analysis * * Mark Huang * Copyright (C) 2004-2005 The Trustees of Princeton University * * Based on admindump.pl by Mic Bowman and Paul Brett * Copyright (c) 2002 Intel Corporation * * $Id: ulogd_NETFLOW.c,v 1.19 2005/04/20 21:10:05 mlhuang Exp $ */ /* Enable GNU glibc extensions */ #define _GNU_SOURCE #include #include /* va_start() and friends */ #include /* ispunct() */ #include /* strstr() and friends */ #include /* dirname() and basename() */ #include /* fork() and wait() */ #include #include #include /* fgetpwent() */ #include /* errno and assert() */ #include #include /* getopt_long() */ #include /* time() and friends */ #include #include /* inet_aton() */ #include #include #include /* ICMP definitions */ #include #include /* stat() */ #include /* pthread_create() */ #include /* flock() */ #include #include #include #if !defined(STANDALONE) && HAVE_LIBPROPER #include #endif /* * /etc/ulogd.conf configuration options */ /* Dump interval in minutes */ static config_entry_t interval = { .next = NULL, .key = "interval", .type = CONFIG_TYPE_INT, .options = CONFIG_OPT_NONE, .u = { .value = 5 }, }; /* Slice map (in /etc/passwd format) */ static config_entry_t slicemap = { .next = &interval, .key = "slicemap", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, .u = { .string = "/etc/passwd" }, }; /* MySQL database name */ static config_entry_t mysqldb = { .next = &slicemap, .key = "mysqldb", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, .u = { .string = "netflow" }, }; /* MySQL database user */ static config_entry_t mysqluser = { .next = &mysqldb, .key = "mysqluser", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, .u = { .string = "netflow" }, }; /* MySQL database password */ static config_entry_t mysqlpass = { .next = &mysqluser, .key = "mysqlpass", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, .u = { .string = "" }, }; /* MySQL database host */ static config_entry_t mysqlhost = { .next = 
&mysqlpass, .key = "mysqlhost", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, .u = { .string = "" }, }; /* Latest flows in CSV format */ static config_entry_t csv = { .next = &mysqlhost, .key = "csv", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, .u = { .string = "/var/www/html/flows.csv" }, }; #define config_entries (&csv) /* * Debug functionality */ #ifdef DMALLOC #include #endif #define NIPQUAD(addr) \ ((unsigned char *)&addr)[0], \ ((unsigned char *)&addr)[1], \ ((unsigned char *)&addr)[2], \ ((unsigned char *)&addr)[3] #define IPQUAD(addr) \ ((unsigned char *)&addr)[3], \ ((unsigned char *)&addr)[2], \ ((unsigned char *)&addr)[1], \ ((unsigned char *)&addr)[0] /* * MySQL support */ #include /* mysql_query() with printf() syntax */ static int mysql_queryf(MYSQL *dbh, const char *fmt, ...) { char *buf = NULL; size_t len; va_list ap; int ret = 2 * strlen(fmt); do { len = ret + 1; if (!(buf = realloc(buf, len))) { ulogd_log(ULOGD_ERROR, "realloc: %s\n", strerror(errno)); ret = -errno; goto done; } va_start(ap, fmt); ret = vsnprintf(buf, len, fmt, ap); va_end(ap); if (ret < 0) { ulogd_log(ULOGD_ERROR, "vsnprintf: %s\n", strerror(errno)); goto done; } } while (ret >= len); ret = mysql_query(dbh, buf); if (ret) ulogd_log(ULOGD_ERROR, "%s: %s\n", buf, mysql_error(dbh)); done: if (buf) free(buf); return ret; } /* * Jenkins hash support */ typedef u_int8_t u8; typedef u_int16_t u16; typedef u_int32_t u32; #include /* Salt for the hash functions */ static int salt; /* * Hash slice name lookups on context ID. 
*/ /* Special context IDs */ #define UNKNOWN_XID -1 #define ROOT_XID 0 enum { CONNECTION_REFUSED_XID = 65536, /* MAX_S_CONTEXT + 1 */ ICMP_ECHOREPLY_XID, ICMP_UNREACH_XID, }; /* A slice */ struct slice { struct slice *next; int xid; char *name; }; /* Must be a power of 2 */ #define SLICE_HASH_SIZE 128 struct slice *slice_hash[SLICE_HASH_SIZE]; static inline int hash_slice(int xid) { return jhash_1word(xid, salt) & (SLICE_HASH_SIZE - 1); } static struct slice * get_slice(int xid) { struct slice *slice; int i; i = hash_slice(xid); for (slice = slice_hash[i]; slice; slice = slice->next) { if (slice->xid == xid) break; } return slice; } static struct slice * add_slice(int xid, char *name) { struct slice *slice; int i; slice = malloc(sizeof(struct slice)); if (!slice) return NULL; memset(slice, 0, sizeof(struct slice)); slice->xid = xid; slice->name = strdup(name); if (!slice->name) { free(slice); return NULL; } /* Add to hashed list */ i = hash_slice(xid); slice->next = slice_hash[i]; slice_hash[i] = slice; return slice; } static int hash_passwd(void) { int fd; FILE *fp; struct passwd *pw; struct slice *slice, *next; int i; #if !defined(STANDALONE) && HAVE_LIBPROPER if ((fd = prop_open_file(slicemap.u.string, PROP_OPEN_READ)) < 0) { /* prop_open_file() returns -errno */ errno = -fd; } #else fd = open(slicemap.u.string, O_RDONLY); #endif if (fd < 0 || !(fp = fdopen(fd, "r"))) { ulogd_log(ULOGD_ERROR, "%s: %s\n", slicemap.u.string, strerror(errno)); if (fd >= 0) close(fd); return errno; } /* Clean hash */ for (i = 0; i < SLICE_HASH_SIZE; i++) { for (slice = slice_hash[i]; slice; slice = next) { next = slice->next; assert(slice->name); free(slice->name); free(slice); } slice_hash[i] = NULL; } /* (Re)build slice hash */ while ((pw = fgetpwent(fp))) add_slice(pw->pw_uid, pw->pw_name); fclose(fp); close(fd); return 0; } /* * Maintain flows in a two level hash table. The first level hashes * on (src_ip, slice). 
The second level hashes on (protocol, dst_ip,
 * src_port, dst_port). This structure mirrors how we store flows in
 * the database (in many tables where each table contains flows with
 * the same (src_ip, slice) parameters and unique (protocol,
 * src_port, dst_ip, dst_port) rows).
 */

/* Must be powers of 2 (the hash functions mask with SIZE - 1) */
#ifdef STANDALONE
#define TABLE_HASH_SIZE 16
#define FLOW_HASH_SIZE 65536
#else
#define TABLE_HASH_SIZE 8
#define FLOW_HASH_SIZE 1024
#endif

/*
 * One flow: the second-level key plus its counters. Each flow is linked
 * both into a hash chain of its table (nexth) and into the table's
 * insertion-ordered list (next).
 */
struct flow {
	struct flow *nexth; /* Next in hash */
	struct flow *next; /* Next in ordered list */
	struct flow_table *table; /* Back pointer */
	u_int8_t protocol; /* IP protocol number */
	u_int16_t src_port; /* IP source port (host order) */
	u_int16_t dst_port; /* IP destination port (host order) */
	u_int32_t dst_ip; /* IP destination address (host order) */
	time_t start_time; /* Timestamp of the first packet in the flow */
	time_t end_time; /* Timestamp of the last packet in the flow */
	unsigned long long packets; /* Number of IP packets */
	unsigned long long bytes; /* Number of IP bytes (including headers) */
};

/*
 * One first-level entry: all flows sharing a (src_ip, slice) key, plus
 * running totals over those flows.
 */
struct flow_table {
	struct flow_table *nexth; /* Next in hash */
	struct flow_table *next; /* Next in ordered list */
	u_int32_t src_ip; /* IP source address (host order) */
	int xid; /* Context ID */
	char slice[32]; /* Slice name */
	unsigned long rows; /* Total number of rows to insert/update */
	unsigned long collisions; /* Total number of row hash collisions */
	unsigned long long packets; /* Total number of packets sent in all flows in this table */
	unsigned long long bytes; /* Total number of bytes sent in all flows in this table */
	time_t start_time; /* Start time of the earliest flow in the table */
	time_t end_time; /* End time of the latest flow in the table */
	/* Hashed list of flows */
	struct flow *flows[FLOW_HASH_SIZE];
	/* Ordered list of flows */
	struct flow *flows_head, *flows_tail;
};

/* Hashed list of flow tables */
static struct flow_table *flow_tables[TABLE_HASH_SIZE];
/* Ordered list of flow tables */
static struct flow_table *flow_tables_head, *flow_tables_tail; /* Maximum total number of outstanding allocated flows */ #define MAX_FLOWS 65536 /* ip_conntrack_max on a 1 GB machine */ static int flows; /* Double buffer the ordered list and dump it in another thread */ static struct flow_table *dump_head, *dump_tail; static int dump_flows; static pthread_t dump_thread; static inline int hash_flow_table(u_int32_t src_ip, int xid, char *slice) { if (slice) return jhash(slice, strlen(slice), salt) & (TABLE_HASH_SIZE - 1); else return jhash_2words(src_ip, xid, salt) & (TABLE_HASH_SIZE - 1); } static struct flow_table * get_flow_table(u_int32_t src_ip, int xid, char *slice) { struct flow_table *flow_table; int i; /* See if it already exists */ i = hash_flow_table(src_ip, xid, slice); for (flow_table = flow_tables[i]; flow_table; flow_table = flow_table->nexth) { if (flow_table->src_ip == src_ip && flow_table->xid == xid && (!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1))) break; } if (!flow_table) { /* Allocate a new flow table */ flow_table = malloc(sizeof(struct flow_table)); if (!flow_table) { ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno)); return NULL; } /* Initialize */ memset(flow_table, 0, sizeof(struct flow_table)); flow_table->src_ip = src_ip; flow_table->xid = xid; if (slice) strncpy(flow_table->slice, slice, sizeof(flow_table->slice) - 1); /* Add to hashed list */ i = hash_flow_table(src_ip, xid, slice); flow_table->nexth = flow_tables[i]; flow_tables[i] = flow_table; /* Add to ordered list */ if (flow_tables_tail) { assert(flow_tables_head); flow_tables_tail->next = flow_table; flow_tables_tail = flow_table; } else { assert(!flow_tables_head); flow_tables_head = flow_tables_tail = flow_table; } } assert(flow_table); assert(flow_table->src_ip == src_ip); assert(flow_table->xid == xid); assert(!slice || !strncmp(flow_table->slice, slice, sizeof(flow_table->slice) - 1)); return flow_table; } static inline int 
hash_flow(u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port) { return jhash_3words(protocol, dst_ip, (src_port << 16) | dst_port, salt) & (FLOW_HASH_SIZE - 1); } static struct flow * get_flow(u_int32_t src_ip, int xid, char *slice, u_int8_t protocol, u_int16_t src_port, u_int32_t dst_ip, u_int16_t dst_port) { struct flow_table *flow_table; struct flow *flow; int i; if (xid != UNKNOWN_XID) flow_table = get_flow_table(src_ip, xid, slice); else { /* Support searching for flows without specifying a context ID */ flow_table = flow_tables_head; } for (; flow_table; flow_table = flow_table->next) { i = hash_flow(protocol, src_port, dst_ip, dst_port); for (flow = flow_table->flows[i]; flow; flow = flow->nexth) { if (flow->protocol == protocol && flow->src_port == src_port && flow->dst_ip == dst_ip && flow->dst_port == dst_port) break; } if (xid != UNKNOWN_XID) break; } if (!flow_table) return NULL; if (!flow) { /* Allocate a new flow */ flow = malloc(sizeof(struct flow)); if (!flow) { ulogd_log(ULOGD_ERROR, "malloc: %s\n", strerror(errno)); return NULL; } /* Initialize */ memset(flow, 0, sizeof(struct flow)); flow->table = flow_table; flow->protocol = protocol; flow->src_port = src_port; flow->dst_ip = dst_ip; flow->dst_port = dst_port; /* Add to hashed list */ i = hash_flow(protocol, src_port, dst_ip, dst_port); flow->nexth = flow_table->flows[i]; flow_table->flows[i] = flow; /* Add to ordered list */ if (flow_table->flows_tail) { assert(flow_table->flows_head); flow_table->flows_tail->next = flow; flow_table->flows_tail = flow; } else { assert(!flow_table->flows_head); flow_table->flows_head = flow_table->flows_tail = flow; } /* Update table statistics */ flow_table->collisions += flow->nexth ? 
1 : 0; flow_table->rows++; /* Update total number of outstanding flows */ flows++; } assert(flow); assert(flow->table == flow_table); assert(flow->protocol == protocol); assert(flow->src_port == src_port); assert(flow->dst_ip == dst_ip); assert(flow->dst_port == dst_port); return flow; } /* * The dump thread imports one set of flows while the collection * thread continues to record new flows. */ void * dump_interval(void *unused) { MYSQL *dbh; struct flow_table *flow_table, *next_flow_table; struct flow *flow, *next_flow, bind_flow; struct slice *slice; char slice_name[64]; char table_name[NAME_LEN + 1], *c; struct tm tm; MYSQL_STMT *stmt; char query[512]; unsigned int tables, table_collisions, row_collisions, rows; unsigned long long packets, bytes; struct timeval start, end, interval_start; int fd; FILE *fp; /* Time this interval */ gettimeofday(&interval_start, NULL); /* Hash slice name lookups */ hash_passwd(); /* Open, lock, and truncate CSV file */ if ((fd = open(csv.u.string, O_CREAT | O_WRONLY, 0644)) >= 0) { int tries = 10; while (flock(fd, LOCK_EX | LOCK_NB)) { sleep(1); tries--; } if (tries == 0) ulogd_log(ULOGD_ERROR, "Could not acquire lock on %s\n", csv.u.string); if (!(fp = fdopen(fd, "w"))) close(fd); else ftruncate(fd, 0); } else fp = NULL; /* Connect to DB */ if (!(dbh = mysql_init(NULL))) ulogd_log(ULOGD_ERROR, "mysql_init: failed\n"); if (dbh && !mysql_real_connect(dbh, mysqlhost.u.string[0] ? mysqlhost.u.string : NULL, mysqluser.u.string, mysqlpass.u.string[0] ? mysqlpass.u.string : NULL, mysqldb.u.string, 0, NULL, 0)) { ulogd_log(ULOGD_ERROR, "%s@%s:%s: %s\n", mysqluser.u.string, mysqlhost.u.string[0] ? 
mysqlhost.u.string : "localhost", mysqldb.u.string, mysql_error(dbh)); mysql_close(dbh); dbh = NULL; } if (dbh) { /* All times in the database are in GMT */ mysql_query(dbh, "SET time_zone='+00:00'"); } /* Initialize statistics */ tables = table_collisions = row_collisions = rows = 0; packets = bytes = 0; assert(dump_head); for (flow_table = dump_head; flow_table; flow_table = next_flow_table) { /* Keep track of some basic statistics */ tables++; table_collisions += flow_table->nexth ? 1 : 0; row_collisions += flow_table->collisions; rows += flow_table->rows; packets += flow_table->packets; bytes += flow_table->bytes; /* Get slice name */ slice_name[sizeof(slice_name) - 1] = '\0'; switch (flow_table->xid) { case CONNECTION_REFUSED_XID: strncpy(slice_name, "connection-refused", sizeof(slice_name)); break; case ICMP_ECHOREPLY_XID: strncpy(slice_name, "icmp-reply", sizeof(slice_name)); break; case ICMP_UNREACH_XID: strncpy(slice_name, "icmp-unreachable", sizeof(slice_name)); break; default: if ((slice = get_slice(flow_table->xid))) strncpy(slice_name, slice->name, sizeof(slice_name)); else if (flow_table->slice[0]) strncpy(slice_name, flow_table->slice, sizeof(slice_name)); else snprintf(slice_name, sizeof(slice_name), "%u", flow_table->xid); break; } if (dbh) { /* Get day */ gmtime_r(&flow_table->start_time, &tm); /* Form a human readable table name */ snprintf(table_name, sizeof(table_name), "%u_%u_%u_%u_%s_%u_%02u_%02u", IPQUAD(flow_table->src_ip), slice_name, 1900 + tm.tm_year, tm.tm_mon + 1, tm.tm_mday); assert(strlen(table_name) <= NAME_LEN); /* Replace punctation with underscores */ for (c = table_name; *c; c++) { if (ispunct(*c)) *c = '_'; } /* Time database operations on each table */ gettimeofday(&start, NULL); /* Insert/update table summary */ if (mysql_queryf(dbh, "INSERT INTO flow_tables " "(table_name,src_ip,slice," "start_time,end_time,packets,bytes) " "VALUES ('%s',%u,'%s'," "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) " "ON DUPLICATE KEY UPDATE " 
"end_time=FROM_UNIXTIME(%u)," "packets=packets+%llu," "bytes=bytes+%llu", table_name, flow_table->src_ip, slice_name, (unsigned) flow_table->start_time, (unsigned) flow_table->end_time, flow_table->packets, flow_table->bytes, (unsigned) flow_table->end_time, flow_table->packets, flow_table->bytes)) goto free_flow_table; /* Create table */ if (mysql_queryf(dbh, "CREATE TABLE IF NOT EXISTS %s LIKE flow_template", table_name)) goto free_flow_table; /* Lock for speed */ mysql_queryf(dbh, "ALTER TABLE %s DISABLE KEYS", table_name); mysql_queryf(dbh, "LOCK TABLES %s WRITE", table_name); /* Prepare the insert statement */ if (!(stmt = mysql_stmt_init(dbh))) ulogd_log(ULOGD_ERROR, "mysql_stmt_init: %s\n", mysql_error(dbh)); else { my_bool is_null = 0; MYSQL_BIND bind[] = { { .buffer_type = MYSQL_TYPE_TINY, .buffer = &bind_flow.protocol, .buffer_length = sizeof(bind_flow.protocol), .length = &bind[0].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_SHORT, .buffer = &bind_flow.src_port, .buffer_length = sizeof(bind_flow.src_port), .length = &bind[1].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONG, .buffer = &bind_flow.dst_ip, .buffer_length = sizeof(bind_flow.dst_ip), .length = &bind[2].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_SHORT, .buffer = &bind_flow.dst_port, .buffer_length = sizeof(bind_flow.dst_port), .length = &bind[3].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONG, .buffer = &bind_flow.start_time, .buffer_length = sizeof(bind_flow.start_time), .length = &bind[4].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONG, .buffer = &bind_flow.end_time, .buffer_length = sizeof(bind_flow.end_time), .length = &bind[5].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONGLONG, .buffer = &bind_flow.packets, .buffer_length = 
sizeof(bind_flow.packets), .length = &bind[6].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONGLONG, .buffer = &bind_flow.bytes, .buffer_length = sizeof(bind_flow.bytes), .length = &bind[7].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONG, .buffer = &bind_flow.end_time, .buffer_length = sizeof(bind_flow.end_time), .length = &bind[8].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONGLONG, .buffer = &bind_flow.packets, .buffer_length = sizeof(bind_flow.packets), .length = &bind[9].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, { .buffer_type = MYSQL_TYPE_LONGLONG, .buffer = &bind_flow.bytes, .buffer_length = sizeof(bind_flow.bytes), .length = &bind[10].buffer_length, .is_null = &is_null, .is_unsigned = 1, }, }; snprintf(query, sizeof(query), "INSERT INTO %s " "(protocol,src_port,dst_ip,dst_port," "start_time,end_time,packets,bytes) " "VALUES (?,?,?,?," "FROM_UNIXTIME(?),FROM_UNIXTIME(?),?,?) 
" "ON DUPLICATE KEY UPDATE " "end_time=FROM_UNIXTIME(?)," "packets=packets+?," "bytes=bytes+?", table_name); if (mysql_stmt_prepare(stmt, query, strlen(query)) || mysql_stmt_bind_param(stmt, bind)) { ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt)); mysql_stmt_close(stmt); stmt = NULL; } } } else stmt = NULL; for (flow = flow_table->flows_head; flow; flow = flow->next) { /* Dump to CSV */ if (fp) { fprintf(fp, "%s,%u," "%u.%u.%u.%u,%u," "%u.%u.%u.%u,%u," "%u,%u,%llu,%llu\n", slice_name, flow->protocol, IPQUAD(flow_table->src_ip), flow->src_port, IPQUAD(flow->dst_ip), flow->dst_port, (unsigned) flow->start_time, (unsigned) flow->end_time, flow->packets, flow->bytes); } /* Insert/update flow record */ if (dbh) { if (stmt) { bind_flow = *flow; if (mysql_stmt_execute(stmt)) ulogd_log(ULOGD_ERROR, "%s: %s\n", query, mysql_stmt_error(stmt)); } else { mysql_queryf(dbh, "INSERT INTO %s " "(protocol,src_port,dst_ip,dst_port," "start_time,end_time,packets,bytes) " "VALUES (%u,%u,%u,%u," "FROM_UNIXTIME(%u),FROM_UNIXTIME(%u),%llu,%llu) " "ON DUPLICATE KEY UPDATE " "end_time=FROM_UNIXTIME(%u)," "packets=packets+%llu," "bytes=bytes+%llu", table_name, flow->protocol, flow->src_port, flow->dst_ip, flow->dst_port, (unsigned) flow->start_time, (unsigned) flow->end_time, flow->packets, flow->bytes, (unsigned) flow->end_time, flow->packets, flow->bytes); } } } if (dbh) { if (stmt) mysql_stmt_close(stmt); /* Unlock */ mysql_query(dbh, "UNLOCK TABLES"); mysql_queryf(dbh, "ALTER TABLE %s ENABLE KEYS", table_name); /* Update table summary */ mysql_queryf(dbh, "UPDATE flow_tables " "SET flows=(SELECT COUNT(protocol) FROM %s) " "WHERE table_name='%s'", table_name, table_name); gettimeofday(&end, NULL); timersub(&end, &start, &end); } ulogd_log(ULOGD_DEBUG, "Updated %u rows in %s in %u.%06u s (%u collisions, %llu packets, %llu bytes)\n", flow_table->rows, table_name, (unsigned) end.tv_sec, (unsigned) end.tv_usec, flow_table->collisions, flow_table->packets, 
flow_table->bytes); free_flow_table: for (flow = flow_table->flows_head; flow; flow = next_flow) { flow_table->rows--; flow_table->packets -= flow->packets; flow_table->bytes -= flow->bytes; dump_flows--; next_flow = flow->next; assert(next_flow || flow == flow_table->flows_tail); free(flow); } assert(!flow_table->rows); assert(!flow_table->packets); assert(!flow_table->bytes); next_flow_table = flow_table->next; assert(next_flow_table || flow_table == dump_tail); free(flow_table); } assert(!dump_flows); dump_head = dump_tail = NULL; if (dbh) { mysql_close(dbh); gettimeofday(&end, NULL); timersub(&end, &interval_start, &end); ulogd_log(ULOGD_NOTICE, "Updated %u rows in %u tables (%u bytes) in %u.%06u s " "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n", rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow), (unsigned) end.tv_sec, (unsigned) end.tv_usec, table_collisions, row_collisions, packets, bytes); } else { /* Could not connect to database */ ulogd_log(ULOGD_ERROR, "Lost %u rows in %u tables (%u bytes) " "(%u table collisions, %u row collisions, %llu packets, %llu bytes)\n", rows, tables, tables * sizeof(struct flow_table) + flows * sizeof(struct flow), table_collisions, row_collisions, packets, bytes); } if (fp) { fclose(fp); assert(fd >= 0); close(fd); } pthread_exit(NULL); return NULL; } static void start_dump(void) { /* Wait for previous dump thread to complete */ if (dump_thread) { pthread_join(dump_thread, NULL); dump_thread = 0; } /* Switch buffers */ assert(!dump_head); assert(!dump_tail); assert(!dump_flows); dump_head = flow_tables_head; dump_tail = flow_tables_tail; dump_flows = flows; /* Start up the dump thread if necessary */ if (pthread_create(&dump_thread, NULL, dump_interval, NULL)) { ulogd_log(ULOGD_ERROR, "pthread_create: %s\n", strerror(errno)); /* Try again later */ dump_thread = 0; dump_head = dump_tail = NULL; dump_flows = 0; } else { /* Clear hash */ memset(flow_tables, 0, 
sizeof(flow_tables)); flow_tables_head = flow_tables_tail = NULL; flows = 0; } } static void update_flow(struct flow *flow, time_t now, unsigned int packets, unsigned int bytes) { /* Update flow */ if (now > flow->end_time) flow->end_time = now; if (!flow->start_time || now < flow->start_time) flow->start_time = now; flow->packets += packets; flow->bytes += bytes; /* Update table summary */ if (now > flow->table->end_time) flow->table->end_time = now; if (!flow->table->start_time || now < flow->table->start_time) flow->table->start_time = now; flow->table->packets += packets; flow->table->bytes += bytes; } #ifndef STANDALONE struct intr_id { char* name; ulog_iret_t *res; }; /* Interesting keys */ enum { OOB_TIME_SEC = 0, OOB_MARK, IP_SADDR, IP_DADDR, IP_TOTLEN, IP_PROTOCOL, TCP_SPORT, TCP_DPORT, TCP_ACK, TCP_RST, UDP_SPORT, UDP_DPORT, ICMP_TYPE, ICMP_CODE, GRE_FLAG_KEY, GRE_VERSION, GRE_KEY, PPTP_CALLID, }; #define INTR_IDS (sizeof(intr_ids)/sizeof(intr_ids[0])) static struct intr_id intr_ids[] = { [OOB_TIME_SEC] = { "oob.time.sec", 0 }, [OOB_MARK] = { "oob.mark", 0 }, [IP_SADDR] = { "ip.saddr", 0 }, [IP_DADDR] = { "ip.daddr", 0 }, [IP_TOTLEN] = { "ip.totlen", 0 }, [IP_PROTOCOL] = { "ip.protocol", 0 }, [TCP_SPORT] = { "tcp.sport", 0 }, [TCP_DPORT] { "tcp.dport", 0 }, [TCP_ACK] = { "tcp.ack", 0 }, [TCP_RST] = { "tcp.rst", 0 }, [UDP_SPORT] = { "udp.sport", 0 }, [UDP_DPORT] = { "udp.dport", 0 }, [ICMP_TYPE] = { "icmp.type", 0 }, [ICMP_CODE] = { "icmp.code", 0 }, [GRE_FLAG_KEY] = { "gre.flag.key", 0 }, [GRE_VERSION] = { "gre.version", 0 }, [GRE_KEY] = { "gre.key", 0 }, [PPTP_CALLID] = { "pptp.callid", 0 }, }; #define GET_VALUE(x) intr_ids[x].res->value #define DATE(t) ((t) / (24*60*60) * (24*60*60)) static int _output_netflow(ulog_iret_t *res) { u_int8_t protocol; u_int32_t src_ip, dst_ip; u_int16_t src_port, dst_port; int xid; struct flow *flow = NULL; time_t now; now = (time_t) GET_VALUE(OOB_TIME_SEC).ui32; if (flow_tables_head) { /* If we have collected for at least 
5 minutes, or * collected the maximum number of flows, or it is now * the next day, dump this interval. */ if ((now - flow_tables_head->start_time) >= (interval.u.value * 60) || flows >= MAX_FLOWS || DATE(flow_tables_head->start_time) != DATE(now)) { /* Out of memory */ if (flows >= MAX_FLOWS) ulogd_log(ULOGD_ERROR, "dumping %d flows early\n", flows); start_dump(); } } protocol = GET_VALUE(IP_PROTOCOL).ui8; src_ip = GET_VALUE(IP_SADDR).ui32; dst_ip = GET_VALUE(IP_DADDR).ui32; xid = GET_VALUE(OOB_MARK).ui32; switch (protocol) { case IPPROTO_TCP: src_port = GET_VALUE(TCP_SPORT).ui16; dst_port = GET_VALUE(TCP_DPORT).ui16; /* check for root termination */ if (xid == ROOT_XID) { if ((flow = get_flow(src_ip, UNKNOWN_XID, NULL, protocol, src_port, dst_ip, dst_port))) { /* * this is supposed to catch some of the cases where the * network stack responds on behalf of the user but the * slice is incorrectly accounted for, e.g. on socket * shutdown */ assert(flow->table); xid = flow->table->xid; } else { /* * we have not seen any packets on this flow during the * current interval, check for the connection refused */ if (GET_VALUE(TCP_RST).b && GET_VALUE(TCP_ACK).b) xid = CONNECTION_REFUSED_XID; } } break; case IPPROTO_UDP: /* * we could record the source port, however this pretty much * kills any notion of UDP flows and therefore consume large * quantities of space, so we set the source port to 0 * tuple.sport = GET_VALUE(UDP_SPORT).ui16; */ src_port = 0; /* * traceroutes create a large number of flows in the db * this is a quick hack to catch the most common form * of traceroute (basically we're mapping any UDP packet * in the 33435-33524 range to the "trace" port, 33524 is * 3 packets * nhops (30). 
*/ dst_port = GET_VALUE(UDP_DPORT).ui16; if (dst_port >= 33435 && dst_port <= 33524) dst_port = 33435; break; case IPPROTO_ICMP: src_port = GET_VALUE(ICMP_TYPE).ui8; dst_port = GET_VALUE(ICMP_CODE).ui8; /* * We special case some of the ICMP traffic that the kernel * always generates. Since this is attributed to root, it * creates significant "noise" in the output. We want to be * able to quickly see that root is generating traffic. */ if (xid == ROOT_XID) { if (src_port == ICMP_ECHOREPLY) xid = ICMP_ECHOREPLY_XID; else if (src_port == ICMP_UNREACH) xid = ICMP_UNREACH_XID; } break; case IPPROTO_GRE: if (GET_VALUE(GRE_FLAG_KEY).b) { if (GET_VALUE(GRE_VERSION).ui8 == 1) { /* Get PPTP call ID */ src_port = GET_VALUE(PPTP_CALLID).ui16; } else { /* XXX Truncate GRE keys to 16 bits */ src_port = (u_int16_t) GET_VALUE(GRE_KEY).ui32; } } else { /* No key available */ src_port = 0; } dst_port = 0; break; default: /* This is the default key for packets from unsupported protocols */ src_port = 0; dst_port = 0; break; } /* Record the flow */ if (!flow) { flow = get_flow(src_ip, xid, NULL, protocol, src_port, dst_ip, dst_port); if (!flow) return -ENOMEM; } /* Update the flow */ update_flow(flow, now, 1, GET_VALUE(IP_TOTLEN).ui16); return 0; } /* get all key id's for the keys we are intrested in */ static int get_ids(void) { int i; struct intr_id *cur_id; for (i = 0; i < INTR_IDS; i++) { cur_id = &intr_ids[i]; cur_id->res = keyh_getres(keyh_getid(cur_id->name)); if (!cur_id->res) { ulogd_log(ULOGD_ERROR, "Cannot resolve keyhash id for %s\n", cur_id->name); return 1; } } return 0; } static int _netflow_init(void) { /* have the opts parsed */ config_parse_file("NETFLOW",config_entries); if (get_ids()) { ulogd_log(ULOGD_ERROR, "can't resolve all keyhash id's\n"); exit(2); } /* Seed the hash function */ salt = getpid() ^ time(NULL); return 0; } static void _netflow_fini(void) { /* should probably stop the dump thread?! 
*/ /* do nothing */ } static ulog_output_t netflow_op = { .name = "netflow", .output = &_output_netflow, .init = _netflow_init, .fini = _netflow_fini, }; void _init(void) { register_output(&netflow_op); } #else static FILE *logfile = NULL; /* logfile pointer */ static int loglevel = 5; /* current loglevel */ /* log message to the logfile */ void __ulogd_log(int level, char *file, int line, const char *format, ...) { char *timestr; va_list ap; time_t tm; FILE *outfd; /* log only messages which have level at least as high as loglevel */ if (level < loglevel) return; if (logfile) outfd = logfile; else outfd = stderr; va_start(ap, format); tm = time(NULL); timestr = ctime(&tm); timestr[strlen(timestr)-1] = '\0'; fprintf(outfd, "%s <%1.1d> %s:%d ", timestr, level, file, line); vfprintf(outfd, format, ap); va_end(ap); /* flush glibc's buffer */ fflush(outfd); } int main(int argc, char **argv) { /* Get options */ while (1) { int option_index = 0; static struct option long_options[] = { { "user", required_argument, NULL, 'u' }, { "database", required_argument, NULL, 'd' }, { "password", required_argument, NULL, 'p' }, { "verbose", required_argument, NULL, 'v' }, { "host", required_argument, NULL, 'h' }, { "help", required_argument, NULL, '?' }, { 0, 0, 0, 0 } }; struct option *opt; int c; c = getopt_long(argc, argv, "u:d:p:v:h:", long_options, &option_index); if (c == -1) break; switch (c) { case 'u': strncpy(mysqluser.u.string, optarg, sizeof(mysqluser.u.string)); break; case 'd': strncpy(mysqldb.u.string, optarg, sizeof(mysqldb.u.string)); break; case 'p': strncpy(mysqlpass.u.string, optarg, sizeof(mysqlpass.u.string)); break; case 'v': loglevel = atoi(optarg); break; case 'h': strncpy(mysqlhost.u.string, optarg, sizeof(mysqlhost.u.string)); break; default: fprintf(stderr, "usage: %s [OPTION]... YY/mm-dd.log[.bz2|.gz]...\n", argv[0]); for (opt = long_options; opt->name; opt++) fprintf(stderr, "\t-%c, --%s%s\n", opt->val, opt->name, required_argument ? 
"=ARGUMENT" : ""); return 1; } } /* Seed the hash function */ salt = getpid() ^ time(NULL); /* Don't lookup slice names in /etc/passwd */ strcpy(slicemap.u.string, "/dev/null"); /* All times in the log files are in GMT */ putenv("TZ=GMT"); /* Parse the rest of the non-option arguments (files to import) */ for (argv = &argv[optind]; *argv; argv++) { char pathname[PATH_MAX], *s, *next; const char *cmd = NULL, *opts = NULL; pid_t pid = 0; FILE *fp = NULL; char *line = NULL; size_t len = 0; int fds[2] = { -1, -1 }; time_t now; struct tm tm; if (!realpath(*argv, pathname)) { ulogd_log(ULOGD_ERROR, "%s: %s\n", *argv, strerror(errno)); goto next_file; } /* We may need to fork a child to decompress the log file */ if (strstr(pathname, ".bz2")) { cmd = "bzip2"; opts = "-cdfq"; } else if (strstr(pathname, ".gz")) { cmd = "gzip"; opts = "-cdfq"; } else if (strstr(pathname, ".zip")) { cmd = "unzip"; opts = "-p"; } /* Fork a child to decompress the log file */ if (cmd) { /* Open a pipe */ if (pipe(fds)) { ulogd_log(ULOGD_ERROR, "pipe: %s\n", strerror(errno)); goto next_file; } switch ((pid = fork())) { case -1: ulogd_log(ULOGD_ERROR, "fork: %s\n", strerror(errno)); goto next_file; case 0: close(fds[0]); fds[0] = -1; /* Redirect stdout to the write end of the pipe */ if (dup2(fds[1], fileno(stdout)) < 0) { ulogd_log(ULOGD_ERROR, "dup2: %s\n", strerror(errno)); exit(errno); } execlp(cmd, cmd, opts, pathname, NULL); ulogd_log(ULOGD_ERROR, "execlp: %s\n", strerror(errno)); goto next_file; default: close(fds[1]); fds[1] = -1; /* Open the read end of the pipe */ if (!(fp = fdopen(fds[0], "r"))) { ulogd_log(ULOGD_ERROR, "fdopen: %s\n", strerror(errno)); goto next_file; } break; } } /* Just open the file */ else if (!(fp = fopen(pathname, "r"))) { ulogd_log(ULOGD_ERROR, "%s: %s\n", pathname, strerror(errno)); goto next_file; } /* Parse date from the pathname (e.g. 
[.*]/05/01-25.log[.bz2|gz]) */ now = time(NULL); gmtime_r(&now, &tm); next = pathname; while ((s = strsep(&next, "/"))) { int mon, mday, year; if (sscanf(s, "%02u-%02u.log%*s", &mon, &mday) == 2) { tm.tm_mon = mon - 1; tm.tm_mday = mday; } else if (strlen(s) == 2 && sscanf(s, "%02u", &year) == 1) { /* Use strptime(3) strategy: ...values in the range * 69-99 refer to years in the twentieth century * (1969-1999); values in the range 00-68 refer to * years in the twenty-first century (2000-2068). */ if (year < 69) year += 100; tm.tm_year = year; } } /* Reset to midnight and recalculate derived fields with mktime() */ tm.tm_hour = tm.tm_min = tm.tm_sec = 0; mktime(&tm); while (getline(&line, &len, fp) >= 0) { char *s, *next; char *slice; u_int8_t protocol; struct in_addr src_ip, dst_ip; unsigned int src_port, dst_port; unsigned int packets, bytes; struct flow *flow; /* Parse the flow record */ next = line; if (!(s = strsep(&next, ",")) || sscanf(s, "%02u-%02u-%02u", &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 3) continue; now = mktime(&tm); if (!(s = strsep(&next, ","))) continue; if (!strcasecmp(s, "TCP")) protocol = IPPROTO_TCP; else if (!strcasecmp(s, "UDP")) protocol = IPPROTO_UDP; else if (!strcasecmp(s, "ICMP")) protocol = IPPROTO_ICMP; else if (!strcasecmp(s, "GRE")) protocol = IPPROTO_GRE; else protocol = atoi(s); if (!(slice = strsep(&next, ","))) continue; if (!(s = strsep(&next, ",")) || !inet_aton(s, &src_ip)) continue; if (!(s = strsep(&next, ",")) || sscanf(s, "%u", &src_port) != 1) continue; if (!(s = strsep(&next, ",")) || !inet_aton(s, &dst_ip)) continue; if (!(s = strsep(&next, ",")) || sscanf(s, "%u", &dst_port) != 1) continue; if (!(s = strsep(&next, ",")) || sscanf(s, "%u", &packets) != 1) continue; if (!(s = strsep(&next, ",")) || sscanf(s, "%u", &bytes) != 1) continue; /* Record the flow */ flow = get_flow(ntohl(src_ip.s_addr), 0, slice, protocol, src_port, ntohl(dst_ip.s_addr), dst_port); if (!flow) continue; /* Update flow */ update_flow(flow, now, 
packets, bytes); } if (line) free(line); start_dump(); next_file: if (pid && kill(pid, SIGTERM)) ulogd_log(ULOGD_ERROR, "kill: %s\n", strerror(errno)); wait(NULL); if (fp) fclose(fp); if (fds[0] >= 0) close(fds[0]); if (fds[1] >= 0) close(fds[1]); } /* Wait for previous dump thread to complete */ if (dump_thread) { pthread_join(dump_thread, NULL); dump_thread = 0; } return 0; } #endif