X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=trunk%2Fsrc%2Ffprobe-ulog.c;fp=trunk%2Fsrc%2Ffprobe-ulog.c;h=0000000000000000000000000000000000000000;hb=4a2a65ece88edbfdfda338e263370f292e311228;hp=db63d9cefc7de7b261e0c20c8ff305a36d53f31f;hpb=85718e4dcaf5f34496f629e45a47ec91145c6f9e;p=iptables.git diff --git a/trunk/src/fprobe-ulog.c b/trunk/src/fprobe-ulog.c deleted file mode 100644 index db63d9c..0000000 --- a/trunk/src/fprobe-ulog.c +++ /dev/null @@ -1,1543 +0,0 @@ -/* - Copyright (C) Slava Astashonok - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License. - - $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $ -*/ - -#include - -/* stdout, stderr, freopen() */ -#include - -/* atoi(), exit() */ -#include - -/* getopt(), alarm(), getpid(), sedsid(), chdir() */ -#include - -/* strerror() */ -#include - -/* sig*() */ -#include - -#include -struct ipulog_handle { - int fd; - u_int8_t blocking; - struct sockaddr_nl local; - struct sockaddr_nl peer; - struct nlmsghdr* last_nlhdr; -}; - -/* inet_*() (Linux, FreeBSD, Solaris), getpid() */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#ifdef OS_LINUX -#include -#endif - -/* pthread_*() */ -#include - -/* errno */ -#include - -/* getaddrinfo() */ -#include - -/* nanosleep() */ -#include - -/* gettimeofday() */ -#include - -/* scheduling */ -#include - -/* select() (POSIX)*/ -#include - -/* open() */ -#include -#include - -#include -#include -#include -#include -#include -#include - -enum { - aflag, - Bflag, - bflag, - cflag, - dflag, - eflag, - fflag, - gflag, - hflag, - lflag, - mflag, - Mflag, - nflag, - qflag, - rflag, - sflag, - tflag, - Uflag, - uflag, - vflag, - Xflag, -}; - -static struct getopt_parms parms[] = { - {'a', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'B', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'b', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'c', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'d', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'e', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'f', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'g', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'h', 0, 0, 0}, - {'l', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'m', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'M', 0, 0, 0}, - {'n', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'q', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'r', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'s', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'t', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'U', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'u', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'v', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {'X', MY_GETOPT_ARG_REQUIRED, 0, 0}, - {0, 0, 0, 0} -}; - -extern char *optarg; -extern int optind, opterr, optopt; -extern int errno; - -extern struct NetFlow NetFlow1; -extern struct NetFlow NetFlow5; -extern struct NetFlow NetFlow7; - -#define mark_is_tos parms[Mflag].count -static unsigned scan_interval = 5; -static int frag_lifetime = 30; -static int inactive_lifetime = 60; -static int active_lifetime = 300; -static int sockbufsize; -#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1) -#if (MEM_BITS == 0) || (MEM_BITS == 16) -#define BULK_QUANTITY 10000 -#else -#define BULK_QUANTITY 200 -#endif -static unsigned bulk_quantity = BULK_QUANTITY; -static unsigned pending_queue_length = 100; -static struct NetFlow *netflow = &NetFlow5; -static unsigned verbosity = 6; -static unsigned log_dest = MY_LOG_SYSLOG; -static struct Time start_time; -static long start_time_offset; -static int off_tl; -/* From mem.c */ -extern unsigned total_elements; -extern unsigned free_elements; -extern unsigned total_memory; -#if ((DEBUG) & DEBUG_I) -static unsigned emit_pkts, emit_queue; -static uint64_t size_total; -static unsigned pkts_total, pkts_total_fragmented; -static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending; -static unsigned pkts_pending, pkts_pending_done; -static unsigned pending_queue_trace, pending_queue_trace_candidate; -static unsigned flows_total, flows_fragmented; -#endif -static unsigned emit_count; -static uint32_t emit_sequence; -static unsigned emit_rate_bytes, emit_rate_delay; -static struct Time emit_time; -static uint8_t emit_packet[NETFLOW_MAX_PACKET]; -static pthread_t thid; -static sigset_t sig_mask; -static struct sched_param schedp; -static int sched_min, sched_max; -static int npeers, npeers_rot; -static struct peer *peers; -static int sigs; - -static struct Flow *flows[1 << HASH_BITS]; -static pthread_mutex_t flows_mutex[1 << HASH_BITS]; - -static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER; - -static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER; -static struct Flow *pending_head, *pending_tail; -static struct Flow *scan_frag_dreg; - -static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER; -static struct Flow *flows_emit; - -static char ident[256] = "fprobe-ulog"; -static FILE *pidfile; -static char *pidfilepath; -static pid_t pid; -static int killed; -static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT; -static struct ipulog_handle *ulog_handle; -static uint32_t ulog_gmask = 1; -static char *cap_buf; -static int nsnmp_rules; -static struct snmp_rule *snmp_rules; -static struct passwd *pw = 0; - -void usage() -{ - fprintf(stdout, - "fprobe-ulog: a NetFlow probe. Version %s\n" - "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n" - "\n" - "-h\t\tDisplay this help\n" - "-U \tULOG group bitwise mask [1]\n" - "-s \tHow often scan for expired flows [5]\n" - "-g \tFragmented flow lifetime [30]\n" - "-d \tIdle flow lifetime (inactive timer) [60]\n" - "-e \tActive flow lifetime (active timer) [300]\n" - "-n \tNetFlow version for use (1, 5 or 7) [5]\n" - "-a
\tUse
as source for NetFlow flow\n" - "-X \tInterface name to SNMP-index conversion rules\n" - "-M\t\tUse netfilter mark value as ToS flag\n" - "-b \tMemory bulk size (1..%u) [%u]\n" - "-m \tMemory limit (0=no limit) [0]\n" - "-q \tPending queue length [100]\n" - "-B \tKernel capture buffer size [0]\n" - "-r \tReal-time priority (0=disabled, %d..%d) [0]\n" - "-t \tProduce nanosecond delay after each bytes sent [0:0]\n" - "-c \tDirectory to chroot to\n" - "-u \tUser to run as\n" - "-v \tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n" - "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n" - "-y \tAddress of the NetFlow collector\n", - "-f \tFile to write data into\n" - VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max); - exit(0); -} - -#if ((DEBUG) & DEBUG_I) -void info_debug() -{ - my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d", - pkts_total, pkts_total_fragmented, size_total, - pkts_pending - pkts_pending_done, pending_queue_trace); - my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d", - pkts_ignored, pkts_lost_capture, pkts_lost_unpending); - my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d", - flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue); - my_log(LOG_DEBUG, "I: memory:%d/%d (%d)", - total_elements, free_elements, total_memory); -} -#endif - -void sighandler(int sig) -{ - switch (sig) { - case SIGTERM: - sigs |= SIGTERM_MASK; - break; -#if ((DEBUG) & DEBUG_I) - case SIGUSR1: - sigs |= SIGUSR1_MASK; - break; -#endif - } -} - -void gettime(struct Time *now) -{ - struct timeval t; - - gettimeofday(&t, 0); - now->sec = t.tv_sec; - now->usec = t.tv_usec; -} - -inline time_t cmpmtime(struct Time *t1, struct Time *t2) -{ - return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000; -} - -/* Uptime in miliseconds */ -uint32_t getuptime(struct Time *t) -{ - /* Maximum uptime is about 49/2 days */ - return cmpmtime(t, &start_time); -} - -hash_t hash_flow(struct Flow *flow) -{ - if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F)); - else return hash(flow, sizeof(struct Flow_TL)); -} - -uint16_t snmp_index(char *name) { - uint32_t i; - - if (!*name) return 0; - - for (i = 0; (int) i < nsnmp_rules; i++) { - if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue; - return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base; - } - - if ((i = if_nametoindex(name))) return i; - - return -1; -} - -inline void copy_flow(struct Flow *src, struct Flow *dst) -{ - dst->iif = src->iif; - dst->oif = src->oif; - dst->sip = src->sip; - dst->dip = src->dip; - dst->tos = src->tos; - dst->proto = src->proto; - dst->tcp_flags = src->tcp_flags; - dst->id = src->id; - dst->sp = src->sp; - dst->dp = src->dp; - dst->pkts = src->pkts; - dst->size = src->size; - dst->sizeF = src->sizeF; - dst->sizeP = src->sizeP; - dst->ctime = src->ctime; - dst->mtime = src->mtime; - dst->flags = src->flags; -} - -struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev) -{ - struct Flow **flowpp; - -#ifdef WALL - flowpp = 0; -#endif - - if (prev) flowpp = *prev; - - while (where) { - if (where->sip.s_addr == what->sip.s_addr - && where->dip.s_addr == what->dip.s_addr - && where->proto == what->proto) { - switch ((what->flags + where->flags) & FLOW_FRAGMASK) { - case 0: - /* Both unfragmented */ - if ((what->sp == where->sp) - && (what->dp == where->dp)) goto done; - break; - case 2: - /* Both fragmented */ - if (where->id == what->id) goto done; - break; - } - } - flowpp = &where->next; - where = where->next; - } -done: - if (prev) *prev = flowpp; - return where; -} - -int put_into(struct Flow *flow, int flag -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - , char *logbuf -#endif -) -{ - int ret = 0; - hash_t h; - struct Flow *flown, **flowpp; -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - char buf[64]; -#endif - - h = hash_flow(flow); -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - sprintf(buf, " %x H:%04x", (unsigned) flow, h); - strcat(logbuf, buf); -#endif - pthread_mutex_lock(&flows_mutex[h]); - flowpp = &flows[h]; - if (!(flown = find(flows[h], flow, &flowpp))) { - /* No suitable flow found - add */ - if (flag == COPY_INTO) { - if ((flown = mem_alloc())) { - copy_flow(flow, flown); - flow = flown; - } else { -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES - my_log(LOG_ERR, "%s %s. %s", - "mem_alloc():", strerror(errno), "packet lost"); -#endif - return -1; - } - } - flow->next = flows[h]; - flows[h] = flow; -#if ((DEBUG) & DEBUG_I) - flows_total++; - if (flow->flags & FLOW_FRAG) flows_fragmented++; -#endif -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - if (flown) { - sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags); - strcat(logbuf, buf); - } -#endif - } else { - /* Found suitable flow - update */ -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - sprintf(buf, " +> %x", (unsigned) flown); - strcat(logbuf, buf); -#endif - if (cmpmtime(&flow->mtime, &flown->mtime) > 0) - flown->mtime = flow->mtime; - if (cmpmtime(&flow->ctime, &flown->ctime) < 0) - flown->ctime = flow->ctime; - flown->tcp_flags |= flow->tcp_flags; - flown->size += flow->size; - flown->pkts += flow->pkts; - if (flow->flags & FLOW_FRAG) { - /* Fragmented flow require some additional work */ - if (flow->flags & FLOW_TL) { - /* - ?FIXME? - Several packets with FLOW_TL (attack) - */ - flown->sp = flow->sp; - flown->dp = flow->dp; - } - if (flow->flags & FLOW_LASTFRAG) { - /* - ?FIXME? - Several packets with FLOW_LASTFRAG (attack) - */ - flown->sizeP = flow->sizeP; - } - flown->flags |= flow->flags; - flown->sizeF += flow->sizeF; - if ((flown->flags & FLOW_LASTFRAG) - && (flown->sizeF >= flown->sizeP)) { - /* All fragments received - flow reassembled */ - *flowpp = flown->next; - pthread_mutex_unlock(&flows_mutex[h]); -#if ((DEBUG) & DEBUG_I) - flows_total--; - flows_fragmented--; -#endif - flown->id = 0; - flown->flags &= ~FLOW_FRAG; -#if ((DEBUG) & (DEBUG_U | DEBUG_S)) - strcat(logbuf," R"); -#endif - ret = put_into(flown, MOVE_INTO -#if ((DEBUG) & (DEBUG_U | DEBUG_S)) - , logbuf -#endif - ); - } - } - if (flag == MOVE_INTO) mem_free(flow); - } - pthread_mutex_unlock(&flows_mutex[h]); - return ret; -} - -void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) -{ - int i; - - for (i = 0; i < fields; i++) { -#if ((DEBUG) & DEBUG_F) - my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p); -#endif - switch (format[i]) { - case NETFLOW_IPV4_SRC_ADDR: - ((struct in_addr *) p)->s_addr = flow->sip.s_addr; - p += NETFLOW_IPV4_SRC_ADDR_SIZE; - break; - - case NETFLOW_IPV4_DST_ADDR: - ((struct in_addr *) p)->s_addr = flow->dip.s_addr; - p += NETFLOW_IPV4_DST_ADDR_SIZE; - break; - - case NETFLOW_INPUT_SNMP: - *((uint16_t *) p) = htons(flow->iif); - p += NETFLOW_INPUT_SNMP_SIZE; - break; - - case NETFLOW_OUTPUT_SNMP: - *((uint16_t *) p) = htons(flow->oif); - p += NETFLOW_OUTPUT_SNMP_SIZE; - break; - - case NETFLOW_PKTS_32: - *((uint32_t *) p) = htonl(flow->pkts); - p += NETFLOW_PKTS_32_SIZE; - break; - - case NETFLOW_BYTES_32: - *((uint32_t *) p) = htonl(flow->size); - p += NETFLOW_BYTES_32_SIZE; - break; - - case NETFLOW_FIRST_SWITCHED: - *((uint32_t *) p) = htonl(getuptime(&flow->ctime)); - p += NETFLOW_FIRST_SWITCHED_SIZE; - break; - - case NETFLOW_LAST_SWITCHED: - *((uint32_t *) p) = htonl(getuptime(&flow->mtime)); - p += NETFLOW_LAST_SWITCHED_SIZE; - break; - - case NETFLOW_L4_SRC_PORT: - *((uint16_t *) p) = flow->sp; - p += NETFLOW_L4_SRC_PORT_SIZE; - break; - - case NETFLOW_L4_DST_PORT: - *((uint16_t *) p) = flow->dp; - p += NETFLOW_L4_DST_PORT_SIZE; - break; - - case NETFLOW_PROT: - *((uint8_t *) p) = flow->proto; - p += NETFLOW_PROT_SIZE; - break; - - case NETFLOW_SRC_TOS: - *((uint8_t *) p) = flow->tos; - p += NETFLOW_SRC_TOS_SIZE; - break; - - case NETFLOW_TCP_FLAGS: - *((uint8_t *) p) = flow->tcp_flags; - p += NETFLOW_TCP_FLAGS_SIZE; - break; - - case NETFLOW_VERSION: - *((uint16_t *) p) = htons(netflow->Version); - p += NETFLOW_VERSION_SIZE; - break; - - case NETFLOW_COUNT: - *((uint16_t *) p) = htons(emit_count); - p += NETFLOW_COUNT_SIZE; - break; - - case NETFLOW_UPTIME: - *((uint32_t *) p) = htonl(getuptime(&emit_time)); - p += NETFLOW_UPTIME_SIZE; - break; - - case NETFLOW_UNIX_SECS: - *((uint32_t *) p) = htonl(emit_time.sec); - p += NETFLOW_UNIX_SECS_SIZE; - break; - - case NETFLOW_UNIX_NSECS: - *((uint32_t *) p) = htonl(emit_time.usec * 1000); - p += NETFLOW_UNIX_NSECS_SIZE; - break; - - case NETFLOW_FLOW_SEQUENCE: - //*((uint32_t *) p) = htonl(emit_sequence); - *((uint32_t *) p) = 0; - p += NETFLOW_FLOW_SEQUENCE_SIZE; - break; - - case NETFLOW_PAD8: - /* Unsupported (uint8_t) */ - case NETFLOW_ENGINE_TYPE: - case NETFLOW_ENGINE_ID: - case NETFLOW_FLAGS7_1: - case NETFLOW_SRC_MASK: - case NETFLOW_DST_MASK: - *((uint8_t *) p) = 0; - p += NETFLOW_PAD8_SIZE; - break; - - case NETFLOW_PAD16: - /* Unsupported (uint16_t) */ - case NETFLOW_SRC_AS: - case NETFLOW_DST_AS: - case NETFLOW_FLAGS7_2: - *((uint16_t *) p) = 0; - p += NETFLOW_PAD16_SIZE; - break; - - case NETFLOW_PAD32: - /* Unsupported (uint32_t) */ - case NETFLOW_IPV4_NEXT_HOP: - case NETFLOW_ROUTER_SC: - *((uint32_t *) p) = 0; - p += NETFLOW_PAD32_SIZE; - break; - - default: - my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d", - format, i, format[i]); - exit(1); - } - } -#if ((DEBUG) & DEBUG_F) - my_log(LOG_DEBUG, "F: return %x", (unsigned) p); -#endif - return p; -} - -void setuser() { - /* - Workaround for clone()-based threads - Try to change EUID independently of main thread - */ - if (pw) { - setgroups(0, NULL); - setregid(pw->pw_gid, pw->pw_gid); - setreuid(pw->pw_uid, pw->pw_uid); - } -} - -void *emit_thread() -{ - struct Flow *flow; - void *p; - struct timeval now; - struct timespec timeout; - int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0; - - p = (void *) &emit_packet + netflow->HeaderSize; - timeout.tv_nsec = 0; - - setuser(); - - for (;;) { - pthread_mutex_lock(&emit_mutex); - while (!flows_emit) { - gettimeofday(&now, 0); - timeout.tv_sec = now.tv_sec + emit_timeout; - /* Do not wait until emit_packet will filled - it may be too long */ - if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { - pthread_mutex_unlock(&emit_mutex); - goto sendit; - } - } - flow = flows_emit; - flows_emit = flows_emit->next; -#if ((DEBUG) & DEBUG_I) - emit_queue--; -#endif - pthread_mutex_unlock(&emit_mutex); - -#ifdef UPTIME_TRICK - if (!emit_count) { - gettime(&start_time); - start_time.sec -= start_time_offset; - } -#endif - p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p); - mem_free(flow); - emit_count++; - if (emit_count == netflow->MaxFlows) { - sendit: - gettime(&emit_time); - p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet); - size = netflow->HeaderSize + emit_count * netflow->FlowSize; - peer_rot_cur = 0; - for (i = 0; i < npeers; i++) { - if (peers[i].type == PEER_FILE) { - printf( "Here\n"); - if (netflow->SeqOffset) - *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq); -#define MESSAGES - ret = write(peers[i].write_fd, emit_packet, size); - if (ret < size) { -#if ((DEBUG) & DEBUG_E) || defined MESSAGES - my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s", - i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); -#endif -#undef MESSAGES - } -#if ((DEBUG) & DEBUG_E) - commaneelse { - my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", - emit_count, i + 1, peers[i].seq); - } -#endif - peers[i].seq += emit_count; - - /* Rate limit */ - if (emit_rate_bytes) { - sent += size; - delay = sent / emit_rate_bytes; - if (delay) { - sent %= emit_rate_bytes; - timeout.tv_sec = 0; - timeout.tv_nsec = emit_rate_delay * delay; - while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); - } - } - } - else - if (peers[i].type == PEER_MIRROR) goto sendreal; - else - if (peers[i].type == PEER_ROTATE) - if (peer_rot_cur++ == peer_rot_work) { - sendreal: - if (netflow->SeqOffset) - *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq); - ret = send(peers[i].write_fd, emit_packet, size, 0); - if (ret < size) { -#if ((DEBUG) & DEBUG_E) || defined MESSAGES - my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s", - i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); -#endif - } -#if ((DEBUG) & DEBUG_E) - commaneelse { - my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", - emit_count, i + 1, peers[i].seq); - } -#endif - peers[i].seq += emit_count; - - /* Rate limit */ - if (emit_rate_bytes) { - sent += size; - delay = sent / emit_rate_bytes; - if (delay) { - sent %= emit_rate_bytes; - timeout.tv_sec = 0; - timeout.tv_nsec = emit_rate_delay * delay; - while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); - } - } - } - } - if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot; - emit_sequence += emit_count; - emit_count = 0; -#if ((DEBUG) & DEBUG_I) - emit_pkts++; -#endif - } - } -} - -void *unpending_thread() -{ - struct timeval now; - struct timespec timeout; -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - char logbuf[256]; -#endif - - setuser(); - - timeout.tv_nsec = 0; - pthread_mutex_lock(&unpending_mutex); - - for (;;) { - while (!(pending_tail->flags & FLOW_PENDING)) { - gettimeofday(&now, 0); - timeout.tv_sec = now.tv_sec + unpending_timeout; - pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout); - } - -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - *logbuf = 0; -#endif - if (put_into(pending_tail, COPY_INTO -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - , logbuf -#endif - ) < 0) { -#if ((DEBUG) & DEBUG_I) - pkts_lost_unpending++; -#endif - } - -#if ((DEBUG) & DEBUG_U) - my_log(LOG_DEBUG, "%s%s", "U:", logbuf); -#endif - - pending_tail->flags = 0; - pending_tail = pending_tail->next; -#if ((DEBUG) & DEBUG_I) - pkts_pending_done++; -#endif - } -} - -void *scan_thread() -{ -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - char logbuf[256]; -#endif - int i; - struct Flow *flow, **flowpp; - struct Time now; - struct timespec timeout; - - setuser(); - - timeout.tv_nsec = 0; - pthread_mutex_lock(&scan_mutex); - - for (;;) { - gettime(&now); - timeout.tv_sec = now.sec + scan_interval; - pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout); - - gettime(&now); -#if ((DEBUG) & DEBUG_S) - my_log(LOG_DEBUG, "S: %d", now.sec); -#endif - for (i = 0; i < 1 << HASH_BITS ; i++) { - pthread_mutex_lock(&flows_mutex[i]); - flow = flows[i]; - flowpp = &flows[i]; - while (flow) { - if (flow->flags & FLOW_FRAG) { - /* Process fragmented flow */ - if ((now.sec - flow->mtime.sec) > frag_lifetime) { - /* Fragmented flow expired - put it into special chain */ -#if ((DEBUG) & DEBUG_I) - flows_fragmented--; - flows_total--; -#endif - *flowpp = flow->next; - flow->id = 0; - flow->flags &= ~FLOW_FRAG; - flow->next = scan_frag_dreg; - scan_frag_dreg = flow; - flow = *flowpp; - continue; - } - } else { - /* Flow is not frgamented */ - if ((now.sec - flow->mtime.sec) > inactive_lifetime - || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) { - /* Flow expired */ -#if ((DEBUG) & DEBUG_S) - my_log(LOG_DEBUG, "S: E %x", flow); -#endif -#if ((DEBUG) & DEBUG_I) - flows_total--; -#endif - *flowpp = flow->next; - pthread_mutex_lock(&emit_mutex); - flow->next = flows_emit; - flows_emit = flow; -#if ((DEBUG) & DEBUG_I) - emit_queue++; -#endif - pthread_mutex_unlock(&emit_mutex); - flow = *flowpp; - continue; - } - } - flowpp = &flow->next; - flow = flow->next; - } /* chain loop */ - pthread_mutex_unlock(&flows_mutex[i]); - } /* hash loop */ - if (flows_emit) pthread_cond_signal(&emit_cond); - - while (scan_frag_dreg) { - flow = scan_frag_dreg; - scan_frag_dreg = flow->next; -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - *logbuf = 0; -#endif - put_into(flow, MOVE_INTO -#if ((DEBUG) & (DEBUG_S | DEBUG_U)) - , logbuf -#endif - ); -#if ((DEBUG) & DEBUG_S) - my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf); -#endif - } - } -} - -void *cap_thread() -{ - struct ulog_packet_msg *ulog_msg; - struct ip *nl; - void *tl; - struct Flow *flow; - int len, off_frag, psize; -#if ((DEBUG) & DEBUG_C) - char buf[64]; - char logbuf[256]; -#endif - - setuser(); - - while (!killed) { - len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1); - if (len <= 0) { - my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno)); - continue; - } - while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) { - -#if ((DEBUG) & DEBUG_C) - sprintf(logbuf, "C: %d", ulog_msg->data_len); -#endif - - nl = (void *) &ulog_msg->payload; - psize = ulog_msg->data_len; - - /* Sanity check */ - if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) { -#if ((DEBUG) & DEBUG_C) - strcat(logbuf, " U"); - my_log(LOG_DEBUG, "%s", logbuf); -#endif -#if ((DEBUG) & DEBUG_I) - pkts_ignored++; -#endif - continue; - } - - if (pending_head->flags) { -#if ((DEBUG) & DEBUG_C) || defined MESSAGES - my_log(LOG_ERR, -# if ((DEBUG) & DEBUG_C) - "%s %s %s", logbuf, -# else - "%s %s", -# endif - "pending queue full:", "packet lost"); -#endif -#if ((DEBUG) & DEBUG_I) - pkts_lost_capture++; -#endif - goto done; - } - -#if ((DEBUG) & DEBUG_I) - pkts_total++; -#endif - - flow = pending_head; - - /* ?FIXME? Add sanity check for ip_len? */ - flow->size = ntohs(nl->ip_len); -#if ((DEBUG) & DEBUG_I) - size_total += flow->size; -#endif - - flow->sip = nl->ip_src; - flow->dip = nl->ip_dst; - flow->iif = snmp_index(ulog_msg->indev_name); - flow->oif = snmp_index(ulog_msg->outdev_name); - flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; - flow->proto = nl->ip_p; - flow->id = 0; - flow->tcp_flags = 0; - flow->pkts = 1; - flow->sizeF = 0; - flow->sizeP = 0; - /* Packets captured from OUTPUT table didn't contains valid timestamp */ - if (ulog_msg->timestamp_sec) { - flow->ctime.sec = ulog_msg->timestamp_sec; - flow->ctime.usec = ulog_msg->timestamp_usec; - } else gettime(&flow->ctime); - flow->mtime = flow->ctime; - - off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3; - - /* - Offset (from network layer) to transport layer header/IP data - IOW IP header size ;-) - - ?FIXME? - Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks... - */ - off_tl = nl->ip_hl << 2; - tl = (void *) nl + off_tl; - - /* THIS packet data size: data_size = total_size - ip_header_size*4 */ - flow->sizeF = ntohs(nl->ip_len) - off_tl; - psize -= off_tl; - if ((signed) flow->sizeF < 0) flow->sizeF = 0; - if (psize > (signed) flow->sizeF) psize = flow->sizeF; - - if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) { - /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */ -#if ((DEBUG) & DEBUG_C) - strcat(logbuf, " F"); -#endif -#if ((DEBUG) & DEBUG_I) - pkts_total_fragmented++; -#endif - flow->flags |= FLOW_FRAG; - flow->id = nl->ip_id; - - if (!(ntohs(nl->ip_off) & IP_MF)) { - /* Packet whith IP_MF contains information about whole datagram size */ - flow->flags |= FLOW_LASTFRAG; - /* size = frag_offset*8 + data_size */ - flow->sizeP = off_frag + flow->sizeF; - } - } - -#if ((DEBUG) & DEBUG_C) - sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif); - strcat(logbuf, buf); - sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto); - strcat(logbuf, buf); -#endif - - /* - Fortunately most interesting transport layer information fit - into first 8 bytes of IP data field (minimal nonzero size). - Thus we don't need actual packet reassembling to build whole - transport layer data. We only check the fragment offset for - zero value to find packet with this information. - */ - if (!off_frag && psize >= 8) { - switch (flow->proto) { - case IPPROTO_TCP: - case IPPROTO_UDP: - flow->sp = ((struct udphdr *)tl)->uh_sport; - flow->dp = ((struct udphdr *)tl)->uh_dport; - goto tl_known; - -#ifdef ICMP_TRICK - case IPPROTO_ICMP: - flow->sp = htons(((struct icmp *)tl)->icmp_type); - flow->dp = htons(((struct icmp *)tl)->icmp_code); - goto tl_known; -#endif -#ifdef ICMP_TRICK_CISCO - case IPPROTO_ICMP: - flow->dp = *((int32_t *) tl); - goto tl_known; -#endif - - default: - /* Unknown transport layer */ -#if ((DEBUG) & DEBUG_C) - strcat(logbuf, " U"); -#endif - flow->sp = 0; - flow->dp = 0; - break; - - tl_known: -#if ((DEBUG) & DEBUG_C) - sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp)); - strcat(logbuf, buf); -#endif - flow->flags |= FLOW_TL; - } - } - - /* Check for tcp flags presence (including CWR and ECE). */ - if (flow->proto == IPPROTO_TCP - && off_frag < 16 - && psize >= 16 - off_frag) { - flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag)); -#if ((DEBUG) & DEBUG_C) - sprintf(buf, " TCP:%x", flow->tcp_flags); - strcat(logbuf, buf); -#endif - } - -#if ((DEBUG) & DEBUG_C) - sprintf(buf, " => %x", (unsigned) flow); - strcat(logbuf, buf); - my_log(LOG_DEBUG, "%s", logbuf); -#endif - -#if ((DEBUG) & DEBUG_I) - pkts_pending++; - pending_queue_trace_candidate = pkts_pending - pkts_pending_done; - if (pending_queue_trace < pending_queue_trace_candidate) - pending_queue_trace = pending_queue_trace_candidate; -#endif - - /* Flow complete - inform unpending_thread() about it */ - pending_head->flags |= FLOW_PENDING; - pending_head = pending_head->next; - done: - pthread_cond_signal(&unpending_cond); - } - } - return 0; -} - -int main(int argc, char **argv) -{ - char errpbuf[512]; - char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule; - int c, i, write_fd, memory_limit = 0; - struct addrinfo hints, *res; - struct sockaddr_in saddr; - pthread_attr_t tattr; - struct sigaction sigact; - static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread}; - struct timeval timeout; - - sched_min = sched_get_priority_min(SCHED); - sched_max = sched_get_priority_max(SCHED); - - memset(&saddr, 0 , sizeof(saddr)); - memset(&hints, 0 , sizeof(hints)); - hints.ai_flags = AI_PASSIVE; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - - /* Process command line options */ - - opterr = 0; - while ((c = my_getopt(argc, argv, parms)) != -1) { - switch (c) { - case '?': - usage(); - - case 'h': - usage(); - } - } - - if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg); - if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg); - if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg); - if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg); - if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg); - if (parms[nflag].count) { - switch (atoi(parms[nflag].arg)) { - case 1: - netflow = &NetFlow1; - break; - - case 5: - break; - - case 7: - netflow = &NetFlow7; - break; - - default: - fprintf(stderr, "Illegal %s\n", "NetFlow version"); - exit(1); - } - } - if (parms[vflag].count) verbosity = atoi(parms[vflag].arg); - if (parms[lflag].count) { - if ((log_suffix = strchr(parms[lflag].arg, ':'))) { - *log_suffix++ = 0; - if (*log_suffix) { - sprintf(errpbuf, "[%s]", log_suffix); - strcat(ident, errpbuf); - } - } - if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg); - if (log_suffix) *--log_suffix = ':'; - } - if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) { - err_malloc: - fprintf(stderr, "malloc(): %s\n", strerror(errno)); - exit(1); - } - sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident); - if (parms[qflag].count) { - pending_queue_length = atoi(parms[qflag].arg); - if (pending_queue_length < 1) { - fprintf(stderr, "Illegal %s\n", "pending queue length"); - exit(1); - } - } - if (parms[rflag].count) { - schedp.sched_priority = atoi(parms[rflag].arg); - if (schedp.sched_priority - && (schedp.sched_priority < sched_min - || schedp.sched_priority > sched_max)) { - fprintf(stderr, "Illegal %s\n", "realtime priority"); - exit(1); - } - } - if (parms[Bflag].count) { - sockbufsize = atoi(parms[Bflag].arg) << 10; - } - if (parms[bflag].count) { - bulk_quantity = atoi(parms[bflag].arg); - if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) { - fprintf(stderr, "Illegal %s\n", "bulk size"); - exit(1); - } - } - if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10; - if (parms[Xflag].count) { - for(i = 0; parms[Xflag].arg[i]; i++) - if (parms[Xflag].arg[i] == ':') nsnmp_rules++; - if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule)))) - goto err_malloc; - rule = strtok(parms[Xflag].arg, ":"); - for (i = 0; rule; i++) { - snmp_rules[i].len = strlen(rule); - if (snmp_rules[i].len > IFNAMSIZ) { - fprintf(stderr, "Illegal %s\n", "interface basename"); - exit(1); - } - strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len); - if (!*(rule - 1)) *(rule - 1) = ','; - rule = strtok(NULL, ","); - if (!rule) { - fprintf(stderr, "Illegal %s\n", "SNMP rule"); - exit(1); - } - snmp_rules[i].base = atoi(rule); - *(rule - 1) = ':'; - rule = strtok(NULL, ":"); - } - nsnmp_rules = i; - } - if (parms[tflag].count) - sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay); - if (parms[aflag].count) { - if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) { - bad_lhost: - fprintf(stderr, "Illegal %s\n", "source address"); - exit(1); - } else { - saddr = *((struct sockaddr_in *) res->ai_addr); - freeaddrinfo(res); - } - } - if (parms[uflag].count) - if ((pw = getpwnam(parms[uflag].arg)) == NULL) { - fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user"); - exit(1); - } - - - /* Process collectors parameters. Brrrr... :-[ */ - - npeers = argc - optind; - if (npeers > 1) { - /* Send to remote Netflow collector */ - if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; - for (i = optind, npeers = 0; i < argc; i++, npeers++) { - dhost = argv[i]; - if (!(dport = strchr(dhost, ':'))) goto bad_collector; - *dport++ = 0; - if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { - fprintf(stderr, "socket(): %s\n", strerror(errno)); - exit(1); - } - peers[npeers].write_fd = write_fd; - peers[npeers].type = PEER_MIRROR; - peers[npeers].laddr = saddr; - peers[npeers].seq = 0; - if ((lhost = strchr(dport, '/'))) { - *lhost++ = 0; - if ((type = strchr(lhost, '/'))) { - *type++ = 0; - switch (*type) { - case 0: - case 'm': - break; - - case 'r': - peers[npeers].type = PEER_ROTATE; - npeers_rot++; - break; - - default: - goto bad_collector; - } - } - if (*lhost) { - if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost; - peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr); - freeaddrinfo(res); - } - } - if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr, - sizeof(struct sockaddr_in))) { - fprintf(stderr, "bind(): %s\n", strerror(errno)); - exit(1); - } - if (getaddrinfo(dhost, dport, &hints, &res)) { -bad_collector: - fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1); - exit(1); - } - peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr); - freeaddrinfo(res); - if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr, - sizeof(struct sockaddr_in))) { - fprintf(stderr, "connect(): %s\n", strerror(errno)); - exit(1); - } - - /* Restore command line */ - if (type) *--type = '/'; - if (lhost) *--lhost = '/'; - *--dport = ':'; - } - } - else if (parms[fflag].count) { - // log into a file - char *fname; - if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; - fname = parms[fflag].arg; - if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) { - fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno)); - exit(1); - } - peers[0].write_fd = write_fd; - peers[0].type = PEER_FILE; - peers[0].seq = 0; - npeers++; - } - else - usage(); - - - if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc; - ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE); - if (!ulog_handle) { - fprintf(stderr, "libipulog initialization error: %s", - ipulog_strerror(ipulog_errno)); - exit(1); - } - if (sockbufsize) - if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF, - &sockbufsize, sizeof(sockbufsize)) < 0) - fprintf(stderr, "setsockopt(): %s", strerror(errno)); - - /* Daemonize (if log destination stdout-free) */ - - my_log_open(ident, verbosity, log_dest); - if (!(log_dest & 2)) { - switch (fork()) { - case -1: - fprintf(stderr, "fork(): %s", strerror(errno)); - exit(1); - - case 0: - setsid(); - freopen("/dev/null", "r", stdin); - freopen("/dev/null", "w", stdout); - freopen("/dev/null", "w", stderr); - break; - - default: - exit(0); - } - } else { - setvbuf(stdout, (char *)0, _IONBF, 0); - setvbuf(stderr, (char *)0, _IONBF, 0); - } - - pid = getpid(); - sprintf(errpbuf, "[%ld]", (long) pid); - strcat(ident, errpbuf); - - /* Initialization */ - - hash_init(); /* Actually for crc16 only */ - mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); - for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0); - -#ifdef UPTIME_TRICK - /* Hope 12 days is enough :-/ */ - start_time_offset = 1 << 20; - - /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */ -#endif - gettime(&start_time); - - /* - Build static pending queue as circular buffer. - */ - if (!(pending_head = mem_alloc())) goto err_mem_alloc; - pending_tail = pending_head; - for (i = pending_queue_length - 1; i--;) { - if (!(pending_tail->next = mem_alloc())) { - err_mem_alloc: - my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno)); - exit(1); - } - pending_tail = pending_tail->next; - } - pending_tail->next = pending_head; - pending_tail = pending_head; - - sigemptyset(&sig_mask); - sigact.sa_handler = &sighandler; - sigact.sa_mask = sig_mask; - sigact.sa_flags = 0; - sigaddset(&sig_mask, SIGTERM); - sigaction(SIGTERM, &sigact, 0); -#if ((DEBUG) & DEBUG_I) - sigaddset(&sig_mask, SIGUSR1); - sigaction(SIGUSR1, &sigact, 0); -#endif - if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) { - my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno)); - exit(1); - } - - my_log(LOG_INFO, "Starting %s...", VERSION); - - if (parms[cflag].count) { - if (chdir(parms[cflag].arg) || chroot(".")) { - my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno)); - exit(1); - } - } - - schedp.sched_priority = schedp.sched_priority - THREADS + 2; - pthread_attr_init(&tattr); - for (i = 0; i < THREADS - 1; i++) { - if (schedp.sched_priority > 0) { - if ((pthread_attr_setschedpolicy(&tattr, SCHED)) || - (pthread_attr_setschedparam(&tattr, &schedp))) { - my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno)); - exit(1); - } - } - if (pthread_create(&thid, &tattr, threads[i], 0)) { - my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno)); - exit(1); - } - pthread_detach(thid); - schedp.sched_priority++; - } - - if (pw) { - if (setgroups(0, NULL)) { - my_log(LOG_CRIT, "setgroups(): %s", strerror(errno)); - exit(1); - } - if (setregid(pw->pw_gid, pw->pw_gid)) { - my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno)); - exit(1); - } - if (setreuid(pw->pw_uid, pw->pw_uid)) { - my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno)); - exit(1); - } - } - - if (!(pidfile = fopen(pidfilepath, "w"))) - my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno)); - else { - fprintf(pidfile, "%ld\n", (long) pid); - fclose(pidfile); - } - - my_log(LOG_INFO, "pid: %d", pid); - my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s " - "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s", - ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime, - netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity, - memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1, - emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "", - parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : ""); - for (i = 0; i < nsnmp_rules; i++) { - my_log(LOG_INFO, "SNMP rule #%d %s:%d", - i + 1, snmp_rules[i].basename, snmp_rules[i].base); - } - for (i = 0; i < npeers; i++) { - switch (peers[i].type) { - case PEER_MIRROR: - c = 'm'; - break; - case PEER_ROTATE: - c = 'r'; - break; - } - snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr)); - my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1, - inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c); - } - - pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0); - - timeout.tv_usec = 0; - while (!killed - || (total_elements - free_elements - pending_queue_length) - || emit_count - || pending_tail->flags) { - - if (!sigs) { - timeout.tv_sec = scan_interval; - select(0, 0, 0, 0, &timeout); - } - - if (sigs & SIGTERM_MASK && !killed) { - sigs &= ~SIGTERM_MASK; - my_log(LOG_INFO, "SIGTERM received. Emitting flows cache..."); - scan_interval = 1; - frag_lifetime = -1; - active_lifetime = -1; - inactive_lifetime = -1; - emit_timeout = 1; - unpending_timeout = 1; - killed = 1; - pthread_cond_signal(&scan_cond); - pthread_cond_signal(&unpending_cond); - } - -#if ((DEBUG) & DEBUG_I) - if (sigs & SIGUSR1_MASK) { - sigs &= ~SIGUSR1_MASK; - info_debug(); - } -#endif - } - remove(pidfilepath); -#if ((DEBUG) & DEBUG_I) - info_debug(); -#endif - my_log(LOG_INFO, "Done."); -#ifdef WALL - return 0; -#endif -}