Re-import of fprobe-ulog
[iptables.git] / src / fprobe-ulog.c
diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c
new file mode 100644 (file)
index 0000000..cdcf170
--- /dev/null
@@ -0,0 +1,1549 @@
+/*
+       Copyright (C) Slava Astashonok <sla@0n.ru>
+
+       This program is free software; you can redistribute it and/or
+       modify it under the terms of the GNU General Public License.
+
+       $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
+*/
+
+#include <common.h>
+
+/* stdout, stderr, freopen() */
+#include <stdio.h>
+
+/* atoi(), exit() */
+#include <stdlib.h>
+
+/* getopt(), alarm(), getpid(), sedsid(), chdir() */
+#include <unistd.h>
+
+/* strerror() */
+#include <string.h>
+
+/* sig*() */
+#include <signal.h>
+
+#include <libipulog/libipulog.h>
+struct ipulog_handle {
+       int fd;
+       u_int8_t blocking;
+       struct sockaddr_nl local;
+       struct sockaddr_nl peer;
+       struct nlmsghdr* last_nlhdr;
+};
+
+/* inet_*() (Linux, FreeBSD, Solaris), getpid() */
+#include <sys/types.h>
+#include <netinet/in_systm.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <netinet/ip_icmp.h>
+#include <net/if.h>
+
+#include <sys/param.h>
+#include <pwd.h>
+#ifdef OS_LINUX
+#include <grp.h>
+#endif
+
+/* pthread_*() */
+#include <pthread.h>
+
+/* errno */
+#include <errno.h>
+
+/* getaddrinfo() */
+#include <netdb.h>
+
+/* nanosleep() */
+#include <time.h>
+
+/* gettimeofday() */
+#include <sys/time.h>
+
+/* scheduling */
+#include <sched.h>
+
+/* select() (POSIX)*/
+#include <sys/select.h>
+
+/* open() */
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <fprobe-ulog.h>
+#include <my_log.h>
+#include <my_getopt.h>
+#include <netflow.h>
+#include <hash.h>
+#include <mem.h>
+
+enum {
+       aflag,
+       Bflag,
+       bflag,
+       cflag,
+       dflag,
+       eflag,
+       fflag,
+       gflag,
+       hflag,
+       lflag,
+       mflag,
+       Mflag,
+       nflag,
+       qflag,
+       rflag,
+       sflag,
+       tflag,
+       Uflag,
+       uflag,
+       vflag,
+       Xflag,
+};
+
+static struct getopt_parms parms[] = {
+       {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'h', 0, 0, 0},
+       {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'M', 0, 0, 0},
+       {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {0, 0, 0, 0}
+};
+
+extern char *optarg;
+extern int optind, opterr, optopt;
+extern int errno;
+
+extern struct NetFlow NetFlow1;
+extern struct NetFlow NetFlow5;
+extern struct NetFlow NetFlow7;
+
+#define mark_is_tos parms[Mflag].count
+static unsigned scan_interval = 5;
+static int frag_lifetime = 30;
+static int inactive_lifetime = 60;
+static int active_lifetime = 300;
+static int sockbufsize;
+#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
+#if (MEM_BITS == 0) || (MEM_BITS == 16)
+#define BULK_QUANTITY 10000
+#else
+#define BULK_QUANTITY 200
+#endif
+static unsigned bulk_quantity = BULK_QUANTITY;
+static unsigned pending_queue_length = 100;
+static struct NetFlow *netflow = &NetFlow5;
+static unsigned verbosity = 6;
+static unsigned log_dest = MY_LOG_SYSLOG;
+static struct Time start_time;
+static long start_time_offset;
+static int off_tl;
+/* From mem.c */
+extern unsigned total_elements;
+extern unsigned free_elements;
+extern unsigned total_memory;
+#if ((DEBUG) & DEBUG_I)
+static unsigned emit_pkts, emit_queue;
+static uint64_t size_total;
+static unsigned pkts_total, pkts_total_fragmented;
+static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
+static unsigned pkts_pending, pkts_pending_done;
+static unsigned pending_queue_trace, pending_queue_trace_candidate;
+static unsigned flows_total, flows_fragmented;
+#endif
+static unsigned emit_count;
+static uint32_t emit_sequence;
+static unsigned emit_rate_bytes, emit_rate_delay;
+static struct Time emit_time;
+static uint8_t emit_packet[NETFLOW_MAX_PACKET];
+static pthread_t thid;
+static sigset_t sig_mask;
+static struct sched_param schedp;
+static int sched_min, sched_max;
+static int npeers, npeers_rot;
+static struct peer *peers;
+static int sigs;
+
+static struct Flow *flows[1 << HASH_BITS];
+static pthread_mutex_t flows_mutex[1 << HASH_BITS];
+
+static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
+
+static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
+static struct Flow *pending_head, *pending_tail;
+static struct Flow *scan_frag_dreg;
+
+static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
+static struct Flow *flows_emit;
+
+static char ident[256] = "fprobe-ulog";
+static FILE *pidfile;
+static char *pidfilepath;
+static pid_t pid;
+static int killed;
+static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
+static struct ipulog_handle *ulog_handle;
+static uint32_t ulog_gmask = 1;
+static char *cap_buf;
+static int nsnmp_rules;
+static struct snmp_rule *snmp_rules;
+static struct passwd *pw = 0;
+
+void usage()
+{
+       fprintf(stdout,
+               "fprobe-ulog: a NetFlow probe. Version %s\n"
+               "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
+               "\n"
+               "-h\t\tDisplay this help\n"
+               "-U <mask>\tULOG group bitwise mask [1]\n"
+               "-s <seconds>\tHow often scan for expired flows [5]\n"
+               "-g <seconds>\tFragmented flow lifetime [30]\n"
+               "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
+               "-f <filename>\tLog flow data in a file\n"
+               "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
+               "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
+               "-a <address>\tUse <address> as source for NetFlow flow\n"
+               "-X <rules>\tInterface name to SNMP-index conversion rules\n"
+               "-M\t\tUse netfilter mark value as ToS flag\n"
+               "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
+               "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
+               "-q <flows>\tPending queue length [100]\n"
+               "-B <kilobytes>\tKernel capture buffer size [0]\n"
+               "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
+               "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
+               "-c <directory>\tDirectory to chroot to\n"
+               "-u <user>\tUser to run as\n"
+               "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
+               "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
+               "-y <remote:port>\tAddress of the NetFlow collector\n",
+               "-f <writable file>\tFile to write data into\n"
+               VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
+       exit(0);
+}
+
+#if ((DEBUG) & DEBUG_I)
+void info_debug()
+{
+       my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
+               pkts_total, pkts_total_fragmented, size_total,
+               pkts_pending - pkts_pending_done, pending_queue_trace);
+       my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
+               pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
+       my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
+               flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
+       my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
+               total_elements, free_elements, total_memory);
+}
+#endif
+
+void sighandler(int sig)
+{
+       switch (sig) {
+               case SIGTERM:
+                       sigs |= SIGTERM_MASK;
+                       break;
+#if ((DEBUG) & DEBUG_I)
+               case SIGUSR1:
+                       sigs |= SIGUSR1_MASK;
+                       break;
+#endif
+       }
+}
+
+void gettime(struct Time *now)
+{
+       struct timeval t;
+
+       gettimeofday(&t, 0);
+       now->sec = t.tv_sec;
+       now->usec = t.tv_usec;
+}
+
+inline time_t cmpmtime(struct Time *t1, struct Time *t2)
+{
+       return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
+}
+
+/* Uptime in miliseconds */
+uint32_t getuptime(struct Time *t)
+{
+       /* Maximum uptime is about 49/2 days */
+       return cmpmtime(t, &start_time);
+}
+
+hash_t hash_flow(struct Flow *flow)
+{
+       if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
+       else return hash(flow, sizeof(struct Flow_TL));
+}
+
+uint16_t snmp_index(char *name) {
+       uint32_t i;
+
+       if (!*name) return 0;
+
+       for (i = 0; (int) i < nsnmp_rules; i++) {
+               if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
+               return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
+       }
+
+       if ((i = if_nametoindex(name))) return i;
+
+       return -1;
+}
+
+inline void copy_flow(struct Flow *src, struct Flow *dst)
+{
+       dst->iif = src->iif;
+       dst->oif = src->oif;
+       dst->sip = src->sip;
+       dst->dip = src->dip;
+       dst->tos = src->tos;
+       dst->proto = src->proto;
+       dst->tcp_flags = src->tcp_flags;
+       dst->id = src->id;
+       dst->sp = src->sp;
+       dst->dp = src->dp;
+       dst->pkts = src->pkts;
+       dst->size = src->size;
+       dst->sizeF = src->sizeF;
+       dst->sizeP = src->sizeP;
+       dst->ctime = src->ctime;
+       dst->mtime = src->mtime;
+       dst->flags = src->flags;
+}
+
+struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
+{
+       struct Flow **flowpp;
+
+#ifdef WALL
+       flowpp = 0;
+#endif
+
+       if (prev) flowpp = *prev;
+
+       while (where) {
+               if (where->sip.s_addr == what->sip.s_addr
+                       && where->dip.s_addr == what->dip.s_addr
+                       && where->proto == what->proto) {
+                       switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
+                               case 0:
+                                       /* Both unfragmented */
+                                       if ((what->sp == where->sp)
+                                               && (what->dp == where->dp)) goto done;
+                                       break;
+                               case 2:
+                                       /* Both fragmented */
+                                       if (where->id == what->id) goto done;
+                                       break;
+                       }
+               }
+               flowpp = &where->next;
+               where = where->next;
+       }
+done:
+       if (prev) *prev = flowpp;
+       return where;
+}
+
+int put_into(struct Flow *flow, int flag
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+       , char *logbuf
+#endif
+)
+{
+       int ret = 0;
+       hash_t h;
+       struct Flow *flown, **flowpp;
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+       char buf[64];
+#endif
+
+       h = hash_flow(flow);
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+       sprintf(buf, " %x H:%04x", (unsigned) flow, h);
+       strcat(logbuf, buf);
+#endif
+       pthread_mutex_lock(&flows_mutex[h]);
+       flowpp = &flows[h];
+       if (!(flown = find(flows[h], flow, &flowpp))) {
+               /* No suitable flow found - add */
+               if (flag == COPY_INTO) {
+                       if ((flown = mem_alloc())) {
+                               copy_flow(flow, flown);
+                               flow = flown;
+                       } else {
+#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
+                               my_log(LOG_ERR, "%s %s. %s",
+                                       "mem_alloc():", strerror(errno), "packet lost");
+#endif
+                               return -1;
+                       }
+               }
+               flow->next = flows[h];
+               flows[h] = flow;
+#if ((DEBUG) & DEBUG_I)
+               flows_total++;
+               if (flow->flags & FLOW_FRAG) flows_fragmented++;
+#endif
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+               if (flown) {
+                       sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
+                       strcat(logbuf, buf);
+               }
+#endif
+       } else {
+               /* Found suitable flow - update */
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+               sprintf(buf, " +> %x", (unsigned) flown);
+               strcat(logbuf, buf);
+#endif
+               if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
+                       flown->mtime = flow->mtime;
+               if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
+                       flown->ctime = flow->ctime;
+               flown->tcp_flags |= flow->tcp_flags;
+               flown->size += flow->size;
+               flown->pkts += flow->pkts;
+               if (flow->flags & FLOW_FRAG) {
+                       /* Fragmented flow require some additional work */
+                       if (flow->flags & FLOW_TL) {
+                               /*
+                               ?FIXME?
+                               Several packets with FLOW_TL (attack)
+                               */
+                               flown->sp = flow->sp;
+                               flown->dp = flow->dp;
+                       }
+                       if (flow->flags & FLOW_LASTFRAG) {
+                               /*
+                               ?FIXME?
+                               Several packets with FLOW_LASTFRAG (attack)
+                               */
+                               flown->sizeP = flow->sizeP;
+                       }
+                       flown->flags |= flow->flags;
+                       flown->sizeF += flow->sizeF;
+                       if ((flown->flags & FLOW_LASTFRAG)
+                               && (flown->sizeF >= flown->sizeP)) {
+                               /* All fragments received - flow reassembled */
+                               *flowpp = flown->next;
+                               pthread_mutex_unlock(&flows_mutex[h]);
+#if ((DEBUG) & DEBUG_I)
+                               flows_total--;
+                               flows_fragmented--;
+#endif
+                               flown->id = 0;
+                               flown->flags &= ~FLOW_FRAG;
+#if ((DEBUG) & (DEBUG_U | DEBUG_S))
+                               strcat(logbuf," R");
+#endif
+                               ret = put_into(flown, MOVE_INTO
+#if ((DEBUG) & (DEBUG_U | DEBUG_S))
+                                               , logbuf
+#endif
+                                       );
+                       }
+               }
+               if (flag == MOVE_INTO) mem_free(flow);
+       }
+       pthread_mutex_unlock(&flows_mutex[h]);
+       return ret;
+}
+
+void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
+{
+       int i;
+
+       for (i = 0; i < fields; i++) {
+#if ((DEBUG) & DEBUG_F)
+               my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
+#endif
+               switch (format[i]) {
+                       case NETFLOW_IPV4_SRC_ADDR:
+                               ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
+                               p += NETFLOW_IPV4_SRC_ADDR_SIZE;
+                               break;
+
+                       case NETFLOW_IPV4_DST_ADDR:
+                               ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
+                               p += NETFLOW_IPV4_DST_ADDR_SIZE;
+                               break;
+
+                       case NETFLOW_INPUT_SNMP:
+                               *((uint16_t *) p) = htons(flow->iif);
+                               p += NETFLOW_INPUT_SNMP_SIZE;
+                               break;
+
+                       case NETFLOW_OUTPUT_SNMP:
+                               *((uint16_t *) p) = htons(flow->oif);
+                               p += NETFLOW_OUTPUT_SNMP_SIZE;
+                               break;
+
+                       case NETFLOW_PKTS_32:
+                               *((uint32_t *) p) = htonl(flow->pkts);
+                               p += NETFLOW_PKTS_32_SIZE;
+                               break;
+
+                       case NETFLOW_BYTES_32:
+                               *((uint32_t *) p) = htonl(flow->size);
+                               p += NETFLOW_BYTES_32_SIZE;
+                               break;
+
+                       case NETFLOW_FIRST_SWITCHED:
+                               *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
+                               p += NETFLOW_FIRST_SWITCHED_SIZE;
+                               break;
+
+                       case NETFLOW_LAST_SWITCHED:
+                               *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
+                               p += NETFLOW_LAST_SWITCHED_SIZE;
+                               break;
+
+                       case NETFLOW_L4_SRC_PORT:
+                               *((uint16_t *) p) = flow->sp;
+                               p += NETFLOW_L4_SRC_PORT_SIZE;
+                               break;
+
+                       case NETFLOW_L4_DST_PORT:
+                               *((uint16_t *) p) = flow->dp;
+                               p += NETFLOW_L4_DST_PORT_SIZE;
+                               break;
+
+                       case NETFLOW_PROT:
+                               *((uint8_t *) p) = flow->proto;
+                               p += NETFLOW_PROT_SIZE;
+                               break;
+
+                       case NETFLOW_SRC_TOS:
+                               *((uint8_t *) p) = flow->tos;
+                               p += NETFLOW_SRC_TOS_SIZE;
+                               break;
+
+                       case NETFLOW_TCP_FLAGS:
+                               *((uint8_t *) p) = flow->tcp_flags;
+                               p += NETFLOW_TCP_FLAGS_SIZE;
+                               break;
+
+                       case NETFLOW_VERSION:
+                               *((uint16_t *) p) = htons(netflow->Version);
+                               p += NETFLOW_VERSION_SIZE;
+                               break;
+
+                       case NETFLOW_COUNT:
+                               *((uint16_t *) p) = htons(emit_count);
+                               p += NETFLOW_COUNT_SIZE;
+                               break;
+
+                       case NETFLOW_UPTIME:
+                               *((uint32_t *) p) = htonl(getuptime(&emit_time));
+                               p += NETFLOW_UPTIME_SIZE;
+                               break;
+
+                       case NETFLOW_UNIX_SECS:
+                               *((uint32_t *) p) = htonl(emit_time.sec);
+                               p += NETFLOW_UNIX_SECS_SIZE;
+                               break;
+
+                       case NETFLOW_UNIX_NSECS:
+                               *((uint32_t *) p) = htonl(emit_time.usec * 1000);
+                               p += NETFLOW_UNIX_NSECS_SIZE;
+                               break;
+
+                       case NETFLOW_FLOW_SEQUENCE:
+                               //*((uint32_t *) p) = htonl(emit_sequence);
+                               *((uint32_t *) p) = 0;
+                               p += NETFLOW_FLOW_SEQUENCE_SIZE;
+                               break;
+
+                       case NETFLOW_PAD8:
+                       /* Unsupported (uint8_t) */
+                       case NETFLOW_ENGINE_TYPE:
+                       case NETFLOW_ENGINE_ID:
+                       case NETFLOW_FLAGS7_1:
+                       case NETFLOW_SRC_MASK:
+                       case NETFLOW_DST_MASK:
+                               *((uint8_t *) p) = 0;
+                               p += NETFLOW_PAD8_SIZE;
+                               break;
+
+                       case NETFLOW_PAD16:
+                       /* Unsupported (uint16_t) */
+                       case NETFLOW_SRC_AS:
+                       case NETFLOW_DST_AS:
+                       case NETFLOW_FLAGS7_2:
+                               *((uint16_t *) p) = 0;
+                               p += NETFLOW_PAD16_SIZE;
+                               break;
+
+                       case NETFLOW_PAD32:
+                       /* Unsupported (uint32_t) */
+                       case NETFLOW_IPV4_NEXT_HOP:
+                       case NETFLOW_ROUTER_SC:
+                               *((uint32_t *) p) = 0;
+                               p += NETFLOW_PAD32_SIZE;
+                               break;
+
+                       default:
+                               my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
+                                       format, i, format[i]);
+                               exit(1);
+               }
+       }
+#if ((DEBUG) & DEBUG_F)
+       my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
+#endif
+       return p;
+}
+
+void setuser() {
+       /*
+       Workaround for clone()-based threads
+       Try to change EUID independently of main thread
+       */
+       if (pw) {
+               setgroups(0, NULL);
+               setregid(pw->pw_gid, pw->pw_gid);
+               setreuid(pw->pw_uid, pw->pw_uid);
+       }
+}
+
+void *emit_thread()
+{
+       struct Flow *flow;
+       void *p;
+       struct timeval now;
+       struct timespec timeout;
+       int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
+
+       p = (void *) &emit_packet + netflow->HeaderSize;
+       timeout.tv_nsec = 0;
+
+       setuser();
+
+       for (;;) {
+               pthread_mutex_lock(&emit_mutex);
+               while (!flows_emit) {
+                       gettimeofday(&now, 0);
+                       timeout.tv_sec = now.tv_sec + emit_timeout;
+                       /* Do not wait until emit_packet will filled - it may be too long */
+                       if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
+                               pthread_mutex_unlock(&emit_mutex);
+                               goto sendit;
+                       }
+               }
+               flow = flows_emit;
+               flows_emit = flows_emit->next;
+#if ((DEBUG) & DEBUG_I)
+               emit_queue--;
+#endif         
+               pthread_mutex_unlock(&emit_mutex);
+
+#ifdef UPTIME_TRICK
+               if (!emit_count) {
+                       gettime(&start_time);
+                       start_time.sec -= start_time_offset;
+               }
+#endif
+               p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
+               mem_free(flow);
+               emit_count++;
+#ifdef PF2_DEBUG
+               printf("Emit count = %d\n", emit_count);
+               fflush(stdout);
+#endif
+               if (emit_count == netflow->MaxFlows) {
+               sendit:
+                       gettime(&emit_time);
+                       p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
+                       size = netflow->HeaderSize + emit_count * netflow->FlowSize;
+                       /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
+                       if (size < 1464) size = 1464;
+                       peer_rot_cur = 0;
+                       for (i = 0; i < npeers; i++) {
+                               if (peers[0].type == PEER_FILE) {
+                                               if (netflow->SeqOffset)
+                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
+#define MESSAGES
+                                               ret = write(peers[0].write_fd, emit_packet, size);
+                                               if (ret < size) {
+#if ((DEBUG) & DEBUG_E) || defined MESSAGES
+                                                       my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
+                                                               i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
+#endif
+#undef MESSAGES
+                                               }
+#if ((DEBUG) & DEBUG_E)
+                                               commaneelse {
+                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+                                                               emit_count, i + 1, peers[i].seq);
+                                               }
+#endif
+                                               peers[0].seq += emit_count;
+
+                                               /* Rate limit */
+                                               if (emit_rate_bytes) {
+                                                       sent += size;
+                                                       delay = sent / emit_rate_bytes;
+                                                       if (delay) {
+                                                               sent %= emit_rate_bytes;
+                                                               timeout.tv_sec = 0;
+                                                               timeout.tv_nsec = emit_rate_delay * delay;
+                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+                                                       }
+                                               }
+                                       }
+                               else
+                               if (peers[i].type == PEER_MIRROR) goto sendreal;
+                               else
+                               if (peers[i].type == PEER_ROTATE) 
+                                       if (peer_rot_cur++ == peer_rot_work) {
+                                       sendreal:
+                                               if (netflow->SeqOffset)
+                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
+                                               ret = send(peers[i].write_fd, emit_packet, size, 0);
+                                               if (ret < size) {
+#if ((DEBUG) & DEBUG_E) || defined MESSAGES
+                                                       my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
+                                                               i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
+#endif
+                                               }
+#if ((DEBUG) & DEBUG_E)
+                                               commaneelse {
+                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+                                                               emit_count, i + 1, peers[i].seq);
+                                               }
+#endif
+                                               peers[i].seq += emit_count;
+
+                                               /* Rate limit */
+                                               if (emit_rate_bytes) {
+                                                       sent += size;
+                                                       delay = sent / emit_rate_bytes;
+                                                       if (delay) {
+                                                               sent %= emit_rate_bytes;
+                                                               timeout.tv_sec = 0;
+                                                               timeout.tv_nsec = emit_rate_delay * delay;
+                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+                                                       }
+                                               }
+                                       }
+                       }
+                       if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
+                       emit_sequence += emit_count;
+                       emit_count = 0;
+#if ((DEBUG) & DEBUG_I)
+                       emit_pkts++;
+#endif
+               }
+       }
+}      
+
+void *unpending_thread()
+{
+       struct timeval now;
+       struct timespec timeout;
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+       char logbuf[256];
+#endif
+
+       setuser();
+
+       timeout.tv_nsec = 0;
+       pthread_mutex_lock(&unpending_mutex);
+
+       for (;;) {
+               while (!(pending_tail->flags & FLOW_PENDING)) {
+                       gettimeofday(&now, 0);
+                       timeout.tv_sec = now.tv_sec + unpending_timeout;
+                       pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
+               }
+
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+               *logbuf = 0;
+#endif
+               if (put_into(pending_tail, COPY_INTO
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+                               , logbuf
+#endif
+                       ) < 0) {
+#if ((DEBUG) & DEBUG_I)
+                       pkts_lost_unpending++;
+#endif                         
+               }
+
+#if ((DEBUG) & DEBUG_U)
+               my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
+#endif
+
+               pending_tail->flags = 0;
+               pending_tail = pending_tail->next;
+#if ((DEBUG) & DEBUG_I)
+               pkts_pending_done++;
+#endif
+       }
+}
+
+void *scan_thread()
+{
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+       char logbuf[256];
+#endif
+       int i;
+       struct Flow *flow, **flowpp;
+       struct Time now;
+       struct timespec timeout;
+
+       setuser();
+
+       timeout.tv_nsec = 0;
+       pthread_mutex_lock(&scan_mutex);
+
+       for (;;) {
+               gettime(&now);
+               timeout.tv_sec = now.sec + scan_interval;
+               pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
+
+               gettime(&now);
+#if ((DEBUG) & DEBUG_S)
+               my_log(LOG_DEBUG, "S: %d", now.sec);
+#endif
+               for (i = 0; i < 1 << HASH_BITS ; i++) {
+                       pthread_mutex_lock(&flows_mutex[i]);
+                       flow = flows[i];
+                       flowpp = &flows[i];
+                       while (flow) {
+                               if (flow->flags & FLOW_FRAG) {
+                                       /* Process fragmented flow */
+                                       if ((now.sec - flow->mtime.sec) > frag_lifetime) {
+                                               /* Fragmented flow expired - put it into special chain */
+#if ((DEBUG) & DEBUG_I)
+                                               flows_fragmented--;
+                                               flows_total--;
+#endif
+                                               *flowpp = flow->next;
+                                               flow->id = 0;
+                                               flow->flags &= ~FLOW_FRAG;
+                                               flow->next = scan_frag_dreg;
+                                               scan_frag_dreg = flow;
+                                               flow = *flowpp;
+                                               continue;
+                                       }
+                               } else {
+                                       /* Flow is not frgamented */
+                                       if ((now.sec - flow->mtime.sec) > inactive_lifetime
+                                               || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
+                                               /* Flow expired */
+#if ((DEBUG) & DEBUG_S)
+                                               my_log(LOG_DEBUG, "S: E %x", flow);
+#endif
+#if ((DEBUG) & DEBUG_I)
+                                               flows_total--;
+#endif
+                                               *flowpp = flow->next;
+                                               pthread_mutex_lock(&emit_mutex);
+                                               flow->next = flows_emit;
+                                               flows_emit = flow;
+#if ((DEBUG) & DEBUG_I)
+                                               emit_queue++;
+#endif                         
+                                               pthread_mutex_unlock(&emit_mutex);
+                                               flow = *flowpp;
+                                               continue;
+                                       }
+                               }
+                               flowpp = &flow->next;
+                               flow = flow->next;
+                       } /* chain loop */
+                       pthread_mutex_unlock(&flows_mutex[i]);
+               } /* hash loop */
+               if (flows_emit) pthread_cond_signal(&emit_cond);
+
+               while (scan_frag_dreg) {
+                       flow = scan_frag_dreg;
+                       scan_frag_dreg = flow->next;
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+                       *logbuf = 0;
+#endif
+                       put_into(flow, MOVE_INTO
+#if ((DEBUG) & (DEBUG_S | DEBUG_U))
+                               , logbuf
+#endif
+                       );
+#if ((DEBUG) & DEBUG_S)
+                       my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
+#endif
+               }
+       }
+}
+
+void *cap_thread()
+{
+       struct ulog_packet_msg *ulog_msg;
+       struct ip *nl;
+       void *tl;
+       struct Flow *flow;
+       int len, off_frag, psize;
+#if ((DEBUG) & DEBUG_C)
+       char buf[64];
+       char logbuf[256];
+#endif
+
+       setuser();
+
+       while (!killed) {
+               len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
+               if (len <= 0) {
+                       my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
+                       continue;
+               }
+               while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
+
+#if ((DEBUG) & DEBUG_C)
+                       sprintf(logbuf, "C: %d", ulog_msg->data_len);
+#endif
+
+                       nl = (void *) &ulog_msg->payload;
+                       psize = ulog_msg->data_len;
+
+                       /* Sanity check */
+                       if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
+#if ((DEBUG) & DEBUG_C)
+                               strcat(logbuf, " U");
+                               my_log(LOG_DEBUG, "%s", logbuf);
+#endif
+#if ((DEBUG) & DEBUG_I)
+                               pkts_ignored++;
+#endif
+                               continue;
+                       }
+
+                       if (pending_head->flags) {
+#if ((DEBUG) & DEBUG_C) || defined MESSAGES
+                               my_log(LOG_ERR,
+# if ((DEBUG) & DEBUG_C)
+                                       "%s %s %s", logbuf,
+# else
+                                       "%s %s",
+# endif
+                                       "pending queue full:", "packet lost");
+#endif
+#if ((DEBUG) & DEBUG_I)
+                               pkts_lost_capture++;
+#endif
+                               goto done;
+                       }
+
+#if ((DEBUG) & DEBUG_I)
+                       pkts_total++;
+#endif
+
+                       flow = pending_head;
+
+                       /* ?FIXME? Add sanity check for ip_len? */
+                       flow->size = ntohs(nl->ip_len);
+#if ((DEBUG) & DEBUG_I)
+                       size_total += flow->size;
+#endif
+
+                       flow->sip = nl->ip_src;
+                       flow->dip = nl->ip_dst;
+                       flow->iif = snmp_index(ulog_msg->indev_name);
+                       flow->oif = snmp_index(ulog_msg->outdev_name);
+                       flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
+                       flow->proto = nl->ip_p;
+                       flow->id = 0;
+                       flow->tcp_flags = 0;
+                       flow->pkts = 1;
+                       flow->sizeF = 0;
+                       flow->sizeP = 0;
+                       /* Packets captured from OUTPUT table didn't contains valid timestamp */
+                       if (ulog_msg->timestamp_sec) {
+                               flow->ctime.sec = ulog_msg->timestamp_sec;
+                               flow->ctime.usec = ulog_msg->timestamp_usec;
+                       } else gettime(&flow->ctime);
+                       flow->mtime = flow->ctime;
+
+                       off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
+
+                       /*
+                       Offset (from network layer) to transport layer header/IP data
+                       IOW IP header size ;-)
+
+                       ?FIXME?
+                       Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
+                       */
+                       off_tl = nl->ip_hl << 2;
+                       tl = (void *) nl + off_tl;
+
+                       /* THIS packet data size: data_size = total_size - ip_header_size*4 */
+                       flow->sizeF = ntohs(nl->ip_len) - off_tl;
+                       psize -= off_tl;
+                       if ((signed) flow->sizeF < 0) flow->sizeF = 0;
+                       if (psize > (signed) flow->sizeF) psize = flow->sizeF;
+
+                       if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
+                               /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
+#if ((DEBUG) & DEBUG_C)
+                               strcat(logbuf, " F");
+#endif
+#if ((DEBUG) & DEBUG_I)
+                               pkts_total_fragmented++;
+#endif
+                               flow->flags |= FLOW_FRAG;
+                               flow->id = nl->ip_id;
+
+                               if (!(ntohs(nl->ip_off) & IP_MF)) {
+                                       /* Packet whith IP_MF contains information about whole datagram size */
+                                       flow->flags |= FLOW_LASTFRAG;
+                                       /* size = frag_offset*8 + data_size */
+                                       flow->sizeP = off_frag + flow->sizeF;
+                               }
+                       }
+
+#if ((DEBUG) & DEBUG_C)
+                       sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
+                       strcat(logbuf, buf);
+                       sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
+                       strcat(logbuf, buf);
+#endif
+
+                       /*
+                       Fortunately most interesting transport layer information fit
+                       into first 8 bytes of IP data field (minimal nonzero size).
+                       Thus we don't need actual packet reassembling to build whole
+                       transport layer data. We only check the fragment offset for
+                       zero value to find packet with this information.
+                       */
+                       if (!off_frag && psize >= 8) {
+                               switch (flow->proto) {
+                                       case IPPROTO_TCP:
+                                       case IPPROTO_UDP:
+                                               flow->sp = ((struct udphdr *)tl)->uh_sport;
+                                               flow->dp = ((struct udphdr *)tl)->uh_dport;
+                                               goto tl_known;
+
+#ifdef ICMP_TRICK
+                                       case IPPROTO_ICMP:
+                                               flow->sp = htons(((struct icmp *)tl)->icmp_type);
+                                               flow->dp = htons(((struct icmp *)tl)->icmp_code);
+                                               goto tl_known;
+#endif
+#ifdef ICMP_TRICK_CISCO
+                                       case IPPROTO_ICMP:
+                                               flow->dp = *((int32_t *) tl);
+                                               goto tl_known;
+#endif
+
+                                       default:
+                                               /* Unknown transport layer */
+#if ((DEBUG) & DEBUG_C)
+                                               strcat(logbuf, " U");
+#endif
+                                               flow->sp = 0;
+                                               flow->dp = 0;
+                                               break;
+
+                                       tl_known:
+#if ((DEBUG) & DEBUG_C)
+                                               sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
+                                               strcat(logbuf, buf);
+#endif
+                                               flow->flags |= FLOW_TL;
+                               }
+                       }
+
+                       /* Check for tcp flags presence (including CWR and ECE). */
+                       if (flow->proto == IPPROTO_TCP
+                               && off_frag < 16
+                               && psize >= 16 - off_frag) {
+                               flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
+#if ((DEBUG) & DEBUG_C)
+                               sprintf(buf, " TCP:%x", flow->tcp_flags);
+                               strcat(logbuf, buf);
+#endif
+                       }
+
+#if ((DEBUG) & DEBUG_C)
+                       sprintf(buf, " => %x", (unsigned) flow);
+                       strcat(logbuf, buf);
+                       my_log(LOG_DEBUG, "%s", logbuf);
+#endif
+
+#if ((DEBUG) & DEBUG_I)
+                       pkts_pending++;
+                       pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
+                       if (pending_queue_trace < pending_queue_trace_candidate)
+                               pending_queue_trace = pending_queue_trace_candidate;
+#endif
+
+                       /* Flow complete - inform unpending_thread() about it */
+                       pending_head->flags |= FLOW_PENDING;
+                       pending_head = pending_head->next;
+               done:
+                       pthread_cond_signal(&unpending_cond);
+               }
+       }
+       return 0;
+}
+
+int main(int argc, char **argv)
+{
+       char errpbuf[512];
+       char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
+       int c, i, write_fd, memory_limit = 0;
+       struct addrinfo hints, *res;
+       struct sockaddr_in saddr;
+       pthread_attr_t tattr;
+       struct sigaction sigact;
+       static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
+       struct timeval timeout;
+
+       sched_min = sched_get_priority_min(SCHED);
+       sched_max = sched_get_priority_max(SCHED);
+
+       memset(&saddr, 0 , sizeof(saddr));
+       memset(&hints, 0 , sizeof(hints));
+       hints.ai_flags = AI_PASSIVE;
+       hints.ai_family = AF_INET;
+       hints.ai_socktype = SOCK_DGRAM;
+
+       /* Process command line options */
+
+       opterr = 0;
+       while ((c = my_getopt(argc, argv, parms)) != -1) {
+               switch (c) {
+                       case '?':
+                               usage();
+
+                       case 'h':
+                               usage();
+               }
+       }
+
+       if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
+       if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
+       if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
+       if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
+       if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
+       if (parms[nflag].count) {
+               switch (atoi(parms[nflag].arg)) {
+                       case 1:
+                               netflow = &NetFlow1;
+                               break;
+
+                       case 5:
+                               break;
+
+                       case 7:
+                               netflow = &NetFlow7;
+                               break;
+
+                       default:
+                               fprintf(stderr, "Illegal %s\n", "NetFlow version");
+                               exit(1);
+               }
+       }
+       if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
+       if (parms[lflag].count) {
+               if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
+                       *log_suffix++ = 0;
+                       if (*log_suffix) {
+                               sprintf(errpbuf, "[%s]", log_suffix);
+                               strcat(ident, errpbuf);
+                       }
+               }
+               if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
+               if (log_suffix) *--log_suffix = ':';
+       }
+       if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
+       err_malloc:
+               fprintf(stderr, "malloc(): %s\n", strerror(errno));
+               exit(1);
+       }
+       sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
+       if (parms[qflag].count) {
+               pending_queue_length = atoi(parms[qflag].arg);
+               if (pending_queue_length < 1) {
+                       fprintf(stderr, "Illegal %s\n", "pending queue length");
+                       exit(1);
+               }
+       }
+       if (parms[rflag].count) {
+               schedp.sched_priority = atoi(parms[rflag].arg);
+               if (schedp.sched_priority
+                       && (schedp.sched_priority < sched_min
+                               || schedp.sched_priority > sched_max)) {
+                       fprintf(stderr, "Illegal %s\n", "realtime priority");
+                       exit(1);
+               }
+       }
+       if (parms[Bflag].count) {
+               sockbufsize = atoi(parms[Bflag].arg) << 10;
+       }
+       if (parms[bflag].count) {
+               bulk_quantity = atoi(parms[bflag].arg);
+               if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
+                       fprintf(stderr, "Illegal %s\n", "bulk size");
+                       exit(1);
+               }
+       }
+       if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
+       if (parms[Xflag].count) {
+               for(i = 0; parms[Xflag].arg[i]; i++)
+                       if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
+               if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
+                       goto err_malloc;
+               rule = strtok(parms[Xflag].arg, ":");
+               for (i = 0; rule; i++) {
+                       snmp_rules[i].len = strlen(rule);
+                       if (snmp_rules[i].len > IFNAMSIZ) {
+                               fprintf(stderr, "Illegal %s\n", "interface basename");
+                               exit(1);
+                       }
+                       strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
+                       if (!*(rule - 1)) *(rule - 1) = ',';
+                       rule = strtok(NULL, ",");
+                       if (!rule) {
+                               fprintf(stderr, "Illegal %s\n", "SNMP rule");
+                               exit(1);
+                       }
+                       snmp_rules[i].base = atoi(rule);
+                       *(rule - 1) = ':';
+                       rule = strtok(NULL, ":");
+               }
+               nsnmp_rules = i;
+       }
+       if (parms[tflag].count)
+               sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
+       if (parms[aflag].count) {
+               if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
+               bad_lhost:
+                       fprintf(stderr, "Illegal %s\n", "source address");
+                       exit(1);
+               } else {
+                       saddr = *((struct sockaddr_in *) res->ai_addr);
+                       freeaddrinfo(res);
+               }
+       }
+       if (parms[uflag].count) 
+               if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
+                       fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
+                       exit(1);
+               }
+
+
+       /* Process collectors parameters. Brrrr... :-[ */
+
+       npeers = argc - optind;
+       if (npeers > 1) {
+               /* Send to remote Netflow collector */
+               if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
+               for (i = optind, npeers = 0; i < argc; i++, npeers++) {
+                       dhost = argv[i];
+                       if (!(dport = strchr(dhost, ':'))) goto bad_collector;
+                       *dport++ = 0;
+                       if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
+                               fprintf(stderr, "socket(): %s\n", strerror(errno));
+                               exit(1);
+                       }
+                       peers[npeers].write_fd = write_fd;
+                       peers[npeers].type = PEER_MIRROR;
+                       peers[npeers].laddr = saddr;
+                       peers[npeers].seq = 0;
+                       if ((lhost = strchr(dport, '/'))) {
+                               *lhost++ = 0;
+                               if ((type = strchr(lhost, '/'))) {
+                                       *type++ = 0;
+                                       switch (*type) {
+                                               case 0:
+                                               case 'm':
+                                                       break;
+
+                                               case 'r':
+                                                       peers[npeers].type = PEER_ROTATE;
+                                                       npeers_rot++;
+                                                       break;
+
+                                               default:
+                                                       goto bad_collector;
+                                       }
+                               }
+                               if (*lhost) {
+                                       if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
+                                       peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
+                                       freeaddrinfo(res);
+                               }
+                       }
+                       if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
+                                               sizeof(struct sockaddr_in))) {
+                               fprintf(stderr, "bind(): %s\n", strerror(errno));
+                               exit(1);
+                       }
+                       if (getaddrinfo(dhost, dport, &hints, &res)) {
+bad_collector:
+                               fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
+                               exit(1);
+                       }
+                       peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
+                       freeaddrinfo(res);
+                       if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
+                                               sizeof(struct sockaddr_in))) {
+                               fprintf(stderr, "connect(): %s\n", strerror(errno));
+                               exit(1);
+                       }
+
+                       /* Restore command line */
+                       if (type) *--type = '/';
+                       if (lhost) *--lhost = '/';
+                       *--dport = ':';
+               }
+       }
+       else if (parms[fflag].count) {
+               // log into a file
+               char *fname;
+               if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
+               fname = parms[fflag].arg;
+               if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
+                       fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
+                       exit(1);
+               }
+               peers[0].write_fd = write_fd;
+               peers[0].type = PEER_FILE;
+               peers[0].seq = 0;
+               npeers++;
+       }
+       else 
+               usage();
+
+
+       if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
+       ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
+       if (!ulog_handle) {
+               fprintf(stderr, "libipulog initialization error: %s",
+                       ipulog_strerror(ipulog_errno));
+               exit(1);
+       }
+       if (sockbufsize)
+               if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
+                       &sockbufsize, sizeof(sockbufsize)) < 0)
+                       fprintf(stderr, "setsockopt(): %s", strerror(errno));
+
+       /* Daemonize (if log destination stdout-free) */
+
+       my_log_open(ident, verbosity, log_dest);
+       if (!(log_dest & 2)) {
+               switch (fork()) {
+                       case -1:
+                               fprintf(stderr, "fork(): %s", strerror(errno));
+                               exit(1);
+
+                       case 0:
+                               setsid();
+                               freopen("/dev/null", "r", stdin);
+                               freopen("/dev/null", "w", stdout);
+                               freopen("/dev/null", "w", stderr);
+                               break;
+
+                       default:
+                               exit(0);
+               }
+       } else {
+               setvbuf(stdout, (char *)0, _IONBF, 0);
+               setvbuf(stderr, (char *)0, _IONBF, 0);
+       }
+
+       pid = getpid();
+       sprintf(errpbuf, "[%ld]", (long) pid);
+       strcat(ident, errpbuf);
+
+       /* Initialization */
+
+       hash_init(); /* Actually for crc16 only */
+       mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
+       for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
+
+#ifdef UPTIME_TRICK
+       /* Hope 12 days is enough :-/ */
+       start_time_offset = 1 << 20;
+
+       /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
+#endif
+       gettime(&start_time);
+
+       /*
+       Build static pending queue as circular buffer.
+       */
+       if (!(pending_head = mem_alloc())) goto err_mem_alloc;
+       pending_tail = pending_head;
+       for (i = pending_queue_length - 1; i--;) {
+               if (!(pending_tail->next = mem_alloc())) {
+               err_mem_alloc:
+                       my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
+                       exit(1);
+               }
+               pending_tail = pending_tail->next;
+       }
+       pending_tail->next = pending_head;
+       pending_tail = pending_head;
+
+       sigemptyset(&sig_mask);
+       sigact.sa_handler = &sighandler;
+       sigact.sa_mask = sig_mask;
+       sigact.sa_flags = 0;
+       sigaddset(&sig_mask, SIGTERM);
+       sigaction(SIGTERM, &sigact, 0);
+#if ((DEBUG) & DEBUG_I)
+       sigaddset(&sig_mask, SIGUSR1);
+       sigaction(SIGUSR1, &sigact, 0);
+#endif
+       if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
+               my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
+               exit(1);
+       }
+
+       my_log(LOG_INFO, "Starting %s...", VERSION);
+
+       if (parms[cflag].count) {
+               if (chdir(parms[cflag].arg) || chroot(".")) {
+                       my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
+                       exit(1);
+               }
+       }
+
+       schedp.sched_priority = schedp.sched_priority - THREADS + 2;
+       pthread_attr_init(&tattr);
+       for (i = 0; i < THREADS - 1; i++) {
+               if (schedp.sched_priority > 0) {
+                       if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
+                               (pthread_attr_setschedparam(&tattr, &schedp))) {
+                               my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
+                               exit(1);
+                       }
+               }
+               if (pthread_create(&thid, &tattr, threads[i], 0)) {
+                       my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
+                       exit(1);
+               }
+               pthread_detach(thid);
+               schedp.sched_priority++;
+       }
+
+       if (pw) {
+               if (setgroups(0, NULL)) {
+                       my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
+                       exit(1);
+               }
+               if (setregid(pw->pw_gid, pw->pw_gid)) {
+                       my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
+                       exit(1);
+               }
+               if (setreuid(pw->pw_uid, pw->pw_uid)) {
+                       my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
+                       exit(1);
+               }
+       }
+
+       if (!(pidfile = fopen(pidfilepath, "w")))
+               my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
+       else {
+               fprintf(pidfile, "%ld\n", (long) pid);
+               fclose(pidfile);
+       }
+
+       my_log(LOG_INFO, "pid: %d", pid);
+       my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
+               "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
+               ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
+               netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
+               memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
+               emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
+               parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
+       for (i = 0; i < nsnmp_rules; i++) {
+               my_log(LOG_INFO, "SNMP rule #%d %s:%d",
+                       i + 1, snmp_rules[i].basename, snmp_rules[i].base);
+       }
+       for (i = 0; i < npeers; i++) {
+               switch (peers[i].type) {
+                       case PEER_MIRROR:
+                               c = 'm';
+                               break;
+                       case PEER_ROTATE:
+                               c = 'r';
+                               break;
+               }
+               snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
+               my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
+                       inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
+       }
+
+       pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
+
+       timeout.tv_usec = 0;
+       while (!killed
+               || (total_elements - free_elements - pending_queue_length)
+               || emit_count
+               || pending_tail->flags) {
+
+               if (!sigs) {
+                       timeout.tv_sec = scan_interval;
+                       select(0, 0, 0, 0, &timeout);
+               }
+
+               if (sigs & SIGTERM_MASK && !killed) {
+                       sigs &= ~SIGTERM_MASK;
+                       my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
+                       scan_interval = 1;
+                       frag_lifetime = -1;
+                       active_lifetime = -1;
+                       inactive_lifetime = -1;
+                       emit_timeout = 1;
+                       unpending_timeout = 1;
+                       killed = 1;
+                       pthread_cond_signal(&scan_cond);
+                       pthread_cond_signal(&unpending_cond);
+               }
+
+#if ((DEBUG) & DEBUG_I)
+               if (sigs & SIGUSR1_MASK) {
+                       sigs &= ~SIGUSR1_MASK;
+                       info_debug();
+               }
+#endif
+       }
+       remove(pidfilepath);
+#if ((DEBUG) & DEBUG_I)
+       info_debug();
+#endif
+       my_log(LOG_INFO, "Done.");
+#ifdef WALL
+       return 0;
+#endif
+}