X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=trunk%2Fsrc%2Ffprobe-ulog.c.old;fp=trunk%2Fsrc%2Ffprobe-ulog.c.old;h=7e8947d1b3f8d65c06398ca9901809f98048c1b8;hb=85718e4dcaf5f34496f629e45a47ec91145c6f9e;hp=0000000000000000000000000000000000000000;hpb=abb2bffe08424d2d7e612f423815aeb7c79b42de;p=iptables.git
diff --git a/trunk/src/fprobe-ulog.c.old b/trunk/src/fprobe-ulog.c.old
new file mode 100644
index 0000000..7e8947d
--- /dev/null
+++ b/trunk/src/fprobe-ulog.c.old
@@ -0,0 +1,1486 @@
+/*
+ Copyright (C) Slava Astashonok
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License.
+
+ $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
+*/
+
+#include
+
+/* stdout, stderr, freopen() */
+#include <stdio.h>
+
+/* atoi(), exit() */
+#include <stdlib.h>
+
+/* getopt(), alarm(), getpid(), setsid(), chdir() */
+#include <unistd.h>
+
+/* strerror() */
+#include <string.h>
+
+/* sig*() */
+#include <signal.h>
+
+#include
+struct ipulog_handle {
+ int fd;
+ u_int8_t blocking;
+ struct sockaddr_nl local;
+ struct sockaddr_nl peer;
+ struct nlmsghdr* last_nlhdr;
+};
+
+/* inet_*() (Linux, FreeBSD, Solaris), getpid() */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#ifdef OS_LINUX
+#include
+#endif
+
+/* pthread_*() */
+#include <pthread.h>
+
+/* errno */
+#include <errno.h>
+
+/* getaddrinfo() */
+#include <netdb.h>
+
+/* nanosleep() */
+#include <time.h>
+
+/* gettimeofday() */
+#include <sys/time.h>
+
+/* scheduling */
+#include <sched.h>
+
+/* select() (POSIX)*/
+#include <sys/select.h>
+
+/* open() */
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+enum {
+ aflag,
+ Bflag,
+ bflag,
+ cflag,
+ dflag,
+ eflag,
+ gflag,
+ hflag,
+ lflag,
+ mflag,
+ Mflag,
+ nflag,
+ qflag,
+ rflag,
+ sflag,
+ tflag,
+ Uflag,
+ uflag,
+ vflag,
+ Xflag,
+};
+
+static struct getopt_parms parms[] = {
+ {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'h', 0, 0, 0},
+ {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'M', 0, 0, 0},
+ {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {0, 0, 0, 0}
+};
+
+extern char *optarg;
+extern int optind, opterr, optopt;
+extern int errno;
+
+extern struct NetFlow NetFlow1;
+extern struct NetFlow NetFlow5;
+extern struct NetFlow NetFlow7;
+
+#define mark_is_tos parms[Mflag].count
+static unsigned scan_interval = 5;
+static int frag_lifetime = 30;
+static int inactive_lifetime = 60;
+static int active_lifetime = 300;
+static int sockbufsize;
+#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
+#if (MEM_BITS == 0) || (MEM_BITS == 16)
+#define BULK_QUANTITY 10000
+#else
+#define BULK_QUANTITY 200
+#endif
+static unsigned bulk_quantity = BULK_QUANTITY;
+static unsigned pending_queue_length = 100;
+static struct NetFlow *netflow = &NetFlow5;
+static unsigned verbosity = 6;
+static unsigned log_dest = MY_LOG_SYSLOG;
+static struct Time start_time;
+static long start_time_offset;
+static int off_tl;
+/* From mem.c */
+extern unsigned total_elements;
+extern unsigned free_elements;
+extern unsigned total_memory; +#if ((DEBUG) & DEBUG_I) +static unsigned emit_pkts, emit_queue; +static uint64_t size_total; +static unsigned pkts_total, pkts_total_fragmented; +static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending; +static unsigned pkts_pending, pkts_pending_done; +static unsigned pending_queue_trace, pending_queue_trace_candidate; +static unsigned flows_total, flows_fragmented; +#endif +static unsigned emit_count; +static uint32_t emit_sequence; +static unsigned emit_rate_bytes, emit_rate_delay; +static struct Time emit_time; +static uint8_t emit_packet[NETFLOW_MAX_PACKET]; +static pthread_t thid; +static sigset_t sig_mask; +static struct sched_param schedp; +static int sched_min, sched_max; +static int npeers, npeers_rot; +static struct peer *peers; +static int sigs; + +static struct Flow *flows[1 << HASH_BITS]; +static pthread_mutex_t flows_mutex[1 << HASH_BITS]; + +static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER; + +static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER; +static struct Flow *pending_head, *pending_tail; +static struct Flow *scan_frag_dreg; + +static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER; +static struct Flow *flows_emit; + +static char ident[256] = "fprobe-ulog"; +static FILE *pidfile; +static char *pidfilepath; +static pid_t pid; +static int killed; +static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT; +static struct ipulog_handle *ulog_handle; +static uint32_t ulog_gmask = 1; +static char *cap_buf; +static int nsnmp_rules; +static struct snmp_rule *snmp_rules; +static struct passwd *pw = 0; + +void usage() +{ + fprintf(stdout, + "fprobe-ulog: a NetFlow probe. Version %s\n" + "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n" + "\n" + "-h\t\tDisplay this help\n" + "-U \tULOG group bitwise mask [1]\n" + "-s \tHow often scan for expired flows [5]\n" + "-g \tFragmented flow lifetime [30]\n" + "-d \tIdle flow lifetime (inactive timer) [60]\n" + "-e \tActive flow lifetime (active timer) [300]\n" + "-n \tNetFlow version for use (1, 5 or 7) [5]\n" + "-a
<address>\tUse <address>
as source for NetFlow flow\n" + "-X \tInterface name to SNMP-index conversion rules\n" + "-M\t\tUse netfilter mark value as ToS flag\n" + "-b \tMemory bulk size (1..%u) [%u]\n" + "-m \tMemory limit (0=no limit) [0]\n" + "-q \tPending queue length [100]\n" + "-B \tKernel capture buffer size [0]\n" + "-r \tReal-time priority (0=disabled, %d..%d) [0]\n" + "-t \tProduce nanosecond delay after each bytes sent [0:0]\n" + "-c \tDirectory to chroot to\n" + "-u \tUser to run as\n" + "-v \tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n" + "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n" + "remote:port\tAddress of the NetFlow collector\n", + VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max); + exit(0); +} + +#if ((DEBUG) & DEBUG_I) +void info_debug() +{ + my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d", + pkts_total, pkts_total_fragmented, size_total, + pkts_pending - pkts_pending_done, pending_queue_trace); + my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d", + pkts_ignored, pkts_lost_capture, pkts_lost_unpending); + my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d", + flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue); + my_log(LOG_DEBUG, "I: memory:%d/%d (%d)", + total_elements, free_elements, total_memory); +} +#endif + +void sighandler(int sig) +{ + switch (sig) { + case SIGTERM: + sigs |= SIGTERM_MASK; + break; +#if ((DEBUG) & DEBUG_I) + case SIGUSR1: + sigs |= SIGUSR1_MASK; + break; +#endif + } +} + +void gettime(struct Time *now) +{ + struct timeval t; + + gettimeofday(&t, 0); + now->sec = t.tv_sec; + now->usec = t.tv_usec; +} + +inline time_t cmpmtime(struct Time *t1, struct Time *t2) +{ + return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000; +} + +/* Uptime in miliseconds */ +uint32_t getuptime(struct Time *t) +{ + /* Maximum uptime is about 49/2 days */ + return cmpmtime(t, &start_time); +} + +hash_t hash_flow(struct Flow *flow) +{ + if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F)); + else return hash(flow, sizeof(struct Flow_TL)); +} + +uint16_t snmp_index(char *name) { + uint32_t i; + + if (!*name) return 0; + + for (i = 0; (int) i < nsnmp_rules; i++) { + if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue; + return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base; + } + + if ((i = if_nametoindex(name))) return i; + + return -1; +} + +inline void copy_flow(struct Flow *src, struct Flow *dst) +{ + dst->iif = src->iif; + dst->oif = src->oif; + dst->sip = src->sip; + dst->dip = src->dip; + dst->tos = src->tos; + dst->proto = src->proto; + dst->tcp_flags = src->tcp_flags; + dst->id = src->id; + dst->sp = src->sp; + dst->dp = src->dp; + dst->pkts = src->pkts; + dst->size = src->size; + dst->sizeF = src->sizeF; + dst->sizeP = src->sizeP; + dst->ctime = src->ctime; + dst->mtime = src->mtime; + dst->flags = src->flags; +} + +struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev) +{ + struct Flow **flowpp; + +#ifdef WALL + flowpp = 0; +#endif + + if (prev) flowpp = *prev; + + while (where) { + if (where->sip.s_addr == what->sip.s_addr + && where->dip.s_addr == what->dip.s_addr + && where->proto == what->proto) { + switch ((what->flags + where->flags) & FLOW_FRAGMASK) { + case 0: + /* Both unfragmented */ + if ((what->sp == where->sp) + && (what->dp == where->dp)) goto done; + break; + case 2: + /* Both fragmented */ + if (where->id == what->id) goto done; + break; + } + } + flowpp = &where->next; + where = where->next; + } +done: + if (prev) *prev = 
flowpp; + return where; +} + +int put_into(struct Flow *flow, int flag +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + , char *logbuf +#endif +) +{ + int ret = 0; + hash_t h; + struct Flow *flown, **flowpp; +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + char buf[64]; +#endif + + h = hash_flow(flow); +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + sprintf(buf, " %x H:%04x", (unsigned) flow, h); + strcat(logbuf, buf); +#endif + pthread_mutex_lock(&flows_mutex[h]); + flowpp = &flows[h]; + if (!(flown = find(flows[h], flow, &flowpp))) { + /* No suitable flow found - add */ + if (flag == COPY_INTO) { + if ((flown = mem_alloc())) { + copy_flow(flow, flown); + flow = flown; + } else { +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES + my_log(LOG_ERR, "%s %s. %s", + "mem_alloc():", strerror(errno), "packet lost"); +#endif + return -1; + } + } + flow->next = flows[h]; + flows[h] = flow; +#if ((DEBUG) & DEBUG_I) + flows_total++; + if (flow->flags & FLOW_FRAG) flows_fragmented++; +#endif +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + if (flown) { + sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags); + strcat(logbuf, buf); + } +#endif + } else { + /* Found suitable flow - update */ +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + sprintf(buf, " +> %x", (unsigned) flown); + strcat(logbuf, buf); +#endif + if (cmpmtime(&flow->mtime, &flown->mtime) > 0) + flown->mtime = flow->mtime; + if (cmpmtime(&flow->ctime, &flown->ctime) < 0) + flown->ctime = flow->ctime; + flown->tcp_flags |= flow->tcp_flags; + flown->size += flow->size; + flown->pkts += flow->pkts; + if (flow->flags & FLOW_FRAG) { + /* Fragmented flow require some additional work */ + if (flow->flags & FLOW_TL) { + /* + ?FIXME? + Several packets with FLOW_TL (attack) + */ + flown->sp = flow->sp; + flown->dp = flow->dp; + } + if (flow->flags & FLOW_LASTFRAG) { + /* + ?FIXME? 
+ Several packets with FLOW_LASTFRAG (attack) + */ + flown->sizeP = flow->sizeP; + } + flown->flags |= flow->flags; + flown->sizeF += flow->sizeF; + if ((flown->flags & FLOW_LASTFRAG) + && (flown->sizeF >= flown->sizeP)) { + /* All fragments received - flow reassembled */ + *flowpp = flown->next; + pthread_mutex_unlock(&flows_mutex[h]); +#if ((DEBUG) & DEBUG_I) + flows_total--; + flows_fragmented--; +#endif + flown->id = 0; + flown->flags &= ~FLOW_FRAG; +#if ((DEBUG) & (DEBUG_U | DEBUG_S)) + strcat(logbuf," R"); +#endif + ret = put_into(flown, MOVE_INTO +#if ((DEBUG) & (DEBUG_U | DEBUG_S)) + , logbuf +#endif + ); + } + } + if (flag == MOVE_INTO) mem_free(flow); + } + pthread_mutex_unlock(&flows_mutex[h]); + return ret; +} + +void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) +{ + int i; + + for (i = 0; i < fields; i++) { +#if ((DEBUG) & DEBUG_F) + my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p); +#endif + switch (format[i]) { + case NETFLOW_IPV4_SRC_ADDR: + ((struct in_addr *) p)->s_addr = flow->sip.s_addr; + p += NETFLOW_IPV4_SRC_ADDR_SIZE; + break; + + case NETFLOW_IPV4_DST_ADDR: + ((struct in_addr *) p)->s_addr = flow->dip.s_addr; + p += NETFLOW_IPV4_DST_ADDR_SIZE; + break; + + case NETFLOW_INPUT_SNMP: + *((uint16_t *) p) = htons(flow->iif); + p += NETFLOW_INPUT_SNMP_SIZE; + break; + + case NETFLOW_OUTPUT_SNMP: + *((uint16_t *) p) = htons(flow->oif); + p += NETFLOW_OUTPUT_SNMP_SIZE; + break; + + case NETFLOW_PKTS_32: + *((uint32_t *) p) = htonl(flow->pkts); + p += NETFLOW_PKTS_32_SIZE; + break; + + case NETFLOW_BYTES_32: + *((uint32_t *) p) = htonl(flow->size); + p += NETFLOW_BYTES_32_SIZE; + break; + + case NETFLOW_FIRST_SWITCHED: + *((uint32_t *) p) = htonl(getuptime(&flow->ctime)); + p += NETFLOW_FIRST_SWITCHED_SIZE; + break; + + case NETFLOW_LAST_SWITCHED: + *((uint32_t *) p) = htonl(getuptime(&flow->mtime)); + p += NETFLOW_LAST_SWITCHED_SIZE; + break; + + case NETFLOW_L4_SRC_PORT: + *((uint16_t *) p) = flow->sp; + p += NETFLOW_L4_SRC_PORT_SIZE; + break; + + case NETFLOW_L4_DST_PORT: + *((uint16_t *) p) = flow->dp; + p += NETFLOW_L4_DST_PORT_SIZE; + break; + + case NETFLOW_PROT: + *((uint8_t *) p) = flow->proto; + p += NETFLOW_PROT_SIZE; + break; + + case NETFLOW_SRC_TOS: + *((uint8_t *) p) = flow->tos; + p += NETFLOW_SRC_TOS_SIZE; + break; + + case NETFLOW_TCP_FLAGS: + *((uint8_t *) p) = flow->tcp_flags; + p += NETFLOW_TCP_FLAGS_SIZE; + break; + + case NETFLOW_VERSION: + *((uint16_t *) p) = htons(netflow->Version); + p += NETFLOW_VERSION_SIZE; + break; + + case NETFLOW_COUNT: + *((uint16_t *) p) = htons(emit_count); + p += NETFLOW_COUNT_SIZE; + break; + + case NETFLOW_UPTIME: + *((uint32_t *) p) = htonl(getuptime(&emit_time)); + p += NETFLOW_UPTIME_SIZE; + break; + + case NETFLOW_UNIX_SECS: + *((uint32_t *) p) = htonl(emit_time.sec); + p += NETFLOW_UNIX_SECS_SIZE; + break; + + case NETFLOW_UNIX_NSECS: + *((uint32_t *) p) = htonl(emit_time.usec * 1000); + p += NETFLOW_UNIX_NSECS_SIZE; + break; + + case NETFLOW_FLOW_SEQUENCE: + //*((uint32_t *) p) = htonl(emit_sequence); + *((uint32_t *) p) = 0; + p += NETFLOW_FLOW_SEQUENCE_SIZE; + break; + + case NETFLOW_PAD8: + /* Unsupported (uint8_t) */ + case NETFLOW_ENGINE_TYPE: + case NETFLOW_ENGINE_ID: + case NETFLOW_FLAGS7_1: + case NETFLOW_SRC_MASK: + case NETFLOW_DST_MASK: + *((uint8_t *) p) = 0; + p += NETFLOW_PAD8_SIZE; + break; + + case NETFLOW_PAD16: + /* Unsupported (uint16_t) */ + case NETFLOW_SRC_AS: + case NETFLOW_DST_AS: + case NETFLOW_FLAGS7_2: + *((uint16_t *) p) = 0; + p += 
NETFLOW_PAD16_SIZE; + break; + + case NETFLOW_PAD32: + /* Unsupported (uint32_t) */ + case NETFLOW_IPV4_NEXT_HOP: + case NETFLOW_ROUTER_SC: + *((uint32_t *) p) = 0; + p += NETFLOW_PAD32_SIZE; + break; + + default: + my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d", + format, i, format[i]); + exit(1); + } + } +#if ((DEBUG) & DEBUG_F) + my_log(LOG_DEBUG, "F: return %x", (unsigned) p); +#endif + return p; +} + +void setuser() { + /* + Workaround for clone()-based threads + Try to change EUID independently of main thread + */ + if (pw) { + setgroups(0, NULL); + setregid(pw->pw_gid, pw->pw_gid); + setreuid(pw->pw_uid, pw->pw_uid); + } +} + +void *emit_thread() +{ + struct Flow *flow; + void *p; + struct timeval now; + struct timespec timeout; + int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0; + + p = (void *) &emit_packet + netflow->HeaderSize; + timeout.tv_nsec = 0; + + setuser(); + + for (;;) { + pthread_mutex_lock(&emit_mutex); + while (!flows_emit) { + gettimeofday(&now, 0); + timeout.tv_sec = now.tv_sec + emit_timeout; + /* Do not wait until emit_packet will filled - it may be too long */ + if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { + pthread_mutex_unlock(&emit_mutex); + goto sendit; + } + } + flow = flows_emit; + flows_emit = flows_emit->next; +#if ((DEBUG) & DEBUG_I) + emit_queue--; +#endif + pthread_mutex_unlock(&emit_mutex); + +#ifdef UPTIME_TRICK + if (!emit_count) { + gettime(&start_time); + start_time.sec -= start_time_offset; + } +#endif + p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p); + mem_free(flow); + emit_count++; + if (emit_count == netflow->MaxFlows) { + sendit: + gettime(&emit_time); + p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet); + size = netflow->HeaderSize + emit_count * netflow->FlowSize; + peer_rot_cur = 0; + for (i = 0; i < npeers; i++) { + if (peers[i].type == PEER_MIRROR) goto sendreal; + if (peers[i].type == PEER_ROTATE) + if (peer_rot_cur++ == peer_rot_work) { + sendreal: + if (netflow->SeqOffset) + *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq); + ret = send(peers[i].sock, emit_packet, size, 0); + if (ret < size) { +#if ((DEBUG) & DEBUG_E) || defined MESSAGES + my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s", + i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); +#endif + } +#if ((DEBUG) & DEBUG_E) + commaneelse { + my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", + emit_count, i + 1, peers[i].seq); + } +#endif + peers[i].seq += emit_count; + + /* Rate limit */ + if (emit_rate_bytes) { + sent += size; + delay = sent / emit_rate_bytes; + if (delay) { + sent %= emit_rate_bytes; + timeout.tv_sec = 0; + timeout.tv_nsec = emit_rate_delay * delay; + while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); + } + } + } + } + if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot; + emit_sequence += emit_count; + emit_count = 0; +#if ((DEBUG) & DEBUG_I) + emit_pkts++; +#endif + } + } +} + +void *unpending_thread() +{ + struct timeval now; + struct timespec timeout; +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + char logbuf[256]; +#endif + + setuser(); + + timeout.tv_nsec = 0; + pthread_mutex_lock(&unpending_mutex); + + for (;;) { + while (!(pending_tail->flags & FLOW_PENDING)) { + gettimeofday(&now, 0); + timeout.tv_sec = now.tv_sec + unpending_timeout; + pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout); + } + +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + *logbuf = 0; +#endif + if 
(put_into(pending_tail, COPY_INTO +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + , logbuf +#endif + ) < 0) { +#if ((DEBUG) & DEBUG_I) + pkts_lost_unpending++; +#endif + } + +#if ((DEBUG) & DEBUG_U) + my_log(LOG_DEBUG, "%s%s", "U:", logbuf); +#endif + + pending_tail->flags = 0; + pending_tail = pending_tail->next; +#if ((DEBUG) & DEBUG_I) + pkts_pending_done++; +#endif + } +} + +void *scan_thread() +{ +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + char logbuf[256]; +#endif + int i; + struct Flow *flow, **flowpp; + struct Time now; + struct timespec timeout; + + setuser(); + + timeout.tv_nsec = 0; + pthread_mutex_lock(&scan_mutex); + + for (;;) { + gettime(&now); + timeout.tv_sec = now.sec + scan_interval; + pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout); + + gettime(&now); +#if ((DEBUG) & DEBUG_S) + my_log(LOG_DEBUG, "S: %d", now.sec); +#endif + for (i = 0; i < 1 << HASH_BITS ; i++) { + pthread_mutex_lock(&flows_mutex[i]); + flow = flows[i]; + flowpp = &flows[i]; + while (flow) { + if (flow->flags & FLOW_FRAG) { + /* Process fragmented flow */ + if ((now.sec - flow->mtime.sec) > frag_lifetime) { + /* Fragmented flow expired - put it into special chain */ +#if ((DEBUG) & DEBUG_I) + flows_fragmented--; + flows_total--; +#endif + *flowpp = flow->next; + flow->id = 0; + flow->flags &= ~FLOW_FRAG; + flow->next = scan_frag_dreg; + scan_frag_dreg = flow; + flow = *flowpp; + continue; + } + } else { + /* Flow is not frgamented */ + if ((now.sec - flow->mtime.sec) > inactive_lifetime + || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) { + /* Flow expired */ +#if ((DEBUG) & DEBUG_S) + my_log(LOG_DEBUG, "S: E %x", flow); +#endif +#if ((DEBUG) & DEBUG_I) + flows_total--; +#endif + *flowpp = flow->next; + pthread_mutex_lock(&emit_mutex); + flow->next = flows_emit; + flows_emit = flow; +#if ((DEBUG) & DEBUG_I) + emit_queue++; +#endif + pthread_mutex_unlock(&emit_mutex); + flow = *flowpp; + continue; + } + } + flowpp = &flow->next; + flow = flow->next; + } /* chain loop */ + pthread_mutex_unlock(&flows_mutex[i]); + } /* hash loop */ + if (flows_emit) pthread_cond_signal(&emit_cond); + + while (scan_frag_dreg) { + flow = scan_frag_dreg; + scan_frag_dreg = flow->next; +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + *logbuf = 0; +#endif + put_into(flow, MOVE_INTO +#if ((DEBUG) & (DEBUG_S | DEBUG_U)) + , logbuf +#endif + ); +#if ((DEBUG) & DEBUG_S) + my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf); +#endif + } + } +} + +void *cap_thread() +{ + struct ulog_packet_msg *ulog_msg; + struct ip *nl; + void *tl; + struct Flow *flow; + int len, off_frag, psize; +#if ((DEBUG) & DEBUG_C) + char buf[64]; + char logbuf[256]; +#endif + + setuser(); + + while (!killed) { + len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1); + if (len <= 0) { + my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno)); + continue; + } + while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) { + +#if ((DEBUG) & DEBUG_C) + sprintf(logbuf, "C: %d", ulog_msg->data_len); +#endif + + nl = (void *) &ulog_msg->payload; + psize = ulog_msg->data_len; + + /* Sanity check */ + if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) { +#if ((DEBUG) & DEBUG_C) + strcat(logbuf, " U"); + my_log(LOG_DEBUG, "%s", logbuf); +#endif +#if ((DEBUG) & DEBUG_I) + pkts_ignored++; +#endif + continue; + } + + if (pending_head->flags) { +#if ((DEBUG) & DEBUG_C) || defined MESSAGES + my_log(LOG_ERR, +# if ((DEBUG) & DEBUG_C) + "%s %s %s", logbuf, +# else + "%s %s", +# endif + "pending queue full:", "packet lost"); +#endif +#if ((DEBUG) & DEBUG_I) + 
pkts_lost_capture++; +#endif + goto done; + } + +#if ((DEBUG) & DEBUG_I) + pkts_total++; +#endif + + flow = pending_head; + + /* ?FIXME? Add sanity check for ip_len? */ + flow->size = ntohs(nl->ip_len); +#if ((DEBUG) & DEBUG_I) + size_total += flow->size; +#endif + + flow->sip = nl->ip_src; + flow->dip = nl->ip_dst; + flow->iif = snmp_index(ulog_msg->indev_name); + flow->oif = snmp_index(ulog_msg->outdev_name); + flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; + flow->proto = nl->ip_p; + flow->id = 0; + flow->tcp_flags = 0; + flow->pkts = 1; + flow->sizeF = 0; + flow->sizeP = 0; + /* Packets captured from OUTPUT table didn't contains valid timestamp */ + if (ulog_msg->timestamp_sec) { + flow->ctime.sec = ulog_msg->timestamp_sec; + flow->ctime.usec = ulog_msg->timestamp_usec; + } else gettime(&flow->ctime); + flow->mtime = flow->ctime; + + off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3; + + /* + Offset (from network layer) to transport layer header/IP data + IOW IP header size ;-) + + ?FIXME? + Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks... + */ + off_tl = nl->ip_hl << 2; + tl = (void *) nl + off_tl; + + /* THIS packet data size: data_size = total_size - ip_header_size*4 */ + flow->sizeF = ntohs(nl->ip_len) - off_tl; + psize -= off_tl; + if ((signed) flow->sizeF < 0) flow->sizeF = 0; + if (psize > (signed) flow->sizeF) psize = flow->sizeF; + + if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) { + /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */ +#if ((DEBUG) & DEBUG_C) + strcat(logbuf, " F"); +#endif +#if ((DEBUG) & DEBUG_I) + pkts_total_fragmented++; +#endif + flow->flags |= FLOW_FRAG; + flow->id = nl->ip_id; + + if (!(ntohs(nl->ip_off) & IP_MF)) { + /* Packet whith IP_MF contains information about whole datagram size */ + flow->flags |= FLOW_LASTFRAG; + /* size = frag_offset*8 + data_size */ + flow->sizeP = off_frag + flow->sizeF; + } + } + +#if ((DEBUG) & DEBUG_C) + sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif); + strcat(logbuf, buf); + sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto); + strcat(logbuf, buf); +#endif + + /* + Fortunately most interesting transport layer information fit + into first 8 bytes of IP data field (minimal nonzero size). + Thus we don't need actual packet reassembling to build whole + transport layer data. We only check the fragment offset for + zero value to find packet with this information. + */ + if (!off_frag && psize >= 8) { + switch (flow->proto) { + case IPPROTO_TCP: + case IPPROTO_UDP: + flow->sp = ((struct udphdr *)tl)->uh_sport; + flow->dp = ((struct udphdr *)tl)->uh_dport; + goto tl_known; + +#ifdef ICMP_TRICK + case IPPROTO_ICMP: + flow->sp = htons(((struct icmp *)tl)->icmp_type); + flow->dp = htons(((struct icmp *)tl)->icmp_code); + goto tl_known; +#endif +#ifdef ICMP_TRICK_CISCO + case IPPROTO_ICMP: + flow->dp = *((int32_t *) tl); + goto tl_known; +#endif + + default: + /* Unknown transport layer */ +#if ((DEBUG) & DEBUG_C) + strcat(logbuf, " U"); +#endif + flow->sp = 0; + flow->dp = 0; + break; + + tl_known: +#if ((DEBUG) & DEBUG_C) + sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp)); + strcat(logbuf, buf); +#endif + flow->flags |= FLOW_TL; + } + } + + /* Check for tcp flags presence (including CWR and ECE). 
*/ + if (flow->proto == IPPROTO_TCP + && off_frag < 16 + && psize >= 16 - off_frag) { + flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag)); +#if ((DEBUG) & DEBUG_C) + sprintf(buf, " TCP:%x", flow->tcp_flags); + strcat(logbuf, buf); +#endif + } + +#if ((DEBUG) & DEBUG_C) + sprintf(buf, " => %x", (unsigned) flow); + strcat(logbuf, buf); + my_log(LOG_DEBUG, "%s", logbuf); +#endif + +#if ((DEBUG) & DEBUG_I) + pkts_pending++; + pending_queue_trace_candidate = pkts_pending - pkts_pending_done; + if (pending_queue_trace < pending_queue_trace_candidate) + pending_queue_trace = pending_queue_trace_candidate; +#endif + + /* Flow complete - inform unpending_thread() about it */ + pending_head->flags |= FLOW_PENDING; + pending_head = pending_head->next; + done: + pthread_cond_signal(&unpending_cond); + } + } + return 0; +} + +int main(int argc, char **argv) +{ + char errpbuf[512]; + char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule; + int c, i, sock, memory_limit = 0; + struct addrinfo hints, *res; + struct sockaddr_in saddr; + pthread_attr_t tattr; + struct sigaction sigact; + static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread}; + struct timeval timeout; + + sched_min = sched_get_priority_min(SCHED); + sched_max = sched_get_priority_max(SCHED); + + memset(&saddr, 0 , sizeof(saddr)); + memset(&hints, 0 , sizeof(hints)); + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + + /* Process command line options */ + + opterr = 0; + while ((c = my_getopt(argc, argv, parms)) != -1) { + switch (c) { + case '?': + usage(); + + case 'h': + usage(); + } + } + + if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg); + if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg); + if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg); + if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg); + if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg); + if (parms[nflag].count) { + switch (atoi(parms[nflag].arg)) { + case 1: + netflow = &NetFlow1; + break; + + case 5: + break; + + case 7: + netflow = &NetFlow7; + break; + + default: + fprintf(stderr, "Illegal %s\n", "NetFlow version"); + exit(1); + } + } + if (parms[vflag].count) verbosity = atoi(parms[vflag].arg); + if (parms[lflag].count) { + if ((log_suffix = strchr(parms[lflag].arg, ':'))) { + *log_suffix++ = 0; + if (*log_suffix) { + sprintf(errpbuf, "[%s]", log_suffix); + strcat(ident, errpbuf); + } + } + if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg); + if (log_suffix) *--log_suffix = ':'; + } + if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) { + err_malloc: + fprintf(stderr, "malloc(): %s\n", strerror(errno)); + exit(1); + } + sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident); + if (parms[qflag].count) { + pending_queue_length = atoi(parms[qflag].arg); + if (pending_queue_length < 1) { + fprintf(stderr, "Illegal %s\n", "pending queue length"); + exit(1); + } + } + if (parms[rflag].count) { + schedp.sched_priority = atoi(parms[rflag].arg); + if (schedp.sched_priority + && (schedp.sched_priority < sched_min + || schedp.sched_priority > sched_max)) { + fprintf(stderr, "Illegal %s\n", "realtime priority"); + exit(1); + } + } + if (parms[Bflag].count) { + sockbufsize = atoi(parms[Bflag].arg) << 10; + } + if (parms[bflag].count) { + bulk_quantity = atoi(parms[bflag].arg); + if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) { + fprintf(stderr, "Illegal %s\n", "bulk size"); + 
exit(1); + } + } + if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10; + if (parms[Xflag].count) { + for(i = 0; parms[Xflag].arg[i]; i++) + if (parms[Xflag].arg[i] == ':') nsnmp_rules++; + if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule)))) + goto err_malloc; + rule = strtok(parms[Xflag].arg, ":"); + for (i = 0; rule; i++) { + snmp_rules[i].len = strlen(rule); + if (snmp_rules[i].len > IFNAMSIZ) { + fprintf(stderr, "Illegal %s\n", "interface basename"); + exit(1); + } + strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len); + if (!*(rule - 1)) *(rule - 1) = ','; + rule = strtok(NULL, ","); + if (!rule) { + fprintf(stderr, "Illegal %s\n", "SNMP rule"); + exit(1); + } + snmp_rules[i].base = atoi(rule); + *(rule - 1) = ':'; + rule = strtok(NULL, ":"); + } + nsnmp_rules = i; + } + if (parms[tflag].count) + sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay); + if (parms[aflag].count) { + if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) { + bad_lhost: + fprintf(stderr, "Illegal %s\n", "source address"); + exit(1); + } else { + saddr = *((struct sockaddr_in *) res->ai_addr); + freeaddrinfo(res); + } + } + if (parms[uflag].count) + if ((pw = getpwnam(parms[uflag].arg)) == NULL) { + fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user"); + exit(1); + } + + + /* Process collectors parameters. Brrrr... :-[ */ + + npeers = argc - optind; + if (npeers < 1) usage(); + if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; + for (i = optind, npeers = 0; i < argc; i++, npeers++) { + dhost = argv[i]; + if (!(dport = strchr(dhost, ':'))) goto bad_collector; + *dport++ = 0; + if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + fprintf(stderr, "socket(): %s\n", strerror(errno)); + exit(1); + } + peers[npeers].sock = sock; + peers[npeers].type = PEER_MIRROR; + peers[npeers].laddr = saddr; + peers[npeers].seq = 0; + if ((lhost = strchr(dport, '/'))) { + *lhost++ = 0; + if ((type = strchr(lhost, '/'))) { + *type++ = 0; + switch (*type) { + case 0: + case 'm': + break; + + case 'r': + peers[npeers].type = PEER_ROTATE; + npeers_rot++; + break; + + default: + goto bad_collector; + } + } + if (*lhost) { + if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost; + peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr); + freeaddrinfo(res); + } + } + if (bind(sock, (struct sockaddr *) &peers[npeers].laddr, + sizeof(struct sockaddr_in))) { + fprintf(stderr, "bind(): %s\n", strerror(errno)); + exit(1); + } + if (getaddrinfo(dhost, dport, &hints, &res)) { + bad_collector: + fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1); + exit(1); + } + peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr); + freeaddrinfo(res); + if (connect(sock, (struct sockaddr *) &peers[npeers].addr, + sizeof(struct sockaddr_in))) { + fprintf(stderr, "connect(): %s\n", strerror(errno)); + exit(1); + } + + /* Restore command line */ + if (type) *--type = '/'; + if (lhost) *--lhost = '/'; + *--dport = ':'; + } + + if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc; + ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE); + if (!ulog_handle) { + fprintf(stderr, "libipulog initialization error: %s", + ipulog_strerror(ipulog_errno)); + exit(1); + } + if (sockbufsize) + if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF, + &sockbufsize, sizeof(sockbufsize)) < 0) + fprintf(stderr, "setsockopt(): %s", strerror(errno)); + + /* Daemonize (if log destination stdout-free) */ + + my_log_open(ident, 
verbosity, log_dest); + if (!(log_dest & 2)) { + switch (fork()) { + case -1: + fprintf(stderr, "fork(): %s", strerror(errno)); + exit(1); + + case 0: + setsid(); + freopen("/dev/null", "r", stdin); + freopen("/dev/null", "w", stdout); + freopen("/dev/null", "w", stderr); + break; + + default: + exit(0); + } + } else { + setvbuf(stdout, (char *)0, _IONBF, 0); + setvbuf(stderr, (char *)0, _IONBF, 0); + } + + pid = getpid(); + sprintf(errpbuf, "[%ld]", (long) pid); + strcat(ident, errpbuf); + + /* Initialization */ + + hash_init(); /* Actually for crc16 only */ + mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); + for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0); + +#ifdef UPTIME_TRICK + /* Hope 12 days is enough :-/ */ + start_time_offset = 1 << 20; + + /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */ +#endif + gettime(&start_time); + + /* + Build static pending queue as circular buffer. + */ + if (!(pending_head = mem_alloc())) goto err_mem_alloc; + pending_tail = pending_head; + for (i = pending_queue_length - 1; i--;) { + if (!(pending_tail->next = mem_alloc())) { + err_mem_alloc: + my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno)); + exit(1); + } + pending_tail = pending_tail->next; + } + pending_tail->next = pending_head; + pending_tail = pending_head; + + sigemptyset(&sig_mask); + sigact.sa_handler = &sighandler; + sigact.sa_mask = sig_mask; + sigact.sa_flags = 0; + sigaddset(&sig_mask, SIGTERM); + sigaction(SIGTERM, &sigact, 0); +#if ((DEBUG) & DEBUG_I) + sigaddset(&sig_mask, SIGUSR1); + sigaction(SIGUSR1, &sigact, 0); +#endif + if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) { + my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno)); + exit(1); + } + + my_log(LOG_INFO, "Starting %s...", VERSION); + + if (parms[cflag].count) { + if (chdir(parms[cflag].arg) || chroot(".")) { + my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno)); + exit(1); + } + } + + schedp.sched_priority = schedp.sched_priority - THREADS + 2; + pthread_attr_init(&tattr); + for (i = 0; i < THREADS - 1; i++) { + if (schedp.sched_priority > 0) { + if ((pthread_attr_setschedpolicy(&tattr, SCHED)) || + (pthread_attr_setschedparam(&tattr, &schedp))) { + my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno)); + exit(1); + } + } + if (pthread_create(&thid, &tattr, threads[i], 0)) { + my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno)); + exit(1); + } + pthread_detach(thid); + schedp.sched_priority++; + } + + if (pw) { + if (setgroups(0, NULL)) { + my_log(LOG_CRIT, "setgroups(): %s", strerror(errno)); + exit(1); + } + if (setregid(pw->pw_gid, pw->pw_gid)) { + my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno)); + exit(1); + } + if (setreuid(pw->pw_uid, pw->pw_uid)) { + my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno)); + exit(1); + } + } + + if (!(pidfile = fopen(pidfilepath, "w"))) + my_log(LOG_ERR, "Can't create pid file. 
fopen(): %s", strerror(errno)); + else { + fprintf(pidfile, "%ld\n", (long) pid); + fclose(pidfile); + } + + my_log(LOG_INFO, "pid: %d", pid); + my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s " + "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s", + ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime, + netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity, + memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1, + emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "", + parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : ""); + for (i = 0; i < nsnmp_rules; i++) { + my_log(LOG_INFO, "SNMP rule #%d %s:%d", + i + 1, snmp_rules[i].basename, snmp_rules[i].base); + } + for (i = 0; i < npeers; i++) { + switch (peers[i].type) { + case PEER_MIRROR: + c = 'm'; + break; + case PEER_ROTATE: + c = 'r'; + break; + } + snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr)); + my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1, + inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c); + } + + pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0); + + timeout.tv_usec = 0; + while (!killed + || (total_elements - free_elements - pending_queue_length) + || emit_count + || pending_tail->flags) { + + if (!sigs) { + timeout.tv_sec = scan_interval; + select(0, 0, 0, 0, &timeout); + } + + if (sigs & SIGTERM_MASK && !killed) { + sigs &= ~SIGTERM_MASK; + my_log(LOG_INFO, "SIGTERM received. Emitting flows cache..."); + scan_interval = 1; + frag_lifetime = -1; + active_lifetime = -1; + inactive_lifetime = -1; + emit_timeout = 1; + unpending_timeout = 1; + killed = 1; + pthread_cond_signal(&scan_cond); + pthread_cond_signal(&unpending_cond); + } + +#if ((DEBUG) & DEBUG_I) + if (sigs & SIGUSR1_MASK) { + sigs &= ~SIGUSR1_MASK; + info_debug(); + } +#endif + } + remove(pidfilepath); +#if ((DEBUG) & DEBUG_I) + info_debug(); +#endif + my_log(LOG_INFO, "Done."); +#ifdef WALL + return 0; +#endif +}