--- /dev/null
+/*
+ Copyright (C) Slava Astashonok <sla@0n.ru>
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License.
+
+ $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
+*/
+
+#include <common.h>
+
+/* stdout, stderr, freopen() */
+#include <stdio.h>
+
+/* atoi(), exit() */
+#include <stdlib.h>
+
+/* getopt(), alarm(), getpid(), sedsid(), chdir() */
+#include <unistd.h>
+
+/* strerror() */
+#include <string.h>
+
+/* sig*() */
+#include <signal.h>
+
+#include <libipulog/libipulog.h>
+struct ipulog_handle {
+ int fd;
+ u_int8_t blocking;
+ struct sockaddr_nl local;
+ struct sockaddr_nl peer;
+ struct nlmsghdr* last_nlhdr;
+};
+
+/* inet_*() (Linux, FreeBSD, Solaris), getpid() */
+#include <sys/types.h>
+#include <netinet/in_systm.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <netinet/ip_icmp.h>
+#include <net/if.h>
+
+#include <sys/param.h>
+#include <pwd.h>
+#ifdef OS_LINUX
+#include <grp.h>
+#endif
+
+/* pthread_*() */
+#include <pthread.h>
+
+/* errno */
+#include <errno.h>
+
+/* getaddrinfo() */
+#include <netdb.h>
+
+/* nanosleep() */
+#include <time.h>
+
+/* gettimeofday() */
+#include <sys/time.h>
+
+/* scheduling */
+#include <sched.h>
+
+/* select() (POSIX)*/
+#include <sys/select.h>
+
+/* open() */
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <fprobe-ulog.h>
+#include <my_log.h>
+#include <my_getopt.h>
+#include <netflow.h>
+#include <hash.h>
+#include <mem.h>
+
+enum {
+ aflag,
+ Bflag,
+ bflag,
+ cflag,
+ dflag,
+ eflag,
+ fflag,
+ gflag,
+ hflag,
+ lflag,
+ mflag,
+ Mflag,
+ nflag,
+ qflag,
+ rflag,
+ sflag,
+ tflag,
+ Uflag,
+ uflag,
+ vflag,
+ Xflag,
+};
+
+static struct getopt_parms parms[] = {
+ {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'h', 0, 0, 0},
+ {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'M', 0, 0, 0},
+ {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {0, 0, 0, 0}
+};
+
+extern char *optarg;
+extern int optind, opterr, optopt;
+extern int errno;
+
+extern struct NetFlow NetFlow1;
+extern struct NetFlow NetFlow5;
+extern struct NetFlow NetFlow7;
+
+#define mark_is_tos parms[Mflag].count
+static unsigned scan_interval = 5;
+static int frag_lifetime = 30;
+static int inactive_lifetime = 60;
+static int active_lifetime = 300;
+static int sockbufsize;
+#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
+#if (MEM_BITS == 0) || (MEM_BITS == 16)
+#define BULK_QUANTITY 10000
+#else
+#define BULK_QUANTITY 200
+#endif
+static unsigned bulk_quantity = BULK_QUANTITY;
+static unsigned pending_queue_length = 100;
+static struct NetFlow *netflow = &NetFlow5;
+static unsigned verbosity = 6;
+static unsigned log_dest = MY_LOG_SYSLOG;
+static struct Time start_time;
+static long start_time_offset;
+static int off_tl;
+/* From mem.c */
+extern unsigned total_elements;
+extern unsigned free_elements;
+extern unsigned total_memory;
+#if ((DEBUG) & DEBUG_I)
+static unsigned emit_pkts, emit_queue;
+static uint64_t size_total;
+static unsigned pkts_total, pkts_total_fragmented;
+static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
+static unsigned pkts_pending, pkts_pending_done;
+static unsigned pending_queue_trace, pending_queue_trace_candidate;
+static unsigned flows_total, flows_fragmented;
+#endif
+static unsigned emit_count;
+static uint32_t emit_sequence;
+static unsigned emit_rate_bytes, emit_rate_delay;
+static struct Time emit_time;
+static uint8_t emit_packet[NETFLOW_MAX_PACKET];
+static pthread_t thid;
+static sigset_t sig_mask;
+static struct sched_param schedp;
+static int sched_min, sched_max;
+static int npeers, npeers_rot;
+static struct peer *peers;
+static int sigs;
+
+static struct Flow *flows[1 << HASH_BITS];
+static pthread_mutex_t flows_mutex[1 << HASH_BITS];
+
+static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
+
+static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
+static struct Flow *pending_head, *pending_tail;
+static struct Flow *scan_frag_dreg;
+
+static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
+static struct Flow *flows_emit;
+
+static char ident[256] = "fprobe-ulog";
+static FILE *pidfile;
+static char *pidfilepath;
+static pid_t pid;
+static int killed;
+static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
+static struct ipulog_handle *ulog_handle;
+static uint32_t ulog_gmask = 1;
+static char *cap_buf;
+static int nsnmp_rules;
+static struct snmp_rule *snmp_rules;
+static struct passwd *pw = 0;
+
+void usage()
+{
+ fprintf(stdout,
+ "fprobe-ulog: a NetFlow probe. Version %s\n"
+ "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
+ "\n"
+ "-h\t\tDisplay this help\n"
+ "-U <mask>\tULOG group bitwise mask [1]\n"
+ "-s <seconds>\tHow often scan for expired flows [5]\n"
+ "-g <seconds>\tFragmented flow lifetime [30]\n"
+ "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
+ "-f <filename>\tLog flow data in a file\n"
+ "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
+ "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
+ "-a <address>\tUse <address> as source for NetFlow flow\n"
+ "-X <rules>\tInterface name to SNMP-index conversion rules\n"
+ "-M\t\tUse netfilter mark value as ToS flag\n"
+ "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
+ "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
+ "-q <flows>\tPending queue length [100]\n"
+ "-B <kilobytes>\tKernel capture buffer size [0]\n"
+ "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
+ "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
+ "-c <directory>\tDirectory to chroot to\n"
+ "-u <user>\tUser to run as\n"
+ "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
+ "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
+ "-y <remote:port>\tAddress of the NetFlow collector\n",
+ "-f <writable file>\tFile to write data into\n"
+ VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
+ exit(0);
+}
+
+#if ((DEBUG) & DEBUG_I)
+void info_debug()
+{
+ my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
+ pkts_total, pkts_total_fragmented, size_total,
+ pkts_pending - pkts_pending_done, pending_queue_trace);
+ my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
+ pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
+ my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
+ flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
+ my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
+ total_elements, free_elements, total_memory);
+}
+#endif
+
+void sighandler(int sig)
+{
+ switch (sig) {
+ case SIGTERM:
+ sigs |= SIGTERM_MASK;
+ break;
+#if ((DEBUG) & DEBUG_I)
+ case SIGUSR1:
+ sigs |= SIGUSR1_MASK;
+ break;
+#endif
+ }
+}
+
+void gettime(struct Time *now)
+{
+ struct timeval t;
+
+ gettimeofday(&t, 0);
+ now->sec = t.tv_sec;
+ now->usec = t.tv_usec;
+}
+
+inline time_t cmpmtime(struct Time *t1, struct Time *t2)
+{
+ return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
+}
+
+/* Uptime in miliseconds */
+uint32_t getuptime(struct Time *t)
+{
+ /* Maximum uptime is about 49/2 days */
+ return cmpmtime(t, &start_time);
+}
+
+hash_t hash_flow(struct Flow *flow)
+{
+ if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
+ else return hash(flow, sizeof(struct Flow_TL));
+}
+
+uint16_t snmp_index(char *name) {
+ uint32_t i;
+
+ if (!*name) return 0;
+
+ for (i = 0; (int) i < nsnmp_rules; i++) {
+ if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
+ return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
+ }
+
+ if ((i = if_nametoindex(name))) return i;
+
+ return -1;
+}
+
+inline void copy_flow(struct Flow *src, struct Flow *dst)
+{
+ dst->iif = src->iif;
+ dst->oif = src->oif;
+ dst->sip = src->sip;
+ dst->dip = src->dip;
+ dst->tos = src->tos;
+ dst->proto = src->proto;
+ dst->tcp_flags = src->tcp_flags;
+ dst->id = src->id;
+ dst->sp = src->sp;
+ dst->dp = src->dp;
+ dst->pkts = src->pkts;
+ dst->size = src->size;
+ dst->sizeF = src->sizeF;
+ dst->sizeP = src->sizeP;
+ dst->ctime = src->ctime;
+ dst->mtime = src->mtime;
+ dst->flags = src->flags;
+}
+
+struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
+{
+ struct Flow **flowpp;
+
+#ifdef WALL
+ flowpp = 0;
+#endif
+
+ if (prev) flowpp = *prev;
+
+ while (where) {
+ if (where->sip.s_addr == what->sip.s_addr
+ && where->dip.s_addr == what->dip.s_addr
+ && where->proto == what->proto) {
+ switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
+ case 0:
+ /* Both unfragmented */
+ if ((what->sp == where->sp)
+ && (what->dp == where->dp)) goto done;
+ break;
+ case 2:
+ /* Both fragmented */
+ if (where->id == what->id) goto done;
+ break;
+ }
+ }
+ flowpp = &where->next;
+ where = where->next;
+ }
+done:
+ if (prev) *prev = flowpp;
+ return where;
+}
+
+int put_into(struct Flow *flow, int flag
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ , char *logbuf
+#endif
+)
+{
+ int ret = 0;
+ hash_t h;
+ struct Flow *flown, **flowpp;
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ char buf[64];
+#endif
+
+ h = hash_flow(flow);
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ sprintf(buf, " %x H:%04x", (unsigned) flow, h);
+ strcat(logbuf, buf);
+#endif
+ pthread_mutex_lock(&flows_mutex[h]);
+ flowpp = &flows[h];
+ if (!(flown = find(flows[h], flow, &flowpp))) {
+ /* No suitable flow found - add */
+ if (flag == COPY_INTO) {
+ if ((flown = mem_alloc())) {
+ copy_flow(flow, flown);
+ flow = flown;
+ } else {
+#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
+ my_log(LOG_ERR, "%s %s. %s",
+ "mem_alloc():", strerror(errno), "packet lost");
+#endif
+ return -1;
+ }
+ }
+ flow->next = flows[h];
+ flows[h] = flow;
+#if ((DEBUG) & DEBUG_I)
+ flows_total++;
+ if (flow->flags & FLOW_FRAG) flows_fragmented++;
+#endif
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ if (flown) {
+ sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
+ strcat(logbuf, buf);
+ }
+#endif
+ } else {
+ /* Found suitable flow - update */
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ sprintf(buf, " +> %x", (unsigned) flown);
+ strcat(logbuf, buf);
+#endif
+ if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
+ flown->mtime = flow->mtime;
+ if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
+ flown->ctime = flow->ctime;
+ flown->tcp_flags |= flow->tcp_flags;
+ flown->size += flow->size;
+ flown->pkts += flow->pkts;
+ if (flow->flags & FLOW_FRAG) {
+ /* Fragmented flow require some additional work */
+ if (flow->flags & FLOW_TL) {
+ /*
+ ?FIXME?
+ Several packets with FLOW_TL (attack)
+ */
+ flown->sp = flow->sp;
+ flown->dp = flow->dp;
+ }
+ if (flow->flags & FLOW_LASTFRAG) {
+ /*
+ ?FIXME?
+ Several packets with FLOW_LASTFRAG (attack)
+ */
+ flown->sizeP = flow->sizeP;
+ }
+ flown->flags |= flow->flags;
+ flown->sizeF += flow->sizeF;
+ if ((flown->flags & FLOW_LASTFRAG)
+ && (flown->sizeF >= flown->sizeP)) {
+ /* All fragments received - flow reassembled */
+ *flowpp = flown->next;
+ pthread_mutex_unlock(&flows_mutex[h]);
+#if ((DEBUG) & DEBUG_I)
+ flows_total--;
+ flows_fragmented--;
+#endif
+ flown->id = 0;
+ flown->flags &= ~FLOW_FRAG;
+#if ((DEBUG) & (DEBUG_U | DEBUG_S))
+ strcat(logbuf," R");
+#endif
+ ret = put_into(flown, MOVE_INTO
+#if ((DEBUG) & (DEBUG_U | DEBUG_S))
+ , logbuf
+#endif
+ );
+ }
+ }
+ if (flag == MOVE_INTO) mem_free(flow);
+ }
+ pthread_mutex_unlock(&flows_mutex[h]);
+ return ret;
+}
+
+void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
+{
+ int i;
+
+ for (i = 0; i < fields; i++) {
+#if ((DEBUG) & DEBUG_F)
+ my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
+#endif
+ switch (format[i]) {
+ case NETFLOW_IPV4_SRC_ADDR:
+ ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
+ p += NETFLOW_IPV4_SRC_ADDR_SIZE;
+ break;
+
+ case NETFLOW_IPV4_DST_ADDR:
+ ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
+ p += NETFLOW_IPV4_DST_ADDR_SIZE;
+ break;
+
+ case NETFLOW_INPUT_SNMP:
+ *((uint16_t *) p) = htons(flow->iif);
+ p += NETFLOW_INPUT_SNMP_SIZE;
+ break;
+
+ case NETFLOW_OUTPUT_SNMP:
+ *((uint16_t *) p) = htons(flow->oif);
+ p += NETFLOW_OUTPUT_SNMP_SIZE;
+ break;
+
+ case NETFLOW_PKTS_32:
+ *((uint32_t *) p) = htonl(flow->pkts);
+ p += NETFLOW_PKTS_32_SIZE;
+ break;
+
+ case NETFLOW_BYTES_32:
+ *((uint32_t *) p) = htonl(flow->size);
+ p += NETFLOW_BYTES_32_SIZE;
+ break;
+
+ case NETFLOW_FIRST_SWITCHED:
+ *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
+ p += NETFLOW_FIRST_SWITCHED_SIZE;
+ break;
+
+ case NETFLOW_LAST_SWITCHED:
+ *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
+ p += NETFLOW_LAST_SWITCHED_SIZE;
+ break;
+
+ case NETFLOW_L4_SRC_PORT:
+ *((uint16_t *) p) = flow->sp;
+ p += NETFLOW_L4_SRC_PORT_SIZE;
+ break;
+
+ case NETFLOW_L4_DST_PORT:
+ *((uint16_t *) p) = flow->dp;
+ p += NETFLOW_L4_DST_PORT_SIZE;
+ break;
+
+ case NETFLOW_PROT:
+ *((uint8_t *) p) = flow->proto;
+ p += NETFLOW_PROT_SIZE;
+ break;
+
+ case NETFLOW_SRC_TOS:
+ *((uint8_t *) p) = flow->tos;
+ p += NETFLOW_SRC_TOS_SIZE;
+ break;
+
+ case NETFLOW_TCP_FLAGS:
+ *((uint8_t *) p) = flow->tcp_flags;
+ p += NETFLOW_TCP_FLAGS_SIZE;
+ break;
+
+ case NETFLOW_VERSION:
+ *((uint16_t *) p) = htons(netflow->Version);
+ p += NETFLOW_VERSION_SIZE;
+ break;
+
+ case NETFLOW_COUNT:
+ *((uint16_t *) p) = htons(emit_count);
+ p += NETFLOW_COUNT_SIZE;
+ break;
+
+ case NETFLOW_UPTIME:
+ *((uint32_t *) p) = htonl(getuptime(&emit_time));
+ p += NETFLOW_UPTIME_SIZE;
+ break;
+
+ case NETFLOW_UNIX_SECS:
+ *((uint32_t *) p) = htonl(emit_time.sec);
+ p += NETFLOW_UNIX_SECS_SIZE;
+ break;
+
+ case NETFLOW_UNIX_NSECS:
+ *((uint32_t *) p) = htonl(emit_time.usec * 1000);
+ p += NETFLOW_UNIX_NSECS_SIZE;
+ break;
+
+ case NETFLOW_FLOW_SEQUENCE:
+ //*((uint32_t *) p) = htonl(emit_sequence);
+ *((uint32_t *) p) = 0;
+ p += NETFLOW_FLOW_SEQUENCE_SIZE;
+ break;
+
+ case NETFLOW_PAD8:
+ /* Unsupported (uint8_t) */
+ case NETFLOW_ENGINE_TYPE:
+ case NETFLOW_ENGINE_ID:
+ case NETFLOW_FLAGS7_1:
+ case NETFLOW_SRC_MASK:
+ case NETFLOW_DST_MASK:
+ *((uint8_t *) p) = 0;
+ p += NETFLOW_PAD8_SIZE;
+ break;
+
+ case NETFLOW_PAD16:
+ /* Unsupported (uint16_t) */
+ case NETFLOW_SRC_AS:
+ case NETFLOW_DST_AS:
+ case NETFLOW_FLAGS7_2:
+ *((uint16_t *) p) = 0;
+ p += NETFLOW_PAD16_SIZE;
+ break;
+
+ case NETFLOW_PAD32:
+ /* Unsupported (uint32_t) */
+ case NETFLOW_IPV4_NEXT_HOP:
+ case NETFLOW_ROUTER_SC:
+ *((uint32_t *) p) = 0;
+ p += NETFLOW_PAD32_SIZE;
+ break;
+
+ default:
+ my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
+ format, i, format[i]);
+ exit(1);
+ }
+ }
+#if ((DEBUG) & DEBUG_F)
+ my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
+#endif
+ return p;
+}
+
+void setuser() {
+ /*
+ Workaround for clone()-based threads
+ Try to change EUID independently of main thread
+ */
+ if (pw) {
+ setgroups(0, NULL);
+ setregid(pw->pw_gid, pw->pw_gid);
+ setreuid(pw->pw_uid, pw->pw_uid);
+ }
+}
+
+void *emit_thread()
+{
+ struct Flow *flow;
+ void *p;
+ struct timeval now;
+ struct timespec timeout;
+ int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
+
+ p = (void *) &emit_packet + netflow->HeaderSize;
+ timeout.tv_nsec = 0;
+
+ setuser();
+
+ for (;;) {
+ pthread_mutex_lock(&emit_mutex);
+ while (!flows_emit) {
+ gettimeofday(&now, 0);
+ timeout.tv_sec = now.tv_sec + emit_timeout;
+ /* Do not wait until emit_packet will filled - it may be too long */
+ if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
+ pthread_mutex_unlock(&emit_mutex);
+ goto sendit;
+ }
+ }
+ flow = flows_emit;
+ flows_emit = flows_emit->next;
+#if ((DEBUG) & DEBUG_I)
+ emit_queue--;
+#endif
+ pthread_mutex_unlock(&emit_mutex);
+
+#ifdef UPTIME_TRICK
+ if (!emit_count) {
+ gettime(&start_time);
+ start_time.sec -= start_time_offset;
+ }
+#endif
+ p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
+ mem_free(flow);
+ emit_count++;
+#ifdef PF2_DEBUG
+ printf("Emit count = %d\n", emit_count);
+ fflush(stdout);
+#endif
+ if (emit_count == netflow->MaxFlows) {
+ sendit:
+ gettime(&emit_time);
+ p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
+ size = netflow->HeaderSize + emit_count * netflow->FlowSize;
+ /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
+ if (size < 1464) size = 1464;
+ peer_rot_cur = 0;
+ for (i = 0; i < npeers; i++) {
+ if (peers[0].type == PEER_FILE) {
+ if (netflow->SeqOffset)
+ *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
+#define MESSAGES
+ ret = write(peers[0].write_fd, emit_packet, size);
+ if (ret < size) {
+#if ((DEBUG) & DEBUG_E) || defined MESSAGES
+ my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
+ i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
+#endif
+#undef MESSAGES
+ }
+#if ((DEBUG) & DEBUG_E)
+ commaneelse {
+ my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+ emit_count, i + 1, peers[i].seq);
+ }
+#endif
+ peers[0].seq += emit_count;
+
+ /* Rate limit */
+ if (emit_rate_bytes) {
+ sent += size;
+ delay = sent / emit_rate_bytes;
+ if (delay) {
+ sent %= emit_rate_bytes;
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = emit_rate_delay * delay;
+ while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+ }
+ }
+ }
+ else
+ if (peers[i].type == PEER_MIRROR) goto sendreal;
+ else
+ if (peers[i].type == PEER_ROTATE)
+ if (peer_rot_cur++ == peer_rot_work) {
+ sendreal:
+ if (netflow->SeqOffset)
+ *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
+ ret = send(peers[i].write_fd, emit_packet, size, 0);
+ if (ret < size) {
+#if ((DEBUG) & DEBUG_E) || defined MESSAGES
+ my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
+ i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
+#endif
+ }
+#if ((DEBUG) & DEBUG_E)
+ commaneelse {
+ my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+ emit_count, i + 1, peers[i].seq);
+ }
+#endif
+ peers[i].seq += emit_count;
+
+ /* Rate limit */
+ if (emit_rate_bytes) {
+ sent += size;
+ delay = sent / emit_rate_bytes;
+ if (delay) {
+ sent %= emit_rate_bytes;
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = emit_rate_delay * delay;
+ while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+ }
+ }
+ }
+ }
+ if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
+ emit_sequence += emit_count;
+ emit_count = 0;
+#if ((DEBUG) & DEBUG_I)
+ emit_pkts++;
+#endif
+ }
+ }
+}
+
+void *unpending_thread()
+{
+ struct timeval now;
+ struct timespec timeout;
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ char logbuf[256];
+#endif
+
+ setuser();
+
+ timeout.tv_nsec = 0;
+ pthread_mutex_lock(&unpending_mutex);
+
+ for (;;) {
+ while (!(pending_tail->flags & FLOW_PENDING)) {
+ gettimeofday(&now, 0);
+ timeout.tv_sec = now.tv_sec + unpending_timeout;
+ pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
+ }
+
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ *logbuf = 0;
+#endif
+ if (put_into(pending_tail, COPY_INTO
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ , logbuf
+#endif
+ ) < 0) {
+#if ((DEBUG) & DEBUG_I)
+ pkts_lost_unpending++;
+#endif
+ }
+
+#if ((DEBUG) & DEBUG_U)
+ my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
+#endif
+
+ pending_tail->flags = 0;
+ pending_tail = pending_tail->next;
+#if ((DEBUG) & DEBUG_I)
+ pkts_pending_done++;
+#endif
+ }
+}
+
+void *scan_thread()
+{
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ char logbuf[256];
+#endif
+ int i;
+ struct Flow *flow, **flowpp;
+ struct Time now;
+ struct timespec timeout;
+
+ setuser();
+
+ timeout.tv_nsec = 0;
+ pthread_mutex_lock(&scan_mutex);
+
+ for (;;) {
+ gettime(&now);
+ timeout.tv_sec = now.sec + scan_interval;
+ pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
+
+ gettime(&now);
+#if ((DEBUG) & DEBUG_S)
+ my_log(LOG_DEBUG, "S: %d", now.sec);
+#endif
+ for (i = 0; i < 1 << HASH_BITS ; i++) {
+ pthread_mutex_lock(&flows_mutex[i]);
+ flow = flows[i];
+ flowpp = &flows[i];
+ while (flow) {
+ if (flow->flags & FLOW_FRAG) {
+ /* Process fragmented flow */
+ if ((now.sec - flow->mtime.sec) > frag_lifetime) {
+ /* Fragmented flow expired - put it into special chain */
+#if ((DEBUG) & DEBUG_I)
+ flows_fragmented--;
+ flows_total--;
+#endif
+ *flowpp = flow->next;
+ flow->id = 0;
+ flow->flags &= ~FLOW_FRAG;
+ flow->next = scan_frag_dreg;
+ scan_frag_dreg = flow;
+ flow = *flowpp;
+ continue;
+ }
+ } else {
+ /* Flow is not frgamented */
+ if ((now.sec - flow->mtime.sec) > inactive_lifetime
+ || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
+ /* Flow expired */
+#if ((DEBUG) & DEBUG_S)
+ my_log(LOG_DEBUG, "S: E %x", flow);
+#endif
+#if ((DEBUG) & DEBUG_I)
+ flows_total--;
+#endif
+ *flowpp = flow->next;
+ pthread_mutex_lock(&emit_mutex);
+ flow->next = flows_emit;
+ flows_emit = flow;
+#if ((DEBUG) & DEBUG_I)
+ emit_queue++;
+#endif
+ pthread_mutex_unlock(&emit_mutex);
+ flow = *flowpp;
+ continue;
+ }
+ }
+ flowpp = &flow->next;
+ flow = flow->next;
+ } /* chain loop */
+ pthread_mutex_unlock(&flows_mutex[i]);
+ } /* hash loop */
+ if (flows_emit) pthread_cond_signal(&emit_cond);
+
+ while (scan_frag_dreg) {
+ flow = scan_frag_dreg;
+ scan_frag_dreg = flow->next;
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ *logbuf = 0;
+#endif
+ put_into(flow, MOVE_INTO
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+ , logbuf
+#endif
+ );
+#if ((DEBUG) & DEBUG_S)
+ my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
+#endif
+ }
+ }
+}
+
+void *cap_thread()
+{
+ struct ulog_packet_msg *ulog_msg;
+ struct ip *nl;
+ void *tl;
+ struct Flow *flow;
+ int len, off_frag, psize;
+#if ((DEBUG) & DEBUG_C)
+ char buf[64];
+ char logbuf[256];
+#endif
+
+ setuser();
+
+ while (!killed) {
+ len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
+ if (len <= 0) {
+ my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
+ continue;
+ }
+ while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
+
+#if ((DEBUG) & DEBUG_C)
+ sprintf(logbuf, "C: %d", ulog_msg->data_len);
+#endif
+
+ nl = (void *) &ulog_msg->payload;
+ psize = ulog_msg->data_len;
+
+ /* Sanity check */
+ if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
+#if ((DEBUG) & DEBUG_C)
+ strcat(logbuf, " U");
+ my_log(LOG_DEBUG, "%s", logbuf);
+#endif
+#if ((DEBUG) & DEBUG_I)
+ pkts_ignored++;
+#endif
+ continue;
+ }
+
+ if (pending_head->flags) {
+#if ((DEBUG) & DEBUG_C) || defined MESSAGES
+ my_log(LOG_ERR,
+# if ((DEBUG) & DEBUG_C)
+ "%s %s %s", logbuf,
+# else
+ "%s %s",
+# endif
+ "pending queue full:", "packet lost");
+#endif
+#if ((DEBUG) & DEBUG_I)
+ pkts_lost_capture++;
+#endif
+ goto done;
+ }
+
+#if ((DEBUG) & DEBUG_I)
+ pkts_total++;
+#endif
+
+ flow = pending_head;
+
+ /* ?FIXME? Add sanity check for ip_len? */
+ flow->size = ntohs(nl->ip_len);
+#if ((DEBUG) & DEBUG_I)
+ size_total += flow->size;
+#endif
+
+ flow->sip = nl->ip_src;
+ flow->dip = nl->ip_dst;
+ flow->iif = snmp_index(ulog_msg->indev_name);
+ flow->oif = snmp_index(ulog_msg->outdev_name);
+ flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
+ flow->proto = nl->ip_p;
+ flow->id = 0;
+ flow->tcp_flags = 0;
+ flow->pkts = 1;
+ flow->sizeF = 0;
+ flow->sizeP = 0;
+ /* Packets captured from OUTPUT table didn't contains valid timestamp */
+ if (ulog_msg->timestamp_sec) {
+ flow->ctime.sec = ulog_msg->timestamp_sec;
+ flow->ctime.usec = ulog_msg->timestamp_usec;
+ } else gettime(&flow->ctime);
+ flow->mtime = flow->ctime;
+
+ off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
+
+ /*
+ Offset (from network layer) to transport layer header/IP data
+ IOW IP header size ;-)
+
+ ?FIXME?
+ Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
+ */
+ off_tl = nl->ip_hl << 2;
+ tl = (void *) nl + off_tl;
+
+ /* THIS packet data size: data_size = total_size - ip_header_size*4 */
+ flow->sizeF = ntohs(nl->ip_len) - off_tl;
+ psize -= off_tl;
+ if ((signed) flow->sizeF < 0) flow->sizeF = 0;
+ if (psize > (signed) flow->sizeF) psize = flow->sizeF;
+
+ if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
+ /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
+#if ((DEBUG) & DEBUG_C)
+ strcat(logbuf, " F");
+#endif
+#if ((DEBUG) & DEBUG_I)
+ pkts_total_fragmented++;
+#endif
+ flow->flags |= FLOW_FRAG;
+ flow->id = nl->ip_id;
+
+ if (!(ntohs(nl->ip_off) & IP_MF)) {
+ /* Packet whith IP_MF contains information about whole datagram size */
+ flow->flags |= FLOW_LASTFRAG;
+ /* size = frag_offset*8 + data_size */
+ flow->sizeP = off_frag + flow->sizeF;
+ }
+ }
+
+#if ((DEBUG) & DEBUG_C)
+ sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
+ strcat(logbuf, buf);
+ sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
+ strcat(logbuf, buf);
+#endif
+
+ /*
+ Fortunately most interesting transport layer information fit
+ into first 8 bytes of IP data field (minimal nonzero size).
+ Thus we don't need actual packet reassembling to build whole
+ transport layer data. We only check the fragment offset for
+ zero value to find packet with this information.
+ */
+ if (!off_frag && psize >= 8) {
+ switch (flow->proto) {
+ case IPPROTO_TCP:
+ case IPPROTO_UDP:
+ flow->sp = ((struct udphdr *)tl)->uh_sport;
+ flow->dp = ((struct udphdr *)tl)->uh_dport;
+ goto tl_known;
+
+#ifdef ICMP_TRICK
+ case IPPROTO_ICMP:
+ flow->sp = htons(((struct icmp *)tl)->icmp_type);
+ flow->dp = htons(((struct icmp *)tl)->icmp_code);
+ goto tl_known;
+#endif
+#ifdef ICMP_TRICK_CISCO
+ case IPPROTO_ICMP:
+ flow->dp = *((int32_t *) tl);
+ goto tl_known;
+#endif
+
+ default:
+ /* Unknown transport layer */
+#if ((DEBUG) & DEBUG_C)
+ strcat(logbuf, " U");
+#endif
+ flow->sp = 0;
+ flow->dp = 0;
+ break;
+
+ tl_known:
+#if ((DEBUG) & DEBUG_C)
+ sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
+ strcat(logbuf, buf);
+#endif
+ flow->flags |= FLOW_TL;
+ }
+ }
+
+ /* Check for tcp flags presence (including CWR and ECE). */
+ if (flow->proto == IPPROTO_TCP
+ && off_frag < 16
+ && psize >= 16 - off_frag) {
+ flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
+#if ((DEBUG) & DEBUG_C)
+ sprintf(buf, " TCP:%x", flow->tcp_flags);
+ strcat(logbuf, buf);
+#endif
+ }
+
+#if ((DEBUG) & DEBUG_C)
+ sprintf(buf, " => %x", (unsigned) flow);
+ strcat(logbuf, buf);
+ my_log(LOG_DEBUG, "%s", logbuf);
+#endif
+
+#if ((DEBUG) & DEBUG_I)
+ pkts_pending++;
+ pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
+ if (pending_queue_trace < pending_queue_trace_candidate)
+ pending_queue_trace = pending_queue_trace_candidate;
+#endif
+
+ /* Flow complete - inform unpending_thread() about it */
+ pending_head->flags |= FLOW_PENDING;
+ pending_head = pending_head->next;
+ done:
+ pthread_cond_signal(&unpending_cond);
+ }
+ }
+ return 0;
+}
+
+int main(int argc, char **argv)
+{
+ char errpbuf[512];
+ char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
+ int c, i, write_fd, memory_limit = 0;
+ struct addrinfo hints, *res;
+ struct sockaddr_in saddr;
+ pthread_attr_t tattr;
+ struct sigaction sigact;
+ static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
+ struct timeval timeout;
+
+ sched_min = sched_get_priority_min(SCHED);
+ sched_max = sched_get_priority_max(SCHED);
+
+ memset(&saddr, 0 , sizeof(saddr));
+ memset(&hints, 0 , sizeof(hints));
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM;
+
+ /* Process command line options */
+
+ opterr = 0;
+ while ((c = my_getopt(argc, argv, parms)) != -1) {
+ switch (c) {
+ case '?':
+ usage();
+
+ case 'h':
+ usage();
+ }
+ }
+
+ if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
+ if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
+ if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
+ if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
+ if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
+ if (parms[nflag].count) {
+ switch (atoi(parms[nflag].arg)) {
+ case 1:
+ netflow = &NetFlow1;
+ break;
+
+ case 5:
+ break;
+
+ case 7:
+ netflow = &NetFlow7;
+ break;
+
+ default:
+ fprintf(stderr, "Illegal %s\n", "NetFlow version");
+ exit(1);
+ }
+ }
+ if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
+ if (parms[lflag].count) {
+ if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
+ *log_suffix++ = 0;
+ if (*log_suffix) {
+ sprintf(errpbuf, "[%s]", log_suffix);
+ strcat(ident, errpbuf);
+ }
+ }
+ if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
+ if (log_suffix) *--log_suffix = ':';
+ }
+ if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
+ err_malloc:
+ fprintf(stderr, "malloc(): %s\n", strerror(errno));
+ exit(1);
+ }
+ sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
+ if (parms[qflag].count) {
+ pending_queue_length = atoi(parms[qflag].arg);
+ if (pending_queue_length < 1) {
+ fprintf(stderr, "Illegal %s\n", "pending queue length");
+ exit(1);
+ }
+ }
+ if (parms[rflag].count) {
+ schedp.sched_priority = atoi(parms[rflag].arg);
+ if (schedp.sched_priority
+ && (schedp.sched_priority < sched_min
+ || schedp.sched_priority > sched_max)) {
+ fprintf(stderr, "Illegal %s\n", "realtime priority");
+ exit(1);
+ }
+ }
+ if (parms[Bflag].count) {
+ sockbufsize = atoi(parms[Bflag].arg) << 10;
+ }
+ if (parms[bflag].count) {
+ bulk_quantity = atoi(parms[bflag].arg);
+ if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
+ fprintf(stderr, "Illegal %s\n", "bulk size");
+ exit(1);
+ }
+ }
+ if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
+ if (parms[Xflag].count) {
+ for(i = 0; parms[Xflag].arg[i]; i++)
+ if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
+ if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
+ goto err_malloc;
+ rule = strtok(parms[Xflag].arg, ":");
+ for (i = 0; rule; i++) {
+ snmp_rules[i].len = strlen(rule);
+ if (snmp_rules[i].len > IFNAMSIZ) {
+ fprintf(stderr, "Illegal %s\n", "interface basename");
+ exit(1);
+ }
+ strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
+ if (!*(rule - 1)) *(rule - 1) = ',';
+ rule = strtok(NULL, ",");
+ if (!rule) {
+ fprintf(stderr, "Illegal %s\n", "SNMP rule");
+ exit(1);
+ }
+ snmp_rules[i].base = atoi(rule);
+ *(rule - 1) = ':';
+ rule = strtok(NULL, ":");
+ }
+ nsnmp_rules = i;
+ }
+ if (parms[tflag].count)
+ sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
+ if (parms[aflag].count) {
+ if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
+ bad_lhost:
+ fprintf(stderr, "Illegal %s\n", "source address");
+ exit(1);
+ } else {
+ saddr = *((struct sockaddr_in *) res->ai_addr);
+ freeaddrinfo(res);
+ }
+ }
+ if (parms[uflag].count)
+ if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
+ fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
+ exit(1);
+ }
+
+
+ /* Process collectors parameters. Brrrr... :-[ */
+
+ npeers = argc - optind;
+ if (npeers > 1) {
+ /* Send to remote Netflow collector */
+ if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
+ for (i = optind, npeers = 0; i < argc; i++, npeers++) {
+ dhost = argv[i];
+ if (!(dport = strchr(dhost, ':'))) goto bad_collector;
+ *dport++ = 0;
+ if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
+ fprintf(stderr, "socket(): %s\n", strerror(errno));
+ exit(1);
+ }
+ peers[npeers].write_fd = write_fd;
+ peers[npeers].type = PEER_MIRROR;
+ peers[npeers].laddr = saddr;
+ peers[npeers].seq = 0;
+ if ((lhost = strchr(dport, '/'))) {
+ *lhost++ = 0;
+ if ((type = strchr(lhost, '/'))) {
+ *type++ = 0;
+ switch (*type) {
+ case 0:
+ case 'm':
+ break;
+
+ case 'r':
+ peers[npeers].type = PEER_ROTATE;
+ npeers_rot++;
+ break;
+
+ default:
+ goto bad_collector;
+ }
+ }
+ if (*lhost) {
+ if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
+ peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
+ freeaddrinfo(res);
+ }
+ }
+ if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
+ sizeof(struct sockaddr_in))) {
+ fprintf(stderr, "bind(): %s\n", strerror(errno));
+ exit(1);
+ }
+ if (getaddrinfo(dhost, dport, &hints, &res)) {
+bad_collector:
+ fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
+ exit(1);
+ }
+ peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
+ freeaddrinfo(res);
+ if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
+ sizeof(struct sockaddr_in))) {
+ fprintf(stderr, "connect(): %s\n", strerror(errno));
+ exit(1);
+ }
+
+ /* Restore command line */
+ if (type) *--type = '/';
+ if (lhost) *--lhost = '/';
+ *--dport = ':';
+ }
+ }
+ else if (parms[fflag].count) {
+ // log into a file
+ char *fname;
+ if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
+ fname = parms[fflag].arg;
+ if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
+ fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
+ exit(1);
+ }
+ peers[0].write_fd = write_fd;
+ peers[0].type = PEER_FILE;
+ peers[0].seq = 0;
+ npeers++;
+ }
+ else
+ usage();
+
+
+ if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
+ ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
+ if (!ulog_handle) {
+ fprintf(stderr, "libipulog initialization error: %s",
+ ipulog_strerror(ipulog_errno));
+ exit(1);
+ }
+ if (sockbufsize)
+ if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
+ &sockbufsize, sizeof(sockbufsize)) < 0)
+ fprintf(stderr, "setsockopt(): %s", strerror(errno));
+
+ /* Daemonize (if log destination stdout-free) */
+
+ my_log_open(ident, verbosity, log_dest);
+ if (!(log_dest & 2)) {
+ switch (fork()) {
+ case -1:
+ fprintf(stderr, "fork(): %s", strerror(errno));
+ exit(1);
+
+ case 0:
+ setsid();
+ freopen("/dev/null", "r", stdin);
+ freopen("/dev/null", "w", stdout);
+ freopen("/dev/null", "w", stderr);
+ break;
+
+ default:
+ exit(0);
+ }
+ } else {
+ setvbuf(stdout, (char *)0, _IONBF, 0);
+ setvbuf(stderr, (char *)0, _IONBF, 0);
+ }
+
+ pid = getpid();
+ sprintf(errpbuf, "[%ld]", (long) pid);
+ strcat(ident, errpbuf);
+
+ /* Initialization */
+
+ hash_init(); /* Actually for crc16 only */
+ mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
+ for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
+
+#ifdef UPTIME_TRICK
+ /* Hope 12 days is enough :-/ */
+ start_time_offset = 1 << 20;
+
+ /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
+#endif
+ gettime(&start_time);
+
+ /*
+ Build static pending queue as circular buffer.
+ */
+ if (!(pending_head = mem_alloc())) goto err_mem_alloc;
+ pending_tail = pending_head;
+ for (i = pending_queue_length - 1; i--;) {
+ if (!(pending_tail->next = mem_alloc())) {
+ err_mem_alloc:
+ my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
+ exit(1);
+ }
+ pending_tail = pending_tail->next;
+ }
+ pending_tail->next = pending_head;
+ pending_tail = pending_head;
+
+ sigemptyset(&sig_mask);
+ sigact.sa_handler = &sighandler;
+ sigact.sa_mask = sig_mask;
+ sigact.sa_flags = 0;
+ sigaddset(&sig_mask, SIGTERM);
+ sigaction(SIGTERM, &sigact, 0);
+#if ((DEBUG) & DEBUG_I)
+ sigaddset(&sig_mask, SIGUSR1);
+ sigaction(SIGUSR1, &sigact, 0);
+#endif
+ if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
+ my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
+ exit(1);
+ }
+
+ my_log(LOG_INFO, "Starting %s...", VERSION);
+
+ if (parms[cflag].count) {
+ if (chdir(parms[cflag].arg) || chroot(".")) {
+ my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
+ exit(1);
+ }
+ }
+
+ schedp.sched_priority = schedp.sched_priority - THREADS + 2;
+ pthread_attr_init(&tattr);
+ for (i = 0; i < THREADS - 1; i++) {
+ if (schedp.sched_priority > 0) {
+ if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
+ (pthread_attr_setschedparam(&tattr, &schedp))) {
+ my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
+ exit(1);
+ }
+ }
+ if (pthread_create(&thid, &tattr, threads[i], 0)) {
+ my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
+ exit(1);
+ }
+ pthread_detach(thid);
+ schedp.sched_priority++;
+ }
+
+ if (pw) {
+ if (setgroups(0, NULL)) {
+ my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
+ exit(1);
+ }
+ if (setregid(pw->pw_gid, pw->pw_gid)) {
+ my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
+ exit(1);
+ }
+ if (setreuid(pw->pw_uid, pw->pw_uid)) {
+ my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
+ exit(1);
+ }
+ }
+
+ if (!(pidfile = fopen(pidfilepath, "w")))
+ my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
+ else {
+ fprintf(pidfile, "%ld\n", (long) pid);
+ fclose(pidfile);
+ }
+
+ my_log(LOG_INFO, "pid: %d", pid);
+ my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
+ "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
+ ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
+ netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
+ memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
+ emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
+ parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
+ for (i = 0; i < nsnmp_rules; i++) {
+ my_log(LOG_INFO, "SNMP rule #%d %s:%d",
+ i + 1, snmp_rules[i].basename, snmp_rules[i].base);
+ }
+ for (i = 0; i < npeers; i++) {
+ switch (peers[i].type) {
+ case PEER_MIRROR:
+ c = 'm';
+ break;
+ case PEER_ROTATE:
+ c = 'r';
+ break;
+ }
+ snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
+ my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
+ inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
+ }
+
+ pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
+
+ timeout.tv_usec = 0;
+ while (!killed
+ || (total_elements - free_elements - pending_queue_length)
+ || emit_count
+ || pending_tail->flags) {
+
+ if (!sigs) {
+ timeout.tv_sec = scan_interval;
+ select(0, 0, 0, 0, &timeout);
+ }
+
+ if (sigs & SIGTERM_MASK && !killed) {
+ sigs &= ~SIGTERM_MASK;
+ my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
+ scan_interval = 1;
+ frag_lifetime = -1;
+ active_lifetime = -1;
+ inactive_lifetime = -1;
+ emit_timeout = 1;
+ unpending_timeout = 1;
+ killed = 1;
+ pthread_cond_signal(&scan_cond);
+ pthread_cond_signal(&unpending_cond);
+ }
+
+#if ((DEBUG) & DEBUG_I)
+ if (sigs & SIGUSR1_MASK) {
+ sigs &= ~SIGUSR1_MASK;
+ info_debug();
+ }
+#endif
+ }
+ remove(pidfilepath);
+#if ((DEBUG) & DEBUG_I)
+ info_debug();
+#endif
+ my_log(LOG_INFO, "Done.");
+#ifdef WALL
+ return 0;
+#endif
+}