+++ /dev/null
-/*
- Copyright (C) Slava Astashonok <sla@0n.ru>
-
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License.
-
- $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
-*/
-
-#include <common.h>
-
-/* stdout, stderr, freopen() */
-#include <stdio.h>
-
-/* atoi(), exit() */
-#include <stdlib.h>
-
-/* getopt(), alarm(), getpid(), sedsid(), chdir() */
-#include <unistd.h>
-
-/* strerror() */
-#include <string.h>
-
-/* sig*() */
-#include <signal.h>
-
-#include <libipulog/libipulog.h>
-struct ipulog_handle {
- int fd;
- u_int8_t blocking;
- struct sockaddr_nl local;
- struct sockaddr_nl peer;
- struct nlmsghdr* last_nlhdr;
-};
-
-/* inet_*() (Linux, FreeBSD, Solaris), getpid() */
-#include <sys/types.h>
-#include <netinet/in_systm.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netinet/ip.h>
-#include <netinet/tcp.h>
-#include <netinet/udp.h>
-#include <netinet/ip_icmp.h>
-#include <net/if.h>
-
-#include <sys/param.h>
-#include <pwd.h>
-#ifdef OS_LINUX
-#include <grp.h>
-#endif
-
-/* pthread_*() */
-#include <pthread.h>
-
-/* errno */
-#include <errno.h>
-
-/* getaddrinfo() */
-#include <netdb.h>
-
-/* nanosleep() */
-#include <time.h>
-
-/* gettimeofday() */
-#include <sys/time.h>
-
-/* scheduling */
-#include <sched.h>
-
-/* select() (POSIX)*/
-#include <sys/select.h>
-
-/* open() */
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include <fprobe-ulog.h>
-#include <my_log.h>
-#include <my_getopt.h>
-#include <netflow.h>
-#include <hash.h>
-#include <mem.h>
-
-enum {
- aflag,
- Bflag,
- bflag,
- cflag,
- dflag,
- eflag,
- fflag,
- gflag,
- hflag,
- lflag,
- mflag,
- Mflag,
- nflag,
- qflag,
- rflag,
- sflag,
- tflag,
- Uflag,
- uflag,
- vflag,
- Xflag,
-};
-
-static struct getopt_parms parms[] = {
- {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'h', 0, 0, 0},
- {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'M', 0, 0, 0},
- {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
- {0, 0, 0, 0}
-};
-
-extern char *optarg;
-extern int optind, opterr, optopt;
-extern int errno;
-
-extern struct NetFlow NetFlow1;
-extern struct NetFlow NetFlow5;
-extern struct NetFlow NetFlow7;
-
-#define mark_is_tos parms[Mflag].count
-static unsigned scan_interval = 5;
-static int frag_lifetime = 30;
-static int inactive_lifetime = 60;
-static int active_lifetime = 300;
-static int sockbufsize;
-#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
-#if (MEM_BITS == 0) || (MEM_BITS == 16)
-#define BULK_QUANTITY 10000
-#else
-#define BULK_QUANTITY 200
-#endif
-static unsigned bulk_quantity = BULK_QUANTITY;
-static unsigned pending_queue_length = 100;
-static struct NetFlow *netflow = &NetFlow5;
-static unsigned verbosity = 6;
-static unsigned log_dest = MY_LOG_SYSLOG;
-static struct Time start_time;
-static long start_time_offset;
-static int off_tl;
-/* From mem.c */
-extern unsigned total_elements;
-extern unsigned free_elements;
-extern unsigned total_memory;
-#if ((DEBUG) & DEBUG_I)
-static unsigned emit_pkts, emit_queue;
-static uint64_t size_total;
-static unsigned pkts_total, pkts_total_fragmented;
-static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
-static unsigned pkts_pending, pkts_pending_done;
-static unsigned pending_queue_trace, pending_queue_trace_candidate;
-static unsigned flows_total, flows_fragmented;
-#endif
-static unsigned emit_count;
-static uint32_t emit_sequence;
-static unsigned emit_rate_bytes, emit_rate_delay;
-static struct Time emit_time;
-static uint8_t emit_packet[NETFLOW_MAX_PACKET];
-static pthread_t thid;
-static sigset_t sig_mask;
-static struct sched_param schedp;
-static int sched_min, sched_max;
-static int npeers, npeers_rot;
-static struct peer *peers;
-static int sigs;
-
-static struct Flow *flows[1 << HASH_BITS];
-static pthread_mutex_t flows_mutex[1 << HASH_BITS];
-
-static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
-
-static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
-static struct Flow *pending_head, *pending_tail;
-static struct Flow *scan_frag_dreg;
-
-static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
-static struct Flow *flows_emit;
-
-static char ident[256] = "fprobe-ulog";
-static FILE *pidfile;
-static char *pidfilepath;
-static pid_t pid;
-static int killed;
-static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
-static struct ipulog_handle *ulog_handle;
-static uint32_t ulog_gmask = 1;
-static char *cap_buf;
-static int nsnmp_rules;
-static struct snmp_rule *snmp_rules;
-static struct passwd *pw = 0;
-
-void usage()
-{
- fprintf(stdout,
- "fprobe-ulog: a NetFlow probe. Version %s\n"
- "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
- "\n"
- "-h\t\tDisplay this help\n"
- "-U <mask>\tULOG group bitwise mask [1]\n"
- "-s <seconds>\tHow often scan for expired flows [5]\n"
- "-g <seconds>\tFragmented flow lifetime [30]\n"
- "-d <seconds>\tIdle flow lifetime (inactive timer) [60]\n"
- "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
- "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
- "-a <address>\tUse <address> as source for NetFlow flow\n"
- "-X <rules>\tInterface name to SNMP-index conversion rules\n"
- "-M\t\tUse netfilter mark value as ToS flag\n"
- "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
- "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
- "-q <flows>\tPending queue length [100]\n"
- "-B <kilobytes>\tKernel capture buffer size [0]\n"
- "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
- "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
- "-c <directory>\tDirectory to chroot to\n"
- "-u <user>\tUser to run as\n"
- "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
- "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
- "-y <remote:port>\tAddress of the NetFlow collector\n",
- "-f <writable file>\tFile to write data into\n"
- VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
- exit(0);
-}
-
-#if ((DEBUG) & DEBUG_I)
-void info_debug()
-{
- my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
- pkts_total, pkts_total_fragmented, size_total,
- pkts_pending - pkts_pending_done, pending_queue_trace);
- my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
- pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
- my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
- flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
- my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
- total_elements, free_elements, total_memory);
-}
-#endif
-
-void sighandler(int sig)
-{
- switch (sig) {
- case SIGTERM:
- sigs |= SIGTERM_MASK;
- break;
-#if ((DEBUG) & DEBUG_I)
- case SIGUSR1:
- sigs |= SIGUSR1_MASK;
- break;
-#endif
- }
-}
-
-void gettime(struct Time *now)
-{
- struct timeval t;
-
- gettimeofday(&t, 0);
- now->sec = t.tv_sec;
- now->usec = t.tv_usec;
-}
-
-inline time_t cmpmtime(struct Time *t1, struct Time *t2)
-{
- return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
-}
-
-/* Uptime in miliseconds */
-uint32_t getuptime(struct Time *t)
-{
- /* Maximum uptime is about 49/2 days */
- return cmpmtime(t, &start_time);
-}
-
-hash_t hash_flow(struct Flow *flow)
-{
- if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
- else return hash(flow, sizeof(struct Flow_TL));
-}
-
-uint16_t snmp_index(char *name) {
- uint32_t i;
-
- if (!*name) return 0;
-
- for (i = 0; (int) i < nsnmp_rules; i++) {
- if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
- return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
- }
-
- if ((i = if_nametoindex(name))) return i;
-
- return -1;
-}
-
-inline void copy_flow(struct Flow *src, struct Flow *dst)
-{
- dst->iif = src->iif;
- dst->oif = src->oif;
- dst->sip = src->sip;
- dst->dip = src->dip;
- dst->tos = src->tos;
- dst->proto = src->proto;
- dst->tcp_flags = src->tcp_flags;
- dst->id = src->id;
- dst->sp = src->sp;
- dst->dp = src->dp;
- dst->pkts = src->pkts;
- dst->size = src->size;
- dst->sizeF = src->sizeF;
- dst->sizeP = src->sizeP;
- dst->ctime = src->ctime;
- dst->mtime = src->mtime;
- dst->flags = src->flags;
-}
-
-struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
-{
- struct Flow **flowpp;
-
-#ifdef WALL
- flowpp = 0;
-#endif
-
- if (prev) flowpp = *prev;
-
- while (where) {
- if (where->sip.s_addr == what->sip.s_addr
- && where->dip.s_addr == what->dip.s_addr
- && where->proto == what->proto) {
- switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
- case 0:
- /* Both unfragmented */
- if ((what->sp == where->sp)
- && (what->dp == where->dp)) goto done;
- break;
- case 2:
- /* Both fragmented */
- if (where->id == what->id) goto done;
- break;
- }
- }
- flowpp = &where->next;
- where = where->next;
- }
-done:
- if (prev) *prev = flowpp;
- return where;
-}
-
-int put_into(struct Flow *flow, int flag
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- , char *logbuf
-#endif
-)
-{
- int ret = 0;
- hash_t h;
- struct Flow *flown, **flowpp;
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- char buf[64];
-#endif
-
- h = hash_flow(flow);
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- sprintf(buf, " %x H:%04x", (unsigned) flow, h);
- strcat(logbuf, buf);
-#endif
- pthread_mutex_lock(&flows_mutex[h]);
- flowpp = &flows[h];
- if (!(flown = find(flows[h], flow, &flowpp))) {
- /* No suitable flow found - add */
- if (flag == COPY_INTO) {
- if ((flown = mem_alloc())) {
- copy_flow(flow, flown);
- flow = flown;
- } else {
-#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
- my_log(LOG_ERR, "%s %s. %s",
- "mem_alloc():", strerror(errno), "packet lost");
-#endif
- return -1;
- }
- }
- flow->next = flows[h];
- flows[h] = flow;
-#if ((DEBUG) & DEBUG_I)
- flows_total++;
- if (flow->flags & FLOW_FRAG) flows_fragmented++;
-#endif
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- if (flown) {
- sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
- strcat(logbuf, buf);
- }
-#endif
- } else {
- /* Found suitable flow - update */
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- sprintf(buf, " +> %x", (unsigned) flown);
- strcat(logbuf, buf);
-#endif
- if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
- flown->mtime = flow->mtime;
- if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
- flown->ctime = flow->ctime;
- flown->tcp_flags |= flow->tcp_flags;
- flown->size += flow->size;
- flown->pkts += flow->pkts;
- if (flow->flags & FLOW_FRAG) {
- /* Fragmented flow require some additional work */
- if (flow->flags & FLOW_TL) {
- /*
- ?FIXME?
- Several packets with FLOW_TL (attack)
- */
- flown->sp = flow->sp;
- flown->dp = flow->dp;
- }
- if (flow->flags & FLOW_LASTFRAG) {
- /*
- ?FIXME?
- Several packets with FLOW_LASTFRAG (attack)
- */
- flown->sizeP = flow->sizeP;
- }
- flown->flags |= flow->flags;
- flown->sizeF += flow->sizeF;
- if ((flown->flags & FLOW_LASTFRAG)
- && (flown->sizeF >= flown->sizeP)) {
- /* All fragments received - flow reassembled */
- *flowpp = flown->next;
- pthread_mutex_unlock(&flows_mutex[h]);
-#if ((DEBUG) & DEBUG_I)
- flows_total--;
- flows_fragmented--;
-#endif
- flown->id = 0;
- flown->flags &= ~FLOW_FRAG;
-#if ((DEBUG) & (DEBUG_U | DEBUG_S))
- strcat(logbuf," R");
-#endif
- ret = put_into(flown, MOVE_INTO
-#if ((DEBUG) & (DEBUG_U | DEBUG_S))
- , logbuf
-#endif
- );
- }
- }
- if (flag == MOVE_INTO) mem_free(flow);
- }
- pthread_mutex_unlock(&flows_mutex[h]);
- return ret;
-}
-
-void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
-{
- int i;
-
- for (i = 0; i < fields; i++) {
-#if ((DEBUG) & DEBUG_F)
- my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
-#endif
- switch (format[i]) {
- case NETFLOW_IPV4_SRC_ADDR:
- ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
- p += NETFLOW_IPV4_SRC_ADDR_SIZE;
- break;
-
- case NETFLOW_IPV4_DST_ADDR:
- ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
- p += NETFLOW_IPV4_DST_ADDR_SIZE;
- break;
-
- case NETFLOW_INPUT_SNMP:
- *((uint16_t *) p) = htons(flow->iif);
- p += NETFLOW_INPUT_SNMP_SIZE;
- break;
-
- case NETFLOW_OUTPUT_SNMP:
- *((uint16_t *) p) = htons(flow->oif);
- p += NETFLOW_OUTPUT_SNMP_SIZE;
- break;
-
- case NETFLOW_PKTS_32:
- *((uint32_t *) p) = htonl(flow->pkts);
- p += NETFLOW_PKTS_32_SIZE;
- break;
-
- case NETFLOW_BYTES_32:
- *((uint32_t *) p) = htonl(flow->size);
- p += NETFLOW_BYTES_32_SIZE;
- break;
-
- case NETFLOW_FIRST_SWITCHED:
- *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
- p += NETFLOW_FIRST_SWITCHED_SIZE;
- break;
-
- case NETFLOW_LAST_SWITCHED:
- *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
- p += NETFLOW_LAST_SWITCHED_SIZE;
- break;
-
- case NETFLOW_L4_SRC_PORT:
- *((uint16_t *) p) = flow->sp;
- p += NETFLOW_L4_SRC_PORT_SIZE;
- break;
-
- case NETFLOW_L4_DST_PORT:
- *((uint16_t *) p) = flow->dp;
- p += NETFLOW_L4_DST_PORT_SIZE;
- break;
-
- case NETFLOW_PROT:
- *((uint8_t *) p) = flow->proto;
- p += NETFLOW_PROT_SIZE;
- break;
-
- case NETFLOW_SRC_TOS:
- *((uint8_t *) p) = flow->tos;
- p += NETFLOW_SRC_TOS_SIZE;
- break;
-
- case NETFLOW_TCP_FLAGS:
- *((uint8_t *) p) = flow->tcp_flags;
- p += NETFLOW_TCP_FLAGS_SIZE;
- break;
-
- case NETFLOW_VERSION:
- *((uint16_t *) p) = htons(netflow->Version);
- p += NETFLOW_VERSION_SIZE;
- break;
-
- case NETFLOW_COUNT:
- *((uint16_t *) p) = htons(emit_count);
- p += NETFLOW_COUNT_SIZE;
- break;
-
- case NETFLOW_UPTIME:
- *((uint32_t *) p) = htonl(getuptime(&emit_time));
- p += NETFLOW_UPTIME_SIZE;
- break;
-
- case NETFLOW_UNIX_SECS:
- *((uint32_t *) p) = htonl(emit_time.sec);
- p += NETFLOW_UNIX_SECS_SIZE;
- break;
-
- case NETFLOW_UNIX_NSECS:
- *((uint32_t *) p) = htonl(emit_time.usec * 1000);
- p += NETFLOW_UNIX_NSECS_SIZE;
- break;
-
- case NETFLOW_FLOW_SEQUENCE:
- //*((uint32_t *) p) = htonl(emit_sequence);
- *((uint32_t *) p) = 0;
- p += NETFLOW_FLOW_SEQUENCE_SIZE;
- break;
-
- case NETFLOW_PAD8:
- /* Unsupported (uint8_t) */
- case NETFLOW_ENGINE_TYPE:
- case NETFLOW_ENGINE_ID:
- case NETFLOW_FLAGS7_1:
- case NETFLOW_SRC_MASK:
- case NETFLOW_DST_MASK:
- *((uint8_t *) p) = 0;
- p += NETFLOW_PAD8_SIZE;
- break;
-
- case NETFLOW_PAD16:
- /* Unsupported (uint16_t) */
- case NETFLOW_SRC_AS:
- case NETFLOW_DST_AS:
- case NETFLOW_FLAGS7_2:
- *((uint16_t *) p) = 0;
- p += NETFLOW_PAD16_SIZE;
- break;
-
- case NETFLOW_PAD32:
- /* Unsupported (uint32_t) */
- case NETFLOW_IPV4_NEXT_HOP:
- case NETFLOW_ROUTER_SC:
- *((uint32_t *) p) = 0;
- p += NETFLOW_PAD32_SIZE;
- break;
-
- default:
- my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
- format, i, format[i]);
- exit(1);
- }
- }
-#if ((DEBUG) & DEBUG_F)
- my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
-#endif
- return p;
-}
-
-void setuser() {
- /*
- Workaround for clone()-based threads
- Try to change EUID independently of main thread
- */
- if (pw) {
- setgroups(0, NULL);
- setregid(pw->pw_gid, pw->pw_gid);
- setreuid(pw->pw_uid, pw->pw_uid);
- }
-}
-
-void *emit_thread()
-{
- struct Flow *flow;
- void *p;
- struct timeval now;
- struct timespec timeout;
- int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
-
- p = (void *) &emit_packet + netflow->HeaderSize;
- timeout.tv_nsec = 0;
-
- setuser();
-
- for (;;) {
- pthread_mutex_lock(&emit_mutex);
- while (!flows_emit) {
- gettimeofday(&now, 0);
- timeout.tv_sec = now.tv_sec + emit_timeout;
- /* Do not wait until emit_packet will filled - it may be too long */
- if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
- pthread_mutex_unlock(&emit_mutex);
- goto sendit;
- }
- }
- flow = flows_emit;
- flows_emit = flows_emit->next;
-#if ((DEBUG) & DEBUG_I)
- emit_queue--;
-#endif
- pthread_mutex_unlock(&emit_mutex);
-
-#ifdef UPTIME_TRICK
- if (!emit_count) {
- gettime(&start_time);
- start_time.sec -= start_time_offset;
- }
-#endif
- p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
- mem_free(flow);
- emit_count++;
- if (emit_count == netflow->MaxFlows) {
- sendit:
- gettime(&emit_time);
- p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
- size = netflow->HeaderSize + emit_count * netflow->FlowSize;
- peer_rot_cur = 0;
- for (i = 0; i < npeers; i++) {
- if (peers[i].type == PEER_FILE) {
- printf( "Here\n");
- if (netflow->SeqOffset)
- *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
-#define MESSAGES
- ret = write(peers[i].write_fd, emit_packet, size);
- if (ret < size) {
-#if ((DEBUG) & DEBUG_E) || defined MESSAGES
- my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
- i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
-#endif
-#undef MESSAGES
- }
-#if ((DEBUG) & DEBUG_E)
- commaneelse {
- my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
- emit_count, i + 1, peers[i].seq);
- }
-#endif
- peers[i].seq += emit_count;
-
- /* Rate limit */
- if (emit_rate_bytes) {
- sent += size;
- delay = sent / emit_rate_bytes;
- if (delay) {
- sent %= emit_rate_bytes;
- timeout.tv_sec = 0;
- timeout.tv_nsec = emit_rate_delay * delay;
- while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
- }
- }
- }
- else
- if (peers[i].type == PEER_MIRROR) goto sendreal;
- else
- if (peers[i].type == PEER_ROTATE)
- if (peer_rot_cur++ == peer_rot_work) {
- sendreal:
- if (netflow->SeqOffset)
- *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
- ret = send(peers[i].write_fd, emit_packet, size, 0);
- if (ret < size) {
-#if ((DEBUG) & DEBUG_E) || defined MESSAGES
- my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
- i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
-#endif
- }
-#if ((DEBUG) & DEBUG_E)
- commaneelse {
- my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
- emit_count, i + 1, peers[i].seq);
- }
-#endif
- peers[i].seq += emit_count;
-
- /* Rate limit */
- if (emit_rate_bytes) {
- sent += size;
- delay = sent / emit_rate_bytes;
- if (delay) {
- sent %= emit_rate_bytes;
- timeout.tv_sec = 0;
- timeout.tv_nsec = emit_rate_delay * delay;
- while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
- }
- }
- }
- }
- if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
- emit_sequence += emit_count;
- emit_count = 0;
-#if ((DEBUG) & DEBUG_I)
- emit_pkts++;
-#endif
- }
- }
-}
-
-void *unpending_thread()
-{
- struct timeval now;
- struct timespec timeout;
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- char logbuf[256];
-#endif
-
- setuser();
-
- timeout.tv_nsec = 0;
- pthread_mutex_lock(&unpending_mutex);
-
- for (;;) {
- while (!(pending_tail->flags & FLOW_PENDING)) {
- gettimeofday(&now, 0);
- timeout.tv_sec = now.tv_sec + unpending_timeout;
- pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
- }
-
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- *logbuf = 0;
-#endif
- if (put_into(pending_tail, COPY_INTO
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- , logbuf
-#endif
- ) < 0) {
-#if ((DEBUG) & DEBUG_I)
- pkts_lost_unpending++;
-#endif
- }
-
-#if ((DEBUG) & DEBUG_U)
- my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
-#endif
-
- pending_tail->flags = 0;
- pending_tail = pending_tail->next;
-#if ((DEBUG) & DEBUG_I)
- pkts_pending_done++;
-#endif
- }
-}
-
-void *scan_thread()
-{
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- char logbuf[256];
-#endif
- int i;
- struct Flow *flow, **flowpp;
- struct Time now;
- struct timespec timeout;
-
- setuser();
-
- timeout.tv_nsec = 0;
- pthread_mutex_lock(&scan_mutex);
-
- for (;;) {
- gettime(&now);
- timeout.tv_sec = now.sec + scan_interval;
- pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
-
- gettime(&now);
-#if ((DEBUG) & DEBUG_S)
- my_log(LOG_DEBUG, "S: %d", now.sec);
-#endif
- for (i = 0; i < 1 << HASH_BITS ; i++) {
- pthread_mutex_lock(&flows_mutex[i]);
- flow = flows[i];
- flowpp = &flows[i];
- while (flow) {
- if (flow->flags & FLOW_FRAG) {
- /* Process fragmented flow */
- if ((now.sec - flow->mtime.sec) > frag_lifetime) {
- /* Fragmented flow expired - put it into special chain */
-#if ((DEBUG) & DEBUG_I)
- flows_fragmented--;
- flows_total--;
-#endif
- *flowpp = flow->next;
- flow->id = 0;
- flow->flags &= ~FLOW_FRAG;
- flow->next = scan_frag_dreg;
- scan_frag_dreg = flow;
- flow = *flowpp;
- continue;
- }
- } else {
- /* Flow is not frgamented */
- if ((now.sec - flow->mtime.sec) > inactive_lifetime
- || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
- /* Flow expired */
-#if ((DEBUG) & DEBUG_S)
- my_log(LOG_DEBUG, "S: E %x", flow);
-#endif
-#if ((DEBUG) & DEBUG_I)
- flows_total--;
-#endif
- *flowpp = flow->next;
- pthread_mutex_lock(&emit_mutex);
- flow->next = flows_emit;
- flows_emit = flow;
-#if ((DEBUG) & DEBUG_I)
- emit_queue++;
-#endif
- pthread_mutex_unlock(&emit_mutex);
- flow = *flowpp;
- continue;
- }
- }
- flowpp = &flow->next;
- flow = flow->next;
- } /* chain loop */
- pthread_mutex_unlock(&flows_mutex[i]);
- } /* hash loop */
- if (flows_emit) pthread_cond_signal(&emit_cond);
-
- while (scan_frag_dreg) {
- flow = scan_frag_dreg;
- scan_frag_dreg = flow->next;
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- *logbuf = 0;
-#endif
- put_into(flow, MOVE_INTO
-#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- , logbuf
-#endif
- );
-#if ((DEBUG) & DEBUG_S)
- my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
-#endif
- }
- }
-}
-
-void *cap_thread()
-{
- struct ulog_packet_msg *ulog_msg;
- struct ip *nl;
- void *tl;
- struct Flow *flow;
- int len, off_frag, psize;
-#if ((DEBUG) & DEBUG_C)
- char buf[64];
- char logbuf[256];
-#endif
-
- setuser();
-
- while (!killed) {
- len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
- if (len <= 0) {
- my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
- continue;
- }
- while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
-
-#if ((DEBUG) & DEBUG_C)
- sprintf(logbuf, "C: %d", ulog_msg->data_len);
-#endif
-
- nl = (void *) &ulog_msg->payload;
- psize = ulog_msg->data_len;
-
- /* Sanity check */
- if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
-#if ((DEBUG) & DEBUG_C)
- strcat(logbuf, " U");
- my_log(LOG_DEBUG, "%s", logbuf);
-#endif
-#if ((DEBUG) & DEBUG_I)
- pkts_ignored++;
-#endif
- continue;
- }
-
- if (pending_head->flags) {
-#if ((DEBUG) & DEBUG_C) || defined MESSAGES
- my_log(LOG_ERR,
-# if ((DEBUG) & DEBUG_C)
- "%s %s %s", logbuf,
-# else
- "%s %s",
-# endif
- "pending queue full:", "packet lost");
-#endif
-#if ((DEBUG) & DEBUG_I)
- pkts_lost_capture++;
-#endif
- goto done;
- }
-
-#if ((DEBUG) & DEBUG_I)
- pkts_total++;
-#endif
-
- flow = pending_head;
-
- /* ?FIXME? Add sanity check for ip_len? */
- flow->size = ntohs(nl->ip_len);
-#if ((DEBUG) & DEBUG_I)
- size_total += flow->size;
-#endif
-
- flow->sip = nl->ip_src;
- flow->dip = nl->ip_dst;
- flow->iif = snmp_index(ulog_msg->indev_name);
- flow->oif = snmp_index(ulog_msg->outdev_name);
- flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
- flow->proto = nl->ip_p;
- flow->id = 0;
- flow->tcp_flags = 0;
- flow->pkts = 1;
- flow->sizeF = 0;
- flow->sizeP = 0;
- /* Packets captured from OUTPUT table didn't contains valid timestamp */
- if (ulog_msg->timestamp_sec) {
- flow->ctime.sec = ulog_msg->timestamp_sec;
- flow->ctime.usec = ulog_msg->timestamp_usec;
- } else gettime(&flow->ctime);
- flow->mtime = flow->ctime;
-
- off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
-
- /*
- Offset (from network layer) to transport layer header/IP data
- IOW IP header size ;-)
-
- ?FIXME?
- Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
- */
- off_tl = nl->ip_hl << 2;
- tl = (void *) nl + off_tl;
-
- /* THIS packet data size: data_size = total_size - ip_header_size*4 */
- flow->sizeF = ntohs(nl->ip_len) - off_tl;
- psize -= off_tl;
- if ((signed) flow->sizeF < 0) flow->sizeF = 0;
- if (psize > (signed) flow->sizeF) psize = flow->sizeF;
-
- if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
- /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
-#if ((DEBUG) & DEBUG_C)
- strcat(logbuf, " F");
-#endif
-#if ((DEBUG) & DEBUG_I)
- pkts_total_fragmented++;
-#endif
- flow->flags |= FLOW_FRAG;
- flow->id = nl->ip_id;
-
- if (!(ntohs(nl->ip_off) & IP_MF)) {
- /* Packet whith IP_MF contains information about whole datagram size */
- flow->flags |= FLOW_LASTFRAG;
- /* size = frag_offset*8 + data_size */
- flow->sizeP = off_frag + flow->sizeF;
- }
- }
-
-#if ((DEBUG) & DEBUG_C)
- sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
- strcat(logbuf, buf);
- sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
- strcat(logbuf, buf);
-#endif
-
- /*
- Fortunately most interesting transport layer information fit
- into first 8 bytes of IP data field (minimal nonzero size).
- Thus we don't need actual packet reassembling to build whole
- transport layer data. We only check the fragment offset for
- zero value to find packet with this information.
- */
- if (!off_frag && psize >= 8) {
- switch (flow->proto) {
- case IPPROTO_TCP:
- case IPPROTO_UDP:
- flow->sp = ((struct udphdr *)tl)->uh_sport;
- flow->dp = ((struct udphdr *)tl)->uh_dport;
- goto tl_known;
-
-#ifdef ICMP_TRICK
- case IPPROTO_ICMP:
- flow->sp = htons(((struct icmp *)tl)->icmp_type);
- flow->dp = htons(((struct icmp *)tl)->icmp_code);
- goto tl_known;
-#endif
-#ifdef ICMP_TRICK_CISCO
- case IPPROTO_ICMP:
- flow->dp = *((int32_t *) tl);
- goto tl_known;
-#endif
-
- default:
- /* Unknown transport layer */
-#if ((DEBUG) & DEBUG_C)
- strcat(logbuf, " U");
-#endif
- flow->sp = 0;
- flow->dp = 0;
- break;
-
- tl_known:
-#if ((DEBUG) & DEBUG_C)
- sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
- strcat(logbuf, buf);
-#endif
- flow->flags |= FLOW_TL;
- }
- }
-
- /* Check for tcp flags presence (including CWR and ECE). */
- if (flow->proto == IPPROTO_TCP
- && off_frag < 16
- && psize >= 16 - off_frag) {
- flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
-#if ((DEBUG) & DEBUG_C)
- sprintf(buf, " TCP:%x", flow->tcp_flags);
- strcat(logbuf, buf);
-#endif
- }
-
-#if ((DEBUG) & DEBUG_C)
- sprintf(buf, " => %x", (unsigned) flow);
- strcat(logbuf, buf);
- my_log(LOG_DEBUG, "%s", logbuf);
-#endif
-
-#if ((DEBUG) & DEBUG_I)
- pkts_pending++;
- pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
- if (pending_queue_trace < pending_queue_trace_candidate)
- pending_queue_trace = pending_queue_trace_candidate;
-#endif
-
- /* Flow complete - inform unpending_thread() about it */
- pending_head->flags |= FLOW_PENDING;
- pending_head = pending_head->next;
- done:
- pthread_cond_signal(&unpending_cond);
- }
- }
- return 0;
-}
-
-int main(int argc, char **argv)
-{
- char errpbuf[512];
- char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
- int c, i, write_fd, memory_limit = 0;
- struct addrinfo hints, *res;
- struct sockaddr_in saddr;
- pthread_attr_t tattr;
- struct sigaction sigact;
- static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
- struct timeval timeout;
-
- sched_min = sched_get_priority_min(SCHED);
- sched_max = sched_get_priority_max(SCHED);
-
- memset(&saddr, 0 , sizeof(saddr));
- memset(&hints, 0 , sizeof(hints));
- hints.ai_flags = AI_PASSIVE;
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_DGRAM;
-
- /* Process command line options */
-
- opterr = 0;
- while ((c = my_getopt(argc, argv, parms)) != -1) {
- switch (c) {
- case '?':
- usage();
-
- case 'h':
- usage();
- }
- }
-
- if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
- if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
- if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
- if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
- if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
- if (parms[nflag].count) {
- switch (atoi(parms[nflag].arg)) {
- case 1:
- netflow = &NetFlow1;
- break;
-
- case 5:
- break;
-
- case 7:
- netflow = &NetFlow7;
- break;
-
- default:
- fprintf(stderr, "Illegal %s\n", "NetFlow version");
- exit(1);
- }
- }
- if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
- if (parms[lflag].count) {
- if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
- *log_suffix++ = 0;
- if (*log_suffix) {
- sprintf(errpbuf, "[%s]", log_suffix);
- strcat(ident, errpbuf);
- }
- }
- if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
- if (log_suffix) *--log_suffix = ':';
- }
- if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
- err_malloc:
- fprintf(stderr, "malloc(): %s\n", strerror(errno));
- exit(1);
- }
- sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
- if (parms[qflag].count) {
- pending_queue_length = atoi(parms[qflag].arg);
- if (pending_queue_length < 1) {
- fprintf(stderr, "Illegal %s\n", "pending queue length");
- exit(1);
- }
- }
- if (parms[rflag].count) {
- schedp.sched_priority = atoi(parms[rflag].arg);
- if (schedp.sched_priority
- && (schedp.sched_priority < sched_min
- || schedp.sched_priority > sched_max)) {
- fprintf(stderr, "Illegal %s\n", "realtime priority");
- exit(1);
- }
- }
- if (parms[Bflag].count) {
- sockbufsize = atoi(parms[Bflag].arg) << 10;
- }
- if (parms[bflag].count) {
- bulk_quantity = atoi(parms[bflag].arg);
- if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
- fprintf(stderr, "Illegal %s\n", "bulk size");
- exit(1);
- }
- }
- if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
- if (parms[Xflag].count) {
- for(i = 0; parms[Xflag].arg[i]; i++)
- if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
- if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
- goto err_malloc;
- rule = strtok(parms[Xflag].arg, ":");
- for (i = 0; rule; i++) {
- snmp_rules[i].len = strlen(rule);
- if (snmp_rules[i].len > IFNAMSIZ) {
- fprintf(stderr, "Illegal %s\n", "interface basename");
- exit(1);
- }
- strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
- if (!*(rule - 1)) *(rule - 1) = ',';
- rule = strtok(NULL, ",");
- if (!rule) {
- fprintf(stderr, "Illegal %s\n", "SNMP rule");
- exit(1);
- }
- snmp_rules[i].base = atoi(rule);
- *(rule - 1) = ':';
- rule = strtok(NULL, ":");
- }
- nsnmp_rules = i;
- }
- if (parms[tflag].count)
- sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
- if (parms[aflag].count) {
- if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
- bad_lhost:
- fprintf(stderr, "Illegal %s\n", "source address");
- exit(1);
- } else {
- saddr = *((struct sockaddr_in *) res->ai_addr);
- freeaddrinfo(res);
- }
- }
- if (parms[uflag].count)
- if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
- fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
- exit(1);
- }
-
-
- /* Process collectors parameters. Brrrr... :-[ */
-
- npeers = argc - optind;
- if (npeers > 1) {
- /* Send to remote Netflow collector */
- if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
- for (i = optind, npeers = 0; i < argc; i++, npeers++) {
- dhost = argv[i];
- if (!(dport = strchr(dhost, ':'))) goto bad_collector;
- *dport++ = 0;
- if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
- fprintf(stderr, "socket(): %s\n", strerror(errno));
- exit(1);
- }
- peers[npeers].write_fd = write_fd;
- peers[npeers].type = PEER_MIRROR;
- peers[npeers].laddr = saddr;
- peers[npeers].seq = 0;
- if ((lhost = strchr(dport, '/'))) {
- *lhost++ = 0;
- if ((type = strchr(lhost, '/'))) {
- *type++ = 0;
- switch (*type) {
- case 0:
- case 'm':
- break;
-
- case 'r':
- peers[npeers].type = PEER_ROTATE;
- npeers_rot++;
- break;
-
- default:
- goto bad_collector;
- }
- }
- if (*lhost) {
- if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
- peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
- freeaddrinfo(res);
- }
- }
- if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
- sizeof(struct sockaddr_in))) {
- fprintf(stderr, "bind(): %s\n", strerror(errno));
- exit(1);
- }
- if (getaddrinfo(dhost, dport, &hints, &res)) {
-bad_collector:
- fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
- exit(1);
- }
- peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
- freeaddrinfo(res);
- if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
- sizeof(struct sockaddr_in))) {
- fprintf(stderr, "connect(): %s\n", strerror(errno));
- exit(1);
- }
-
- /* Restore command line */
- if (type) *--type = '/';
- if (lhost) *--lhost = '/';
- *--dport = ':';
- }
- }
- else if (parms[fflag].count) {
- // log into a file
- char *fname;
- if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
- fname = parms[fflag].arg;
- if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
- fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
- exit(1);
- }
- peers[0].write_fd = write_fd;
- peers[0].type = PEER_FILE;
- peers[0].seq = 0;
- npeers++;
- }
- else
- usage();
-
-
- if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
- ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
- if (!ulog_handle) {
- fprintf(stderr, "libipulog initialization error: %s",
- ipulog_strerror(ipulog_errno));
- exit(1);
- }
- if (sockbufsize)
- if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
- &sockbufsize, sizeof(sockbufsize)) < 0)
- fprintf(stderr, "setsockopt(): %s", strerror(errno));
-
- /* Daemonize (if log destination stdout-free) */
-
- my_log_open(ident, verbosity, log_dest);
- if (!(log_dest & 2)) {
- switch (fork()) {
- case -1:
- fprintf(stderr, "fork(): %s", strerror(errno));
- exit(1);
-
- case 0:
- setsid();
- freopen("/dev/null", "r", stdin);
- freopen("/dev/null", "w", stdout);
- freopen("/dev/null", "w", stderr);
- break;
-
- default:
- exit(0);
- }
- } else {
- setvbuf(stdout, (char *)0, _IONBF, 0);
- setvbuf(stderr, (char *)0, _IONBF, 0);
- }
-
- pid = getpid();
- sprintf(errpbuf, "[%ld]", (long) pid);
- strcat(ident, errpbuf);
-
- /* Initialization */
-
- hash_init(); /* Actually for crc16 only */
- mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
- for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
-
-#ifdef UPTIME_TRICK
- /* Hope 12 days is enough :-/ */
- start_time_offset = 1 << 20;
-
- /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
-#endif
- gettime(&start_time);
-
- /*
- Build static pending queue as circular buffer.
- */
- if (!(pending_head = mem_alloc())) goto err_mem_alloc;
- pending_tail = pending_head;
- for (i = pending_queue_length - 1; i--;) {
- if (!(pending_tail->next = mem_alloc())) {
- err_mem_alloc:
- my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
- exit(1);
- }
- pending_tail = pending_tail->next;
- }
- pending_tail->next = pending_head;
- pending_tail = pending_head;
-
- sigemptyset(&sig_mask);
- sigact.sa_handler = &sighandler;
- sigact.sa_mask = sig_mask;
- sigact.sa_flags = 0;
- sigaddset(&sig_mask, SIGTERM);
- sigaction(SIGTERM, &sigact, 0);
-#if ((DEBUG) & DEBUG_I)
- sigaddset(&sig_mask, SIGUSR1);
- sigaction(SIGUSR1, &sigact, 0);
-#endif
- if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
- my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
- exit(1);
- }
-
- my_log(LOG_INFO, "Starting %s...", VERSION);
-
- if (parms[cflag].count) {
- if (chdir(parms[cflag].arg) || chroot(".")) {
- my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
- exit(1);
- }
- }
-
- schedp.sched_priority = schedp.sched_priority - THREADS + 2;
- pthread_attr_init(&tattr);
- for (i = 0; i < THREADS - 1; i++) {
- if (schedp.sched_priority > 0) {
- if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
- (pthread_attr_setschedparam(&tattr, &schedp))) {
- my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
- exit(1);
- }
- }
- if (pthread_create(&thid, &tattr, threads[i], 0)) {
- my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
- exit(1);
- }
- pthread_detach(thid);
- schedp.sched_priority++;
- }
-
- if (pw) {
- if (setgroups(0, NULL)) {
- my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
- exit(1);
- }
- if (setregid(pw->pw_gid, pw->pw_gid)) {
- my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
- exit(1);
- }
- if (setreuid(pw->pw_uid, pw->pw_uid)) {
- my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
- exit(1);
- }
- }
-
- if (!(pidfile = fopen(pidfilepath, "w")))
- my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
- else {
- fprintf(pidfile, "%ld\n", (long) pid);
- fclose(pidfile);
- }
-
- my_log(LOG_INFO, "pid: %d", pid);
- my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
- "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
- ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
- netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
- memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
- emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
- parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
- for (i = 0; i < nsnmp_rules; i++) {
- my_log(LOG_INFO, "SNMP rule #%d %s:%d",
- i + 1, snmp_rules[i].basename, snmp_rules[i].base);
- }
- for (i = 0; i < npeers; i++) {
- switch (peers[i].type) {
- case PEER_MIRROR:
- c = 'm';
- break;
- case PEER_ROTATE:
- c = 'r';
- break;
- }
- snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
- my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
- inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
- }
-
- pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
-
- timeout.tv_usec = 0;
- while (!killed
- || (total_elements - free_elements - pending_queue_length)
- || emit_count
- || pending_tail->flags) {
-
- if (!sigs) {
- timeout.tv_sec = scan_interval;
- select(0, 0, 0, 0, &timeout);
- }
-
- if (sigs & SIGTERM_MASK && !killed) {
- sigs &= ~SIGTERM_MASK;
- my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
- scan_interval = 1;
- frag_lifetime = -1;
- active_lifetime = -1;
- inactive_lifetime = -1;
- emit_timeout = 1;
- unpending_timeout = 1;
- killed = 1;
- pthread_cond_signal(&scan_cond);
- pthread_cond_signal(&unpending_cond);
- }
-
-#if ((DEBUG) & DEBUG_I)
- if (sigs & SIGUSR1_MASK) {
- sigs &= ~SIGUSR1_MASK;
- info_debug();
- }
-#endif
- }
- remove(pidfilepath);
-#if ((DEBUG) & DEBUG_I)
- info_debug();
-#endif
- my_log(LOG_INFO, "Done.");
-#ifdef WALL
- return 0;
-#endif
-}