/* Copyright (C) Slava Astashonok This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License. $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $ */ #include /* stdout, stderr, freopen() */ #include /* atoi(), exit() */ #include /* getopt(), alarm(), getpid(), sedsid(), chdir() */ #include /* strerror() */ #include /* sig*() */ #include #include struct ipulog_handle { int fd; u_int8_t blocking; struct sockaddr_nl local; struct sockaddr_nl peer; struct nlmsghdr* last_nlhdr; }; /* inet_*() (Linux, FreeBSD, Solaris), getpid() */ #include #include #include #include #include #include #include #include #include #include #include #include #ifdef OS_LINUX #include #endif /* pthread_*() */ #include /* errno */ #include /* getaddrinfo() */ #include /* nanosleep() */ #include /* gettimeofday() */ #include /* scheduling */ #include /* select() (POSIX)*/ #include /* open() */ #include #include #include #include #include #include #include #include enum { aflag, Bflag, bflag, cflag, dflag, eflag, gflag, hflag, lflag, mflag, Mflag, nflag, qflag, rflag, sflag, tflag, Uflag, uflag, vflag, Xflag, }; static struct getopt_parms parms[] = { {'a', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'B', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'b', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'c', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'d', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'e', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'g', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'h', 0, 0, 0}, {'l', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'m', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'M', 0, 0, 0}, {'n', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'q', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'r', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'s', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'t', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'U', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'u', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'v', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'X', MY_GETOPT_ARG_REQUIRED, 0, 0}, {0, 0, 0, 0} }; extern char *optarg; extern int optind, opterr, optopt; extern int errno; extern struct NetFlow NetFlow1; extern struct NetFlow NetFlow5; extern struct NetFlow NetFlow7; #define mark_is_tos parms[Mflag].count static unsigned scan_interval = 5; static int frag_lifetime = 30; static int inactive_lifetime = 60; static int active_lifetime = 300; static int sockbufsize; #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1) #if (MEM_BITS == 0) || (MEM_BITS == 16) #define BULK_QUANTITY 10000 #else #define BULK_QUANTITY 200 #endif static unsigned bulk_quantity = BULK_QUANTITY; static unsigned pending_queue_length = 100; static struct NetFlow *netflow = &NetFlow5; static unsigned verbosity = 6; static unsigned log_dest = MY_LOG_SYSLOG; static struct Time start_time; static long start_time_offset; static int off_tl; /* From mem.c */ extern unsigned total_elements; extern unsigned free_elements; extern unsigned total_memory; #if ((DEBUG) & DEBUG_I) static unsigned emit_pkts, emit_queue; static uint64_t size_total; static unsigned pkts_total, pkts_total_fragmented; static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending; static unsigned pkts_pending, pkts_pending_done; static unsigned pending_queue_trace, pending_queue_trace_candidate; static unsigned flows_total, flows_fragmented; #endif static unsigned emit_count; static uint32_t emit_sequence; static unsigned emit_rate_bytes, emit_rate_delay; static struct Time emit_time; static uint8_t emit_packet[NETFLOW_MAX_PACKET]; static pthread_t thid; static sigset_t sig_mask; static struct sched_param schedp; static int sched_min, sched_max; static int npeers, npeers_rot; static struct peer *peers; static int sigs; static struct Flow *flows[1 << HASH_BITS]; static pthread_mutex_t flows_mutex[1 << HASH_BITS]; static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER; static struct Flow *pending_head, *pending_tail; static struct Flow *scan_frag_dreg; static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER; static struct Flow *flows_emit; static char ident[256] = "fprobe-ulog"; static FILE *pidfile; static char *pidfilepath; static pid_t pid; static int killed; static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT; static struct ipulog_handle *ulog_handle; static uint32_t ulog_gmask = 1; static char *cap_buf; static int nsnmp_rules; static struct snmp_rule *snmp_rules; static struct passwd *pw = 0; void usage() { fprintf(stdout, "fprobe-ulog: a NetFlow probe. Version %s\n" "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n" "\n" "-h\t\tDisplay this help\n" "-U \tULOG group bitwise mask [1]\n" "-s \tHow often scan for expired flows [5]\n" "-g \tFragmented flow lifetime [30]\n" "-d \tIdle flow lifetime (inactive timer) [60]\n" "-e \tActive flow lifetime (active timer) [300]\n" "-n \tNetFlow version for use (1, 5 or 7) [5]\n" "-a
\tUse
as source for NetFlow flow\n" "-X \tInterface name to SNMP-index conversion rules\n" "-M\t\tUse netfilter mark value as ToS flag\n" "-b \tMemory bulk size (1..%u) [%u]\n" "-m \tMemory limit (0=no limit) [0]\n" "-q \tPending queue length [100]\n" "-B \tKernel capture buffer size [0]\n" "-r \tReal-time priority (0=disabled, %d..%d) [0]\n" "-t \tProduce nanosecond delay after each bytes sent [0:0]\n" "-c \tDirectory to chroot to\n" "-u \tUser to run as\n" "-v \tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n" "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n" "remote:port\tAddress of the NetFlow collector\n", VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max); exit(0); } #if ((DEBUG) & DEBUG_I) void info_debug() { my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d", pkts_total, pkts_total_fragmented, size_total, pkts_pending - pkts_pending_done, pending_queue_trace); my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d", pkts_ignored, pkts_lost_capture, pkts_lost_unpending); my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d", flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue); my_log(LOG_DEBUG, "I: memory:%d/%d (%d)", total_elements, free_elements, total_memory); } #endif void sighandler(int sig) { switch (sig) { case SIGTERM: sigs |= SIGTERM_MASK; break; #if ((DEBUG) & DEBUG_I) case SIGUSR1: sigs |= SIGUSR1_MASK; break; #endif } } void gettime(struct Time *now) { struct timeval t; gettimeofday(&t, 0); now->sec = t.tv_sec; now->usec = t.tv_usec; } inline time_t cmpmtime(struct Time *t1, struct Time *t2) { return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000; } /* Uptime in miliseconds */ uint32_t getuptime(struct Time *t) { /* Maximum uptime is about 49/2 days */ return cmpmtime(t, &start_time); } hash_t hash_flow(struct Flow *flow) { if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F)); else return hash(flow, sizeof(struct Flow_TL)); } uint16_t snmp_index(char *name) { uint32_t i; if (!*name) return 0; for (i = 0; (int) i < nsnmp_rules; i++) { if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue; return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base; } if ((i = if_nametoindex(name))) return i; return -1; } inline void copy_flow(struct Flow *src, struct Flow *dst) { dst->iif = src->iif; dst->oif = src->oif; dst->sip = src->sip; dst->dip = src->dip; dst->tos = src->tos; dst->proto = src->proto; dst->tcp_flags = src->tcp_flags; dst->id = src->id; dst->sp = src->sp; dst->dp = src->dp; dst->pkts = src->pkts; dst->size = src->size; dst->sizeF = src->sizeF; dst->sizeP = src->sizeP; dst->ctime = src->ctime; dst->mtime = src->mtime; dst->flags = src->flags; } struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev) { struct Flow **flowpp; #ifdef WALL flowpp = 0; #endif if (prev) flowpp = *prev; while (where) { if (where->sip.s_addr == what->sip.s_addr && where->dip.s_addr == what->dip.s_addr && where->proto == what->proto) { switch ((what->flags + where->flags) & FLOW_FRAGMASK) { case 0: /* Both unfragmented */ if ((what->sp == where->sp) && (what->dp == where->dp)) goto done; break; case 2: /* Both fragmented */ if (where->id == what->id) goto done; break; } } flowpp = &where->next; where = where->next; } done: if (prev) *prev = flowpp; return where; } int put_into(struct Flow *flow, int flag #if ((DEBUG) & (DEBUG_S | DEBUG_U)) , char *logbuf #endif ) { int ret = 0; hash_t h; struct Flow *flown, **flowpp; #if ((DEBUG) & (DEBUG_S | DEBUG_U)) char buf[64]; #endif h = hash_flow(flow); #if ((DEBUG) & (DEBUG_S | DEBUG_U)) sprintf(buf, " %x H:%04x", (unsigned) flow, h); strcat(logbuf, buf); #endif pthread_mutex_lock(&flows_mutex[h]); flowpp = &flows[h]; if (!(flown = find(flows[h], flow, &flowpp))) { /* No suitable flow found - add */ if (flag == COPY_INTO) { if ((flown = mem_alloc())) { copy_flow(flow, flown); flow = flown; } else { #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES my_log(LOG_ERR, "%s %s. %s", "mem_alloc():", strerror(errno), "packet lost"); #endif return -1; } } flow->next = flows[h]; flows[h] = flow; #if ((DEBUG) & DEBUG_I) flows_total++; if (flow->flags & FLOW_FRAG) flows_fragmented++; #endif #if ((DEBUG) & (DEBUG_S | DEBUG_U)) if (flown) { sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags); strcat(logbuf, buf); } #endif } else { /* Found suitable flow - update */ #if ((DEBUG) & (DEBUG_S | DEBUG_U)) sprintf(buf, " +> %x", (unsigned) flown); strcat(logbuf, buf); #endif if (cmpmtime(&flow->mtime, &flown->mtime) > 0) flown->mtime = flow->mtime; if (cmpmtime(&flow->ctime, &flown->ctime) < 0) flown->ctime = flow->ctime; flown->tcp_flags |= flow->tcp_flags; flown->size += flow->size; flown->pkts += flow->pkts; if (flow->flags & FLOW_FRAG) { /* Fragmented flow require some additional work */ if (flow->flags & FLOW_TL) { /* ?FIXME? Several packets with FLOW_TL (attack) */ flown->sp = flow->sp; flown->dp = flow->dp; } if (flow->flags & FLOW_LASTFRAG) { /* ?FIXME? Several packets with FLOW_LASTFRAG (attack) */ flown->sizeP = flow->sizeP; } flown->flags |= flow->flags; flown->sizeF += flow->sizeF; if ((flown->flags & FLOW_LASTFRAG) && (flown->sizeF >= flown->sizeP)) { /* All fragments received - flow reassembled */ *flowpp = flown->next; pthread_mutex_unlock(&flows_mutex[h]); #if ((DEBUG) & DEBUG_I) flows_total--; flows_fragmented--; #endif flown->id = 0; flown->flags &= ~FLOW_FRAG; #if ((DEBUG) & (DEBUG_U | DEBUG_S)) strcat(logbuf," R"); #endif ret = put_into(flown, MOVE_INTO #if ((DEBUG) & (DEBUG_U | DEBUG_S)) , logbuf #endif ); } } if (flag == MOVE_INTO) mem_free(flow); } pthread_mutex_unlock(&flows_mutex[h]); return ret; } void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) { int i; for (i = 0; i < fields; i++) { #if ((DEBUG) & DEBUG_F) my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p); #endif switch (format[i]) { case NETFLOW_IPV4_SRC_ADDR: ((struct in_addr *) p)->s_addr = flow->sip.s_addr; p += NETFLOW_IPV4_SRC_ADDR_SIZE; break; case NETFLOW_IPV4_DST_ADDR: ((struct in_addr *) p)->s_addr = flow->dip.s_addr; p += NETFLOW_IPV4_DST_ADDR_SIZE; break; case NETFLOW_INPUT_SNMP: *((uint16_t *) p) = htons(flow->iif); p += NETFLOW_INPUT_SNMP_SIZE; break; case NETFLOW_OUTPUT_SNMP: *((uint16_t *) p) = htons(flow->oif); p += NETFLOW_OUTPUT_SNMP_SIZE; break; case NETFLOW_PKTS_32: *((uint32_t *) p) = htonl(flow->pkts); p += NETFLOW_PKTS_32_SIZE; break; case NETFLOW_BYTES_32: *((uint32_t *) p) = htonl(flow->size); p += NETFLOW_BYTES_32_SIZE; break; case NETFLOW_FIRST_SWITCHED: *((uint32_t *) p) = htonl(getuptime(&flow->ctime)); p += NETFLOW_FIRST_SWITCHED_SIZE; break; case NETFLOW_LAST_SWITCHED: *((uint32_t *) p) = htonl(getuptime(&flow->mtime)); p += NETFLOW_LAST_SWITCHED_SIZE; break; case NETFLOW_L4_SRC_PORT: *((uint16_t *) p) = flow->sp; p += NETFLOW_L4_SRC_PORT_SIZE; break; case NETFLOW_L4_DST_PORT: *((uint16_t *) p) = flow->dp; p += NETFLOW_L4_DST_PORT_SIZE; break; case NETFLOW_PROT: *((uint8_t *) p) = flow->proto; p += NETFLOW_PROT_SIZE; break; case NETFLOW_SRC_TOS: *((uint8_t *) p) = flow->tos; p += NETFLOW_SRC_TOS_SIZE; break; case NETFLOW_TCP_FLAGS: *((uint8_t *) p) = flow->tcp_flags; p += NETFLOW_TCP_FLAGS_SIZE; break; case NETFLOW_VERSION: *((uint16_t *) p) = htons(netflow->Version); p += NETFLOW_VERSION_SIZE; break; case NETFLOW_COUNT: *((uint16_t *) p) = htons(emit_count); p += NETFLOW_COUNT_SIZE; break; case NETFLOW_UPTIME: *((uint32_t *) p) = htonl(getuptime(&emit_time)); p += NETFLOW_UPTIME_SIZE; break; case NETFLOW_UNIX_SECS: *((uint32_t *) p) = htonl(emit_time.sec); p += NETFLOW_UNIX_SECS_SIZE; break; case NETFLOW_UNIX_NSECS: *((uint32_t *) p) = htonl(emit_time.usec * 1000); p += NETFLOW_UNIX_NSECS_SIZE; break; case NETFLOW_FLOW_SEQUENCE: //*((uint32_t *) p) = htonl(emit_sequence); *((uint32_t *) p) = 0; p += NETFLOW_FLOW_SEQUENCE_SIZE; break; case NETFLOW_PAD8: /* Unsupported (uint8_t) */ case NETFLOW_ENGINE_TYPE: case NETFLOW_ENGINE_ID: case NETFLOW_FLAGS7_1: case NETFLOW_SRC_MASK: case NETFLOW_DST_MASK: *((uint8_t *) p) = 0; p += NETFLOW_PAD8_SIZE; break; case NETFLOW_PAD16: /* Unsupported (uint16_t) */ case NETFLOW_SRC_AS: case NETFLOW_DST_AS: case NETFLOW_FLAGS7_2: *((uint16_t *) p) = 0; p += NETFLOW_PAD16_SIZE; break; case NETFLOW_PAD32: /* Unsupported (uint32_t) */ case NETFLOW_IPV4_NEXT_HOP: case NETFLOW_ROUTER_SC: *((uint32_t *) p) = 0; p += NETFLOW_PAD32_SIZE; break; default: my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d", format, i, format[i]); exit(1); } } #if ((DEBUG) & DEBUG_F) my_log(LOG_DEBUG, "F: return %x", (unsigned) p); #endif return p; } void setuser() { /* Workaround for clone()-based threads Try to change EUID independently of main thread */ if (pw) { setgroups(0, NULL); setregid(pw->pw_gid, pw->pw_gid); setreuid(pw->pw_uid, pw->pw_uid); } } void *emit_thread() { struct Flow *flow; void *p; struct timeval now; struct timespec timeout; int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0; p = (void *) &emit_packet + netflow->HeaderSize; timeout.tv_nsec = 0; setuser(); for (;;) { pthread_mutex_lock(&emit_mutex); while (!flows_emit) { gettimeofday(&now, 0); timeout.tv_sec = now.tv_sec + emit_timeout; /* Do not wait until emit_packet will filled - it may be too long */ if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { pthread_mutex_unlock(&emit_mutex); goto sendit; } } flow = flows_emit; flows_emit = flows_emit->next; #if ((DEBUG) & DEBUG_I) emit_queue--; #endif pthread_mutex_unlock(&emit_mutex); #ifdef UPTIME_TRICK if (!emit_count) { gettime(&start_time); start_time.sec -= start_time_offset; } #endif p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p); mem_free(flow); emit_count++; if (emit_count == netflow->MaxFlows) { sendit: gettime(&emit_time); p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet); size = netflow->HeaderSize + emit_count * netflow->FlowSize; peer_rot_cur = 0; for (i = 0; i < npeers; i++) { if (peers[i].type == PEER_MIRROR) goto sendreal; if (peers[i].type == PEER_ROTATE) if (peer_rot_cur++ == peer_rot_work) { sendreal: if (netflow->SeqOffset) *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq); ret = send(peers[i].sock, emit_packet, size, 0); if (ret < size) { #if ((DEBUG) & DEBUG_E) || defined MESSAGES my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s", i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); #endif } #if ((DEBUG) & DEBUG_E) commaneelse { my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); } #endif peers[i].seq += emit_count; /* Rate limit */ if (emit_rate_bytes) { sent += size; delay = sent / emit_rate_bytes; if (delay) { sent %= emit_rate_bytes; timeout.tv_sec = 0; timeout.tv_nsec = emit_rate_delay * delay; while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); } } } } if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot; emit_sequence += emit_count; emit_count = 0; #if ((DEBUG) & DEBUG_I) emit_pkts++; #endif } } } void *unpending_thread() { struct timeval now; struct timespec timeout; #if ((DEBUG) & (DEBUG_S | DEBUG_U)) char logbuf[256]; #endif setuser(); timeout.tv_nsec = 0; pthread_mutex_lock(&unpending_mutex); for (;;) { while (!(pending_tail->flags & FLOW_PENDING)) { gettimeofday(&now, 0); timeout.tv_sec = now.tv_sec + unpending_timeout; pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout); } #if ((DEBUG) & (DEBUG_S | DEBUG_U)) *logbuf = 0; #endif if (put_into(pending_tail, COPY_INTO #if ((DEBUG) & (DEBUG_S | DEBUG_U)) , logbuf #endif ) < 0) { #if ((DEBUG) & DEBUG_I) pkts_lost_unpending++; #endif } #if ((DEBUG) & DEBUG_U) my_log(LOG_DEBUG, "%s%s", "U:", logbuf); #endif pending_tail->flags = 0; pending_tail = pending_tail->next; #if ((DEBUG) & DEBUG_I) pkts_pending_done++; #endif } } void *scan_thread() { #if ((DEBUG) & (DEBUG_S | DEBUG_U)) char logbuf[256]; #endif int i; struct Flow *flow, **flowpp; struct Time now; struct timespec timeout; setuser(); timeout.tv_nsec = 0; pthread_mutex_lock(&scan_mutex); for (;;) { gettime(&now); timeout.tv_sec = now.sec + scan_interval; pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout); gettime(&now); #if ((DEBUG) & DEBUG_S) my_log(LOG_DEBUG, "S: %d", now.sec); #endif for (i = 0; i < 1 << HASH_BITS ; i++) { pthread_mutex_lock(&flows_mutex[i]); flow = flows[i]; flowpp = &flows[i]; while (flow) { if (flow->flags & FLOW_FRAG) { /* Process fragmented flow */ if ((now.sec - flow->mtime.sec) > frag_lifetime) { /* Fragmented flow expired - put it into special chain */ #if ((DEBUG) & DEBUG_I) flows_fragmented--; flows_total--; #endif *flowpp = flow->next; flow->id = 0; flow->flags &= ~FLOW_FRAG; flow->next = scan_frag_dreg; scan_frag_dreg = flow; flow = *flowpp; continue; } } else { /* Flow is not frgamented */ if ((now.sec - flow->mtime.sec) > inactive_lifetime || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) { /* Flow expired */ #if ((DEBUG) & DEBUG_S) my_log(LOG_DEBUG, "S: E %x", flow); #endif #if ((DEBUG) & DEBUG_I) flows_total--; #endif *flowpp = flow->next; pthread_mutex_lock(&emit_mutex); flow->next = flows_emit; flows_emit = flow; #if ((DEBUG) & DEBUG_I) emit_queue++; #endif pthread_mutex_unlock(&emit_mutex); flow = *flowpp; continue; } } flowpp = &flow->next; flow = flow->next; } /* chain loop */ pthread_mutex_unlock(&flows_mutex[i]); } /* hash loop */ if (flows_emit) pthread_cond_signal(&emit_cond); while (scan_frag_dreg) { flow = scan_frag_dreg; scan_frag_dreg = flow->next; #if ((DEBUG) & (DEBUG_S | DEBUG_U)) *logbuf = 0; #endif put_into(flow, MOVE_INTO #if ((DEBUG) & (DEBUG_S | DEBUG_U)) , logbuf #endif ); #if ((DEBUG) & DEBUG_S) my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf); #endif } } } void *cap_thread() { struct ulog_packet_msg *ulog_msg; struct ip *nl; void *tl; struct Flow *flow; int len, off_frag, psize; #if ((DEBUG) & DEBUG_C) char buf[64]; char logbuf[256]; #endif setuser(); while (!killed) { len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1); if (len <= 0) { my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno)); continue; } while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) { #if ((DEBUG) & DEBUG_C) sprintf(logbuf, "C: %d", ulog_msg->data_len); #endif nl = (void *) &ulog_msg->payload; psize = ulog_msg->data_len; /* Sanity check */ if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) { #if ((DEBUG) & DEBUG_C) strcat(logbuf, " U"); my_log(LOG_DEBUG, "%s", logbuf); #endif #if ((DEBUG) & DEBUG_I) pkts_ignored++; #endif continue; } if (pending_head->flags) { #if ((DEBUG) & DEBUG_C) || defined MESSAGES my_log(LOG_ERR, # if ((DEBUG) & DEBUG_C) "%s %s %s", logbuf, # else "%s %s", # endif "pending queue full:", "packet lost"); #endif #if ((DEBUG) & DEBUG_I) pkts_lost_capture++; #endif goto done; } #if ((DEBUG) & DEBUG_I) pkts_total++; #endif flow = pending_head; /* ?FIXME? Add sanity check for ip_len? */ flow->size = ntohs(nl->ip_len); #if ((DEBUG) & DEBUG_I) size_total += flow->size; #endif flow->sip = nl->ip_src; flow->dip = nl->ip_dst; flow->iif = snmp_index(ulog_msg->indev_name); flow->oif = snmp_index(ulog_msg->outdev_name); flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; flow->proto = nl->ip_p; flow->id = 0; flow->tcp_flags = 0; flow->pkts = 1; flow->sizeF = 0; flow->sizeP = 0; /* Packets captured from OUTPUT table didn't contains valid timestamp */ if (ulog_msg->timestamp_sec) { flow->ctime.sec = ulog_msg->timestamp_sec; flow->ctime.usec = ulog_msg->timestamp_usec; } else gettime(&flow->ctime); flow->mtime = flow->ctime; off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3; /* Offset (from network layer) to transport layer header/IP data IOW IP header size ;-) ?FIXME? Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks... */ off_tl = nl->ip_hl << 2; tl = (void *) nl + off_tl; /* THIS packet data size: data_size = total_size - ip_header_size*4 */ flow->sizeF = ntohs(nl->ip_len) - off_tl; psize -= off_tl; if ((signed) flow->sizeF < 0) flow->sizeF = 0; if (psize > (signed) flow->sizeF) psize = flow->sizeF; if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) { /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */ #if ((DEBUG) & DEBUG_C) strcat(logbuf, " F"); #endif #if ((DEBUG) & DEBUG_I) pkts_total_fragmented++; #endif flow->flags |= FLOW_FRAG; flow->id = nl->ip_id; if (!(ntohs(nl->ip_off) & IP_MF)) { /* Packet whith IP_MF contains information about whole datagram size */ flow->flags |= FLOW_LASTFRAG; /* size = frag_offset*8 + data_size */ flow->sizeP = off_frag + flow->sizeF; } } #if ((DEBUG) & DEBUG_C) sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif); strcat(logbuf, buf); sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto); strcat(logbuf, buf); #endif /* Fortunately most interesting transport layer information fit into first 8 bytes of IP data field (minimal nonzero size). Thus we don't need actual packet reassembling to build whole transport layer data. We only check the fragment offset for zero value to find packet with this information. */ if (!off_frag && psize >= 8) { switch (flow->proto) { case IPPROTO_TCP: case IPPROTO_UDP: flow->sp = ((struct udphdr *)tl)->uh_sport; flow->dp = ((struct udphdr *)tl)->uh_dport; goto tl_known; #ifdef ICMP_TRICK case IPPROTO_ICMP: flow->sp = htons(((struct icmp *)tl)->icmp_type); flow->dp = htons(((struct icmp *)tl)->icmp_code); goto tl_known; #endif #ifdef ICMP_TRICK_CISCO case IPPROTO_ICMP: flow->dp = *((int32_t *) tl); goto tl_known; #endif default: /* Unknown transport layer */ #if ((DEBUG) & DEBUG_C) strcat(logbuf, " U"); #endif flow->sp = 0; flow->dp = 0; break; tl_known: #if ((DEBUG) & DEBUG_C) sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp)); strcat(logbuf, buf); #endif flow->flags |= FLOW_TL; } } /* Check for tcp flags presence (including CWR and ECE). */ if (flow->proto == IPPROTO_TCP && off_frag < 16 && psize >= 16 - off_frag) { flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag)); #if ((DEBUG) & DEBUG_C) sprintf(buf, " TCP:%x", flow->tcp_flags); strcat(logbuf, buf); #endif } #if ((DEBUG) & DEBUG_C) sprintf(buf, " => %x", (unsigned) flow); strcat(logbuf, buf); my_log(LOG_DEBUG, "%s", logbuf); #endif #if ((DEBUG) & DEBUG_I) pkts_pending++; pending_queue_trace_candidate = pkts_pending - pkts_pending_done; if (pending_queue_trace < pending_queue_trace_candidate) pending_queue_trace = pending_queue_trace_candidate; #endif /* Flow complete - inform unpending_thread() about it */ pending_head->flags |= FLOW_PENDING; pending_head = pending_head->next; done: pthread_cond_signal(&unpending_cond); } } return 0; } int main(int argc, char **argv) { char errpbuf[512]; char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule; int c, i, sock, memory_limit = 0; struct addrinfo hints, *res; struct sockaddr_in saddr; pthread_attr_t tattr; struct sigaction sigact; static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread}; struct timeval timeout; sched_min = sched_get_priority_min(SCHED); sched_max = sched_get_priority_max(SCHED); memset(&saddr, 0 , sizeof(saddr)); memset(&hints, 0 , sizeof(hints)); hints.ai_flags = AI_PASSIVE; hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Process command line options */ opterr = 0; while ((c = my_getopt(argc, argv, parms)) != -1) { switch (c) { case '?': usage(); case 'h': usage(); } } if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg); if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg); if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg); if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg); if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg); if (parms[nflag].count) { switch (atoi(parms[nflag].arg)) { case 1: netflow = &NetFlow1; break; case 5: break; case 7: netflow = &NetFlow7; break; default: fprintf(stderr, "Illegal %s\n", "NetFlow version"); exit(1); } } if (parms[vflag].count) verbosity = atoi(parms[vflag].arg); if (parms[lflag].count) { if ((log_suffix = strchr(parms[lflag].arg, ':'))) { *log_suffix++ = 0; if (*log_suffix) { sprintf(errpbuf, "[%s]", log_suffix); strcat(ident, errpbuf); } } if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg); if (log_suffix) *--log_suffix = ':'; } if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) { err_malloc: fprintf(stderr, "malloc(): %s\n", strerror(errno)); exit(1); } sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident); if (parms[qflag].count) { pending_queue_length = atoi(parms[qflag].arg); if (pending_queue_length < 1) { fprintf(stderr, "Illegal %s\n", "pending queue length"); exit(1); } } if (parms[rflag].count) { schedp.sched_priority = atoi(parms[rflag].arg); if (schedp.sched_priority && (schedp.sched_priority < sched_min || schedp.sched_priority > sched_max)) { fprintf(stderr, "Illegal %s\n", "realtime priority"); exit(1); } } if (parms[Bflag].count) { sockbufsize = atoi(parms[Bflag].arg) << 10; } if (parms[bflag].count) { bulk_quantity = atoi(parms[bflag].arg); if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) { fprintf(stderr, "Illegal %s\n", "bulk size"); exit(1); } } if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10; if (parms[Xflag].count) { for(i = 0; parms[Xflag].arg[i]; i++) if (parms[Xflag].arg[i] == ':') nsnmp_rules++; if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule)))) goto err_malloc; rule = strtok(parms[Xflag].arg, ":"); for (i = 0; rule; i++) { snmp_rules[i].len = strlen(rule); if (snmp_rules[i].len > IFNAMSIZ) { fprintf(stderr, "Illegal %s\n", "interface basename"); exit(1); } strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len); if (!*(rule - 1)) *(rule - 1) = ','; rule = strtok(NULL, ","); if (!rule) { fprintf(stderr, "Illegal %s\n", "SNMP rule"); exit(1); } snmp_rules[i].base = atoi(rule); *(rule - 1) = ':'; rule = strtok(NULL, ":"); } nsnmp_rules = i; } if (parms[tflag].count) sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay); if (parms[aflag].count) { if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) { bad_lhost: fprintf(stderr, "Illegal %s\n", "source address"); exit(1); } else { saddr = *((struct sockaddr_in *) res->ai_addr); freeaddrinfo(res); } } if (parms[uflag].count) if ((pw = getpwnam(parms[uflag].arg)) == NULL) { fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user"); exit(1); } /* Process collectors parameters. Brrrr... :-[ */ npeers = argc - optind; if (npeers < 1) usage(); if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; for (i = optind, npeers = 0; i < argc; i++, npeers++) { dhost = argv[i]; if (!(dport = strchr(dhost, ':'))) goto bad_collector; *dport++ = 0; if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { fprintf(stderr, "socket(): %s\n", strerror(errno)); exit(1); } peers[npeers].sock = sock; peers[npeers].type = PEER_MIRROR; peers[npeers].laddr = saddr; peers[npeers].seq = 0; if ((lhost = strchr(dport, '/'))) { *lhost++ = 0; if ((type = strchr(lhost, '/'))) { *type++ = 0; switch (*type) { case 0: case 'm': break; case 'r': peers[npeers].type = PEER_ROTATE; npeers_rot++; break; default: goto bad_collector; } } if (*lhost) { if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost; peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr); freeaddrinfo(res); } } if (bind(sock, (struct sockaddr *) &peers[npeers].laddr, sizeof(struct sockaddr_in))) { fprintf(stderr, "bind(): %s\n", strerror(errno)); exit(1); } if (getaddrinfo(dhost, dport, &hints, &res)) { bad_collector: fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1); exit(1); } peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr); freeaddrinfo(res); if (connect(sock, (struct sockaddr *) &peers[npeers].addr, sizeof(struct sockaddr_in))) { fprintf(stderr, "connect(): %s\n", strerror(errno)); exit(1); } /* Restore command line */ if (type) *--type = '/'; if (lhost) *--lhost = '/'; *--dport = ':'; } if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc; ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE); if (!ulog_handle) { fprintf(stderr, "libipulog initialization error: %s", ipulog_strerror(ipulog_errno)); exit(1); } if (sockbufsize) if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF, &sockbufsize, sizeof(sockbufsize)) < 0) fprintf(stderr, "setsockopt(): %s", strerror(errno)); /* Daemonize (if log destination stdout-free) */ my_log_open(ident, verbosity, log_dest); if (!(log_dest & 2)) { switch (fork()) { case -1: fprintf(stderr, "fork(): %s", strerror(errno)); exit(1); case 0: setsid(); freopen("/dev/null", "r", stdin); freopen("/dev/null", "w", stdout); freopen("/dev/null", "w", stderr); break; default: exit(0); } } else { setvbuf(stdout, (char *)0, _IONBF, 0); setvbuf(stderr, (char *)0, _IONBF, 0); } pid = getpid(); sprintf(errpbuf, "[%ld]", (long) pid); strcat(ident, errpbuf); /* Initialization */ hash_init(); /* Actually for crc16 only */ mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0); #ifdef UPTIME_TRICK /* Hope 12 days is enough :-/ */ start_time_offset = 1 << 20; /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */ #endif gettime(&start_time); /* Build static pending queue as circular buffer. */ if (!(pending_head = mem_alloc())) goto err_mem_alloc; pending_tail = pending_head; for (i = pending_queue_length - 1; i--;) { if (!(pending_tail->next = mem_alloc())) { err_mem_alloc: my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno)); exit(1); } pending_tail = pending_tail->next; } pending_tail->next = pending_head; pending_tail = pending_head; sigemptyset(&sig_mask); sigact.sa_handler = &sighandler; sigact.sa_mask = sig_mask; sigact.sa_flags = 0; sigaddset(&sig_mask, SIGTERM); sigaction(SIGTERM, &sigact, 0); #if ((DEBUG) & DEBUG_I) sigaddset(&sig_mask, SIGUSR1); sigaction(SIGUSR1, &sigact, 0); #endif if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) { my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno)); exit(1); } my_log(LOG_INFO, "Starting %s...", VERSION); if (parms[cflag].count) { if (chdir(parms[cflag].arg) || chroot(".")) { my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno)); exit(1); } } schedp.sched_priority = schedp.sched_priority - THREADS + 2; pthread_attr_init(&tattr); for (i = 0; i < THREADS - 1; i++) { if (schedp.sched_priority > 0) { if ((pthread_attr_setschedpolicy(&tattr, SCHED)) || (pthread_attr_setschedparam(&tattr, &schedp))) { my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno)); exit(1); } } if (pthread_create(&thid, &tattr, threads[i], 0)) { my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno)); exit(1); } pthread_detach(thid); schedp.sched_priority++; } if (pw) { if (setgroups(0, NULL)) { my_log(LOG_CRIT, "setgroups(): %s", strerror(errno)); exit(1); } if (setregid(pw->pw_gid, pw->pw_gid)) { my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno)); exit(1); } if (setreuid(pw->pw_uid, pw->pw_uid)) { my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno)); exit(1); } } if (!(pidfile = fopen(pidfilepath, "w"))) my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno)); else { fprintf(pidfile, "%ld\n", (long) pid); fclose(pidfile); } my_log(LOG_INFO, "pid: %d", pid); my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s " "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s", ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime, netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity, memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1, emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "", parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : ""); for (i = 0; i < nsnmp_rules; i++) { my_log(LOG_INFO, "SNMP rule #%d %s:%d", i + 1, snmp_rules[i].basename, snmp_rules[i].base); } for (i = 0; i < npeers; i++) { switch (peers[i].type) { case PEER_MIRROR: c = 'm'; break; case PEER_ROTATE: c = 'r'; break; } snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr)); my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1, inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c); } pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0); timeout.tv_usec = 0; while (!killed || (total_elements - free_elements - pending_queue_length) || emit_count || pending_tail->flags) { if (!sigs) { timeout.tv_sec = scan_interval; select(0, 0, 0, 0, &timeout); } if (sigs & SIGTERM_MASK && !killed) { sigs &= ~SIGTERM_MASK; my_log(LOG_INFO, "SIGTERM received. Emitting flows cache..."); scan_interval = 1; frag_lifetime = -1; active_lifetime = -1; inactive_lifetime = -1; emit_timeout = 1; unpending_timeout = 1; killed = 1; pthread_cond_signal(&scan_cond); pthread_cond_signal(&unpending_cond); } #if ((DEBUG) & DEBUG_I) if (sigs & SIGUSR1_MASK) { sigs &= ~SIGUSR1_MASK; info_debug(); } #endif } remove(pidfilepath); #if ((DEBUG) & DEBUG_I) info_debug(); #endif my_log(LOG_INFO, "Done."); #ifdef WALL return 0; #endif }