2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
12 /* stdout, stderr, freopen() */
18 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
27 #include <libipulog/libipulog.h>
/* Local (re)declaration of libipulog's handle so its members can be
   reached directly (e.g. the socket fd for setsockopt() in main()).
   NOTE(review): fragment — leading members and closing brace are not
   visible here; layout must match libipulog exactly — confirm against
   the installed libipulog version. */
28 struct ipulog_handle {
31 struct sockaddr_nl local;
32 struct sockaddr_nl peer;
33 struct nlmsghdr* last_nlhdr;
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
48 #include <sys/param.h>
73 #include <sys/select.h>
79 #include <fprobe-ulog.h>
81 #include <my_getopt.h>
/* Command-line option table consumed by my_getopt() in main().
   NOTE(review): fragment — the closing brace and some entries are not
   visible.  Presumably each entry is {option char, argument flag,
   count, arg} — confirm against my_getopt.h.  After parsing, options
   are read back as parms[Xflag].count / parms[Xflag].arg. */
112 static struct getopt_parms parms[] = {
113 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
114 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
115 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
116 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
117 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
118 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
119 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
120 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
123 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* --- File-scope state shared between the worker threads ---
   NOTE(review): fragmentary listing; some declarations between the
   visible lines are missing. */
140 extern int optind, opterr, optopt;
/* NetFlow v1/v5/v7 template descriptors defined elsewhere in the project. */
143 extern struct NetFlow NetFlow1;
144 extern struct NetFlow NetFlow5;
145 extern struct NetFlow NetFlow7;
/* -M option: non-zero means use the netfilter mark as the ToS byte. */
147 #define mark_is_tos parms[Mflag].count
/* Timers (seconds): expired-flow scan period, fragment/inactive/active lifetimes. */
148 static unsigned scan_interval = 5;
149 static int frag_lifetime = 30;
150 static int inactive_lifetime = 60;
151 static int active_lifetime = 300;
152 static int sockbufsize;
/* Largest representable bulk size for the memory allocator's index type. */
153 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
154 #if (MEM_BITS == 0) || (MEM_BITS == 16)
155 #define BULK_QUANTITY 10000
157 #define BULK_QUANTITY 200
/* File-logging epoch rotation state (-E length in minutes, -T epoch count). */
160 static unsigned epoch_length=60, log_epochs=1;
161 static unsigned cur_epoch=0,prev_uptime=0;
163 static unsigned bulk_quantity = BULK_QUANTITY;
164 static unsigned pending_queue_length = 100;
/* Selected NetFlow template; v5 by default, switched by -n. */
165 static struct NetFlow *netflow = &NetFlow5;
166 static unsigned verbosity = 6;
167 static unsigned log_dest = MY_LOG_SYSLOG;
/* Process start time; uptime values in emitted flows are relative to this. */
168 static struct Time start_time;
169 static long start_time_offset;
/* Memory-pool statistics exported by the mem_* allocator. */
172 extern unsigned total_elements;
173 extern unsigned free_elements;
174 extern unsigned total_memory;
/* Debug-only instrumentation counters (DEBUG_I build). */
175 #if ((DEBUG) & DEBUG_I)
176 static unsigned emit_pkts, emit_queue;
177 static uint64_t size_total;
178 static unsigned pkts_total, pkts_total_fragmented;
179 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
180 static unsigned pkts_pending, pkts_pending_done;
181 static unsigned pending_queue_trace, pending_queue_trace_candidate;
182 static unsigned flows_total, flows_fragmented;
/* Emitter state: flow count in current PDU, sequence number, rate limiting. */
184 static unsigned emit_count;
185 static uint32_t emit_sequence;
186 static unsigned emit_rate_bytes, emit_rate_delay;
187 static struct Time emit_time;
188 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
189 static pthread_t thid;
190 static sigset_t sig_mask;
191 static struct sched_param schedp;
192 static int sched_min, sched_max;
/* Collector peers (-/rotate/mirror/file) parsed from argv. */
193 static int npeers, npeers_rot;
194 static struct peer *peers;
/* Flow hash table; one mutex per bucket. */
197 static struct Flow *flows[1 << HASH_BITS];
198 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization between capture, unpending, scan and emit threads. */
200 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
203 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
204 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Circular pending queue (filled by cap_thread, drained by unpending_thread). */
205 static struct Flow *pending_head, *pending_tail;
/* Chain of expired fragmented flows de-registered by scan_thread. */
206 static struct Flow *scan_frag_dreg;
208 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
209 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
/* Chain of flows queued for emission to collectors. */
210 static struct Flow *flows_emit;
/* Syslog/pidfile identity, extended with -l suffix and the PID. */
212 static char ident[256] = "fprobe-ulog";
213 static FILE *pidfile;
214 static char *pidfilepath;
217 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture handle, group mask (-U) and read buffer. */
218 static struct ipulog_handle *ulog_handle;
219 static uint32_t ulog_gmask = 1;
220 static char *cap_buf;
/* -X interface-name -> SNMP-index conversion rules. */
221 static int nsnmp_rules;
222 static struct snmp_rule *snmp_rules;
/* -u: user to drop privileges to (NULL = stay as invoking user). */
223 static struct passwd *pw = 0;
/* Help text fragment (the enclosing usage()/fprintf call starts on a line
   not visible in this listing).  Adjacent string literals are concatenated
   into one format string; the trailing printf arguments fill in VERSION
   and the run-time limits.
   NOTE(review): "idetifier" in the -l description is a typo in
   user-visible output ("identifier"); fixing it changes a runtime string,
   so it is only flagged here. */
"fprobe-ulog: a NetFlow probe. Version %s\n"
"Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
"-h\t\tDisplay this help\n"
"-U <mask>\tULOG group bitwise mask [1]\n"
"-s <seconds>\tHow often scan for expired flows [5]\n"
"-g <seconds>\tFragmented flow lifetime [30]\n"
"-e <seconds>\tActive flow lifetime (active timer) [300]\n"
"-f <filename>\tLog flow data in a file\n"
"-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
"-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
"-a <address>\tUse <address> as source for NetFlow flow\n"
"-X <rules>\tInterface name to SNMP-index conversion rules\n"
"-M\t\tUse netfilter mark value as ToS flag\n"
"-b <flows>\tMemory bulk size (1..%u) [%u]\n"
"-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
"-q <flows>\tPending queue length [100]\n"
"-B <kilobytes>\tKernel capture buffer size [0]\n"
"-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
"-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
"-c <directory>\tDirectory to chroot to\n"
"-u <user>\tUser to run as\n"
"-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
"-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
"-y <remote:port>\tAddress of the NetFlow collector\n"
"-f <writable file>\tFile to write data into\n"
"-T <n>\tRotate log file every n epochs\n"
"-E <[1..60]>\tSize of an epoch in minutes\n",
VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Debug statistics dump (DEBUG_I builds): logs capture, cache, emit and
   memory-pool counters.  NOTE(review): fragment — the enclosing function
   header is not visible in this listing (triggered via SIGUSR1, see
   sighandler/main). */
#if ((DEBUG) & DEBUG_I)
/* received: total / fragmented (bytes); pending: queue depth / high-water mark */
my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
pkts_total, pkts_total_fragmented, size_total,
pkts_pending - pkts_pending_done, pending_queue_trace);
my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
total_elements, free_elements, total_memory);
/* Async-signal handler: records received signals in the `sigs` bitmask
   (SIGTERM always; SIGUSR1 only in DEBUG_I builds).  The main loop polls
   and clears these bits.  NOTE(review): fragment — the switch/if
   dispatching on `sig` is not visible in this listing. */
void sighandler(int sig)
sigs |= SIGTERM_MASK;
#if ((DEBUG) & DEBUG_I)
sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (sec/usec).
   NOTE(review): fragment — presumably wraps gettimeofday() into the
   project's struct Time; the intermediate timeval `t` is assigned on
   lines not visible here. */
void gettime(struct Time *now)
now->usec = t.tv_usec;
/* Difference t1 - t2 in milliseconds (positive when t1 is later).
   Used both for ordering timestamps and for uptime computation. */
inline time_t cmpmtime(struct Time *t1, struct Time *t2)
return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
/* Uptime in milliseconds */
uint32_t getuptime(struct Time *t)
/* Maximum uptime is about 49/2 days (32-bit millisecond counter) */
return cmpmtime(t, &start_time);
/* Bucket hash for a flow.  Fragmented flows are keyed on the shorter
   Flow_F prefix (no ports — they are unknown until reassembly);
   complete flows hash the full transport-level key (Flow_TL). */
hash_t hash_flow(struct Flow *flow)
if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP ifIndex.  Order: (1) user-supplied
   -X rules (basename prefix match, remainder parsed as a number added
   to the rule's base), (2) the kernel's if_nametoindex(), (3) 0 for an
   empty/unknown name.  NOTE(review): fragment — closing lines missing. */
uint16_t snmp_index(char *name) {
if (!*name) return 0;
for (i = 0; (int) i < nsnmp_rules; i++) {
if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record (src -> dst).
   NOTE(review): fragment — the address/port/interface copies between
   the visible lines are missing from this listing; only a subset of
   assignments is shown. */
inline void copy_flow(struct Flow *src, struct Flow *dst)
dst->proto = src->proto;
dst->tcp_flags = src->tcp_flags;
dst->pkts = src->pkts;
dst->size = src->size;
dst->sizeF = src->sizeF;
dst->sizeP = src->sizeP;
dst->ctime = src->ctime;
dst->mtime = src->mtime;
dst->flags = src->flags;
/* Return the fd to log flow data into, rotating to "<fname>.<epoch>"
   once the current epoch (epoch_length minutes of uptime) has elapsed.
   Epochs cycle modulo log_epochs (-T), so old files are reused.
   NOTE(review): fragment — the non-rotation return path is not visible.
   The open() lacks a mode argument despite O_CREAT and never truncates;
   flagged for the maintainer. */
unsigned get_log_fd(char *fname, unsigned cur_fd) {
cur_uptime = getuptime(&now);
/* Epoch length in minutes */
if ((cur_uptime - prev_uptime) > (1000 * 60 * epoch_length)) {
char nextname[MAX_PATH_LEN];
prev_uptime = cur_uptime;
cur_epoch = (cur_epoch + 1) % log_epochs;
snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
/* Walk the bucket chain `where` looking for a flow matching `what`:
   same src/dst IP and protocol, then — depending on the combined
   fragmentation flags — either both ports (unfragmented) or the IP id
   (fragmented) must match.  On return *prev (if requested) points at
   the link slot for the found/insertion position, enabling O(1) unlink.
   NOTE(review): fragment — the loop construct and `done:` label are on
   lines not visible in this listing. */
struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
struct Flow **flowpp;
if (prev) flowpp = *prev;
if (where->sip.s_addr == what->sip.s_addr
&& where->dip.s_addr == what->dip.s_addr
&& where->proto == what->proto) {
switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
/* Both unfragmented */
if ((what->sp == where->sp)
&& (what->dp == where->dp)) goto done;
/* Both fragmented */
if (where->id == what->id) goto done;
flowpp = &where->next;
if (prev) *prev = flowpp;
/* Insert `flow` into the hash table (bucket mutex held throughout), or
   merge it into an existing matching entry.  `flag` selects ownership:
   COPY_INTO allocates a new record and copies; MOVE_INTO links/frees
   the caller's record itself.  When the final fragment completes a
   fragmented flow, the entry is unlinked, FLOW_FRAG is cleared and it
   is re-put (MOVE_INTO) under its full transport-level key.
   NOTE(review): fragment — several statements between the visible
   lines (h computation, return paths) are missing from this listing. */
int put_into(struct Flow *flow, int flag
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
struct Flow *flown, **flowpp;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
sprintf(buf, " %x H:%04x", (unsigned) flow, h);
pthread_mutex_lock(&flows_mutex[h]);
if (!(flown = find(flows[h], flow, &flowpp))) {
/* No suitable flow found - add */
if (flag == COPY_INTO) {
if ((flown = mem_alloc())) {
copy_flow(flow, flown);
#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
my_log(LOG_ERR, "%s %s. %s",
"mem_alloc():", strerror(errno), "packet lost");
flow->next = flows[h];
#if ((DEBUG) & DEBUG_I)
if (flow->flags & FLOW_FRAG) flows_fragmented++;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
/* Found suitable flow - update */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the earliest create time and the latest modify time. */
if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
flown->mtime = flow->mtime;
if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
flown->ctime = flow->ctime;
flown->tcp_flags |= flow->tcp_flags;
flown->size += flow->size;
flown->pkts += flow->pkts;
if (flow->flags & FLOW_FRAG) {
/* Fragmented flow require some additional work */
if (flow->flags & FLOW_TL) {
/* Transport-layer header seen in this fragment: take its ports.
Several packets with FLOW_TL (attack)
*/
flown->sp = flow->sp;
flown->dp = flow->dp;
if (flow->flags & FLOW_LASTFRAG) {
/* Last fragment carries the total datagram size.
Several packets with FLOW_LASTFRAG (attack)
*/
flown->sizeP = flow->sizeP;
flown->flags |= flow->flags;
flown->sizeF += flow->sizeF;
if ((flown->flags & FLOW_LASTFRAG)
&& (flown->sizeF >= flown->sizeP)) {
/* All fragments received - flow reassembled */
*flowpp = flown->next;
pthread_mutex_unlock(&flows_mutex[h]);
#if ((DEBUG) & DEBUG_I)
flown->flags &= ~FLOW_FRAG;
#if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert under the full key now that ports are known. */
ret = put_into(flown, MOVE_INTO
#if ((DEBUG) & (DEBUG_U | DEBUG_S))
if (flag == MOVE_INTO) mem_free(flow);
pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize `fields` NetFlow fields described by `format[]` into the
   packet buffer at `p`, reading values from `flow` (or from the global
   emitter state for header fields, where `flow` is NULL).  Returns the
   advanced write pointer.  Multi-byte values are converted to network
   byte order; ports are already stored big-endian in the flow.
   NOTE(review): fragment — the switch statement itself, break
   statements and some case labels are on lines missing from this
   listing. */
void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
for (i = 0; i < fields; i++) {
#if ((DEBUG) & DEBUG_F)
my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
case NETFLOW_IPV4_SRC_ADDR:
((struct in_addr *) p)->s_addr = flow->sip.s_addr;
p += NETFLOW_IPV4_SRC_ADDR_SIZE;
case NETFLOW_IPV4_DST_ADDR:
((struct in_addr *) p)->s_addr = flow->dip.s_addr;
p += NETFLOW_IPV4_DST_ADDR_SIZE;
case NETFLOW_INPUT_SNMP:
*((uint16_t *) p) = htons(flow->iif);
p += NETFLOW_INPUT_SNMP_SIZE;
case NETFLOW_OUTPUT_SNMP:
*((uint16_t *) p) = htons(flow->oif);
p += NETFLOW_OUTPUT_SNMP_SIZE;
case NETFLOW_PKTS_32:
*((uint32_t *) p) = htonl(flow->pkts);
p += NETFLOW_PKTS_32_SIZE;
case NETFLOW_BYTES_32:
*((uint32_t *) p) = htonl(flow->size);
p += NETFLOW_BYTES_32_SIZE;
case NETFLOW_FIRST_SWITCHED:
*((uint32_t *) p) = htonl(getuptime(&flow->ctime));
p += NETFLOW_FIRST_SWITCHED_SIZE;
case NETFLOW_LAST_SWITCHED:
*((uint32_t *) p) = htonl(getuptime(&flow->mtime));
p += NETFLOW_LAST_SWITCHED_SIZE;
case NETFLOW_L4_SRC_PORT:
/* Ports are captured in network order already - no htons() here. */
*((uint16_t *) p) = flow->sp;
p += NETFLOW_L4_SRC_PORT_SIZE;
case NETFLOW_L4_DST_PORT:
*((uint16_t *) p) = flow->dp;
p += NETFLOW_L4_DST_PORT_SIZE;
*((uint8_t *) p) = flow->proto;
p += NETFLOW_PROT_SIZE;
case NETFLOW_SRC_TOS:
*((uint8_t *) p) = flow->tos;
p += NETFLOW_SRC_TOS_SIZE;
case NETFLOW_TCP_FLAGS:
*((uint8_t *) p) = flow->tcp_flags;
p += NETFLOW_TCP_FLAGS_SIZE;
/* Header fields below use emitter globals instead of `flow`. */
case NETFLOW_VERSION:
*((uint16_t *) p) = htons(netflow->Version);
p += NETFLOW_VERSION_SIZE;
*((uint16_t *) p) = htons(emit_count);
p += NETFLOW_COUNT_SIZE;
*((uint32_t *) p) = htonl(getuptime(&emit_time));
p += NETFLOW_UPTIME_SIZE;
case NETFLOW_UNIX_SECS:
*((uint32_t *) p) = htonl(emit_time.sec);
p += NETFLOW_UNIX_SECS_SIZE;
case NETFLOW_UNIX_NSECS:
*((uint32_t *) p) = htonl(emit_time.usec * 1000);
p += NETFLOW_UNIX_NSECS_SIZE;
case NETFLOW_FLOW_SEQUENCE:
/* Sequence deliberately zeroed (original code kept for reference). */
//*((uint32_t *) p) = htonl(emit_sequence);
*((uint32_t *) p) = 0;
p += NETFLOW_FLOW_SEQUENCE_SIZE;
/* Unsupported (uint8_t) */
case NETFLOW_ENGINE_TYPE:
case NETFLOW_ENGINE_ID:
case NETFLOW_FLAGS7_1:
case NETFLOW_SRC_MASK:
case NETFLOW_DST_MASK:
*((uint8_t *) p) = 0;
p += NETFLOW_PAD8_SIZE;
case NETFLOW_PLANETLAB_XID:
/* PlanetLab extension: the slice XID travels in the ToS field. */
*((uint16_t *) p) = flow->tos;
p += NETFLOW_PLANETLAB_XID_SIZE;
/* Unsupported (uint16_t) */
case NETFLOW_FLAGS7_2:
*((uint16_t *) p) = 0;
p += NETFLOW_PAD16_SIZE;
/* Unsupported (uint32_t) */
case NETFLOW_IPV4_NEXT_HOP:
case NETFLOW_ROUTER_SC:
*((uint32_t *) p) = 0;
p += NETFLOW_PAD32_SIZE;
my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
format, i, format[i]);
#if ((DEBUG) & DEBUG_F)
my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Emitter thread body (function header not visible in this listing):
   waits for flows on flows_emit, packs them into emit_packet with
   fill(), and on timeout or a full PDU sends the packet to every peer
   (UDP collectors, rotated collectors, or a file peer), applying the
   optional -t byte-rate pacing between sends.
Workaround for clone()-based threads
Try to change EUID independently of main thread
*/
setregid(pw->pw_gid, pw->pw_gid);
setreuid(pw->pw_uid, pw->pw_uid);
struct timespec timeout;
int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
/* Flow records are appended after the fixed NetFlow header. */
p = (void *) &emit_packet + netflow->HeaderSize;
pthread_mutex_lock(&emit_mutex);
while (!flows_emit) {
gettimeofday(&now, 0);
timeout.tv_sec = now.tv_sec + emit_timeout;
/* Do not wait until emit_packet will filled - it may be too long */
if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
pthread_mutex_unlock(&emit_mutex);
flows_emit = flows_emit->next;
#if ((DEBUG) & DEBUG_I)
pthread_mutex_unlock(&emit_mutex);
/* NOTE(review): start_time is re-read per emission here — presumably to
   keep uptime fields fresh relative to the offset; confirm intent. */
gettime(&start_time);
start_time.sec -= start_time_offset;
p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
printf("Emit count = %d\n", emit_count);
if (emit_count == netflow->MaxFlows) {
/* PDU complete: prepend header, pad, and deliver to all peers. */
p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
size = netflow->HeaderSize + emit_count * netflow->FlowSize;
/* Netflow PDUs need to be padded to 1464 bytes - Sapan */
if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
for (i = 0; i < npeers; i++) {
/* File peer: patch sequence into the buffer and write to the log fd. */
if (peers[0].type == PEER_FILE) {
if (netflow->SeqOffset)
*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
ret = write(peers[0].write_fd, emit_packet, size);
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#if ((DEBUG) & DEBUG_E)
my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
emit_count, i + 1, peers[i].seq);
peers[0].seq += emit_count;
/* Optional pacing: -t B:N produces N ns delay per B bytes sent. */
if (emit_rate_bytes) {
delay = sent / emit_rate_bytes;
sent %= emit_rate_bytes;
timeout.tv_nsec = emit_rate_delay * delay;
while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Network peers: mirrors always send; rotate peers take turns. */
if (peers[i].type == PEER_MIRROR) goto sendreal;
if (peers[i].type == PEER_ROTATE)
if (peer_rot_cur++ == peer_rot_work) {
if (netflow->SeqOffset)
*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
ret = send(peers[i].write_fd, emit_packet, size, 0);
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#if ((DEBUG) & DEBUG_E)
my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
emit_count, i + 1, peers[i].seq);
peers[i].seq += emit_count;
if (emit_rate_bytes) {
delay = sent / emit_rate_bytes;
sent %= emit_rate_bytes;
timeout.tv_nsec = emit_rate_delay * delay;
while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Advance the rotation cursor and the global sequence counter. */
if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
emit_sequence += emit_count;
#if ((DEBUG) & DEBUG_I)
/* Drains the circular pending queue: for each flow the capture thread
   marked FLOW_PENDING, copy it into the hash table via put_into(), then
   clear the flag and advance pending_tail.  Sleeps on unpending_cond
   (with unpending_timeout) while the queue is empty.
   NOTE(review): fragment — loop braces and some statements are on lines
   missing from this listing. */
void *unpending_thread()
struct timespec timeout;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
pthread_mutex_lock(&unpending_mutex);
while (!(pending_tail->flags & FLOW_PENDING)) {
gettimeofday(&now, 0);
timeout.tv_sec = now.tv_sec + unpending_timeout;
pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
if (put_into(pending_tail, COPY_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
#if ((DEBUG) & DEBUG_I)
/* put_into() failed (allocation) - the packet is dropped, count it. */
pkts_lost_unpending++;
#if ((DEBUG) & DEBUG_U)
my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Slot consumed: clear flags so cap_thread may reuse it, move on. */
pending_tail->flags = 0;
pending_tail = pending_tail->next;
#if ((DEBUG) & DEBUG_I)
/* Scanner thread body (function header not visible in this listing):
   every scan_interval seconds walks all hash buckets and expires flows —
   fragmented flows past frag_lifetime go to the scan_frag_dreg chain
   (re-put without FLOW_FRAG afterwards); complete flows past the
   inactive or active lifetime are moved to the flows_emit chain and the
   emitter is signalled. */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
struct Flow *flow, **flowpp;
struct timespec timeout;
pthread_mutex_lock(&scan_mutex);
timeout.tv_sec = now.sec + scan_interval;
pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
#if ((DEBUG) & DEBUG_S)
my_log(LOG_DEBUG, "S: %d", now.sec);
for (i = 0; i < 1 << HASH_BITS ; i++) {
pthread_mutex_lock(&flows_mutex[i]);
if (flow->flags & FLOW_FRAG) {
/* Process fragmented flow */
if ((now.sec - flow->mtime.sec) > frag_lifetime) {
/* Fragmented flow expired - put it into special chain */
#if ((DEBUG) & DEBUG_I)
*flowpp = flow->next;
flow->flags &= ~FLOW_FRAG;
flow->next = scan_frag_dreg;
scan_frag_dreg = flow;
/* Flow is not fragmented */
if ((now.sec - flow->mtime.sec) > inactive_lifetime
|| (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
#if ((DEBUG) & DEBUG_S)
my_log(LOG_DEBUG, "S: E %x", flow);
#if ((DEBUG) & DEBUG_I)
/* Unlink from the bucket and push onto the emit chain. */
*flowpp = flow->next;
pthread_mutex_lock(&emit_mutex);
flow->next = flows_emit;
#if ((DEBUG) & DEBUG_I)
pthread_mutex_unlock(&emit_mutex);
flowpp = &flow->next;
pthread_mutex_unlock(&flows_mutex[i]);
if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragmented flows under their (partial) key. */
while (scan_frag_dreg) {
flow = scan_frag_dreg;
scan_frag_dreg = flow->next;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
put_into(flow, MOVE_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
#if ((DEBUG) & DEBUG_S)
my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture thread body (function header not visible in this listing):
   reads batches of ULOG messages via ipulog_read(), parses each IPv4
   packet into the next free slot of the circular pending queue
   (pending_head), and signals unpending_thread.  Non-IPv4 packets and
   packets arriving while the queue is full are dropped. */
struct ulog_packet_msg *ulog_msg;
int len, off_frag, psize;
#if ((DEBUG) & DEBUG_C)
len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
#if ((DEBUG) & DEBUG_C)
sprintf(logbuf, "C: %d", ulog_msg->data_len);
nl = (void *) &ulog_msg->payload;
psize = ulog_msg->data_len;
/* Sanity: must at least hold an IP header and be IPv4. */
if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
#if ((DEBUG) & DEBUG_C)
strcat(logbuf, " U");
my_log(LOG_DEBUG, "%s", logbuf);
#if ((DEBUG) & DEBUG_I)
/* Queue slot still in use (flags != 0) -> pending queue is full. */
if (pending_head->flags) {
#if ((DEBUG) & DEBUG_C) || defined MESSAGES
# if ((DEBUG) & DEBUG_C)
"pending queue full:", "packet lost");
#if ((DEBUG) & DEBUG_I)
#if ((DEBUG) & DEBUG_I)
flow = pending_head;
/* ?FIXME? Add sanity check for ip_len? */
flow->size = ntohs(nl->ip_len);
#if ((DEBUG) & DEBUG_I)
size_total += flow->size;
flow->sip = nl->ip_src;
flow->dip = nl->ip_dst;
flow->iif = snmp_index(ulog_msg->indev_name);
flow->oif = snmp_index(ulog_msg->outdev_name);
flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
flow->proto = nl->ip_p;
flow->tcp_flags = 0;
/* Packets captured from OUTPUT table don't contain a valid timestamp */
if (ulog_msg->timestamp_sec) {
flow->ctime.sec = ulog_msg->timestamp_sec;
flow->ctime.usec = ulog_msg->timestamp_usec;
} else gettime(&flow->ctime);
flow->mtime = flow->ctime;
/* Fragment offset is stored in 8-byte units. */
off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
/*
Offset (from network layer) to transport layer header/IP data
IOW IP header size ;-)
NOTE(review): no validation that ip_hl >= 5 — original author's
remark below acknowledges this.
Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
*/
off_tl = nl->ip_hl << 2;
tl = (void *) nl + off_tl;
/* THIS packet data size: data_size = total_size - ip_header_size*4 */
flow->sizeF = ntohs(nl->ip_len) - off_tl;
if ((signed) flow->sizeF < 0) flow->sizeF = 0;
if (psize > (signed) flow->sizeF) psize = flow->sizeF;
if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
/* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
#if ((DEBUG) & DEBUG_C)
strcat(logbuf, " F");
#if ((DEBUG) & DEBUG_I)
pkts_total_fragmented++;
flow->flags |= FLOW_FRAG;
flow->id = nl->ip_id;
if (!(ntohs(nl->ip_off) & IP_MF)) {
/* Last fragment (IP_MF clear) reveals the whole datagram size */
flow->flags |= FLOW_LASTFRAG;
/* size = frag_offset*8 + data_size */
flow->sizeP = off_frag + flow->sizeF;
#if ((DEBUG) & DEBUG_C)
sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
strcat(logbuf, buf);
sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
strcat(logbuf, buf);
/*
Fortunately most interesting transport layer information fit
into first 8 bytes of IP data field (minimal nonzero size).
Thus we don't need actual packet reassembling to build whole
transport layer data. We only check the fragment offset for
zero value to find packet with this information.
*/
if (!off_frag && psize >= 8) {
switch (flow->proto) {
flow->sp = ((struct udphdr *)tl)->uh_sport;
flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code in the port fields (standard NetFlow trick). */
flow->sp = htons(((struct icmp *)tl)->icmp_type);
flow->dp = htons(((struct icmp *)tl)->icmp_code);
#ifdef ICMP_TRICK_CISCO
flow->dp = *((int32_t *) tl);
/* Unknown transport layer */
#if ((DEBUG) & DEBUG_C)
strcat(logbuf, " U");
#if ((DEBUG) & DEBUG_C)
sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
strcat(logbuf, buf);
flow->flags |= FLOW_TL;
/* Check for tcp flags presence (including CWR and ECE). */
if (flow->proto == IPPROTO_TCP
&& psize >= 16 - off_frag) {
flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
#if ((DEBUG) & DEBUG_C)
sprintf(buf, " TCP:%x", flow->tcp_flags);
strcat(logbuf, buf);
#if ((DEBUG) & DEBUG_C)
sprintf(buf, " => %x", (unsigned) flow);
strcat(logbuf, buf);
my_log(LOG_DEBUG, "%s", logbuf);
#if ((DEBUG) & DEBUG_I)
pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
if (pending_queue_trace < pending_queue_trace_candidate)
pending_queue_trace = pending_queue_trace_candidate;
/* Flow complete - inform unpending_thread() about it */
pending_head->flags |= FLOW_PENDING;
pending_head = pending_head->next;
pthread_cond_signal(&unpending_cond);
/* Entry point: parse options, set up collectors/file peers and the ULOG
   capture handle, daemonize, spawn the emit/scan/unpending/cap worker
   threads, optionally chroot and drop privileges, then loop handling
   SIGTERM (graceful cache flush) and SIGUSR1 (debug stats).
   NOTE(review): fragment — many statements (option cases, error labels,
   the main wait loop body) are on lines missing from this listing. */
int main(int argc, char **argv)
char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
int c, i, write_fd, memory_limit = 0;
struct addrinfo hints, *res;
struct sockaddr_in saddr;
pthread_attr_t tattr;
struct sigaction sigact;
static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
struct timeval timeout;
sched_min = sched_get_priority_min(SCHED);
sched_max = sched_get_priority_max(SCHED);
memset(&saddr, 0 , sizeof(saddr));
memset(&hints, 0 , sizeof(hints));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
/* Process command line options */
while ((c = my_getopt(argc, argv, parms)) != -1) {
if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
if (parms[nflag].count) {
switch (atoi(parms[nflag].arg)) {
netflow = &NetFlow1;
netflow = &NetFlow7;
fprintf(stderr, "Illegal %s\n", "NetFlow version");
if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
/* -l [dst][:id]: split off the optional ident suffix, then restore ':'. */
if (parms[lflag].count) {
if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
sprintf(errpbuf, "[%s]", log_suffix);
strcat(ident, errpbuf);
if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
if (log_suffix) *--log_suffix = ':';
if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
fprintf(stderr, "malloc(): %s\n", strerror(errno));
sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
if (parms[qflag].count) {
pending_queue_length = atoi(parms[qflag].arg);
if (pending_queue_length < 1) {
fprintf(stderr, "Illegal %s\n", "pending queue length");
if (parms[rflag].count) {
schedp.sched_priority = atoi(parms[rflag].arg);
if (schedp.sched_priority
&& (schedp.sched_priority < sched_min
|| schedp.sched_priority > sched_max)) {
fprintf(stderr, "Illegal %s\n", "realtime priority");
if (parms[Bflag].count) {
sockbufsize = atoi(parms[Bflag].arg) << 10;
if (parms[bflag].count) {
bulk_quantity = atoi(parms[bflag].arg);
if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
fprintf(stderr, "Illegal %s\n", "bulk size");
if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X: parse colon-separated "<basename><base>" SNMP index rules. */
if (parms[Xflag].count) {
for(i = 0; parms[Xflag].arg[i]; i++)
if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
rule = strtok(parms[Xflag].arg, ":");
for (i = 0; rule; i++) {
snmp_rules[i].len = strlen(rule);
if (snmp_rules[i].len > IFNAMSIZ) {
fprintf(stderr, "Illegal %s\n", "interface basename");
strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
if (!*(rule - 1)) *(rule - 1) = ',';
rule = strtok(NULL, ",");
fprintf(stderr, "Illegal %s\n", "SNMP rule");
snmp_rules[i].base = atoi(rule);
rule = strtok(NULL, ":");
if (parms[tflag].count)
sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
if (parms[aflag].count) {
if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
fprintf(stderr, "Illegal %s\n", "source address");
saddr = *((struct sockaddr_in *) res->ai_addr);
if (parms[uflag].count)
if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
/* Process collectors parameters. Brrrr... :-[ */
npeers = argc - optind;
/* Send to remote Netflow collector */
if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
for (i = optind, npeers = 0; i < argc; i++, npeers++) {
if (!(dport = strchr(dhost, ':'))) goto bad_collector;
if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
fprintf(stderr, "socket(): %s\n", strerror(errno));
peers[npeers].write_fd = write_fd;
peers[npeers].type = PEER_MIRROR;
peers[npeers].laddr = saddr;
peers[npeers].seq = 0;
/* Optional "/local" and "/type" suffixes on the collector spec. */
if ((lhost = strchr(dport, '/'))) {
if ((type = strchr(lhost, '/'))) {
peers[npeers].type = PEER_ROTATE;
if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
sizeof(struct sockaddr_in))) {
fprintf(stderr, "bind(): %s\n", strerror(errno));
if (getaddrinfo(dhost, dport, &hints, &res)) {
fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
sizeof(struct sockaddr_in))) {
fprintf(stderr, "connect(): %s\n", strerror(errno));
/* Restore command line */
if (type) *--type = '/';
if (lhost) *--lhost = '/';
/* -f: single file peer instead of network collectors.
   NOTE(review): strnlen() does not reserve the NUL byte and strncpy()
   may leave fname unterminated — allocation/copy look off by one;
   flagged for the maintainer. */
else if (parms[fflag].count) {
if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
if (!(peers[0].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
peers[0].write_fd = -1;
peers[0].type = PEER_FILE;
/* ULOG capture setup. */
if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
fprintf(stderr, "libipulog initialization error: %s",
ipulog_strerror(ipulog_errno));
if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
&sockbufsize, sizeof(sockbufsize)) < 0)
fprintf(stderr, "setsockopt(): %s", strerror(errno));
/* Daemonize (if log destination stdout-free) */
my_log_open(ident, verbosity, log_dest);
if (!(log_dest & 2)) {
fprintf(stderr, "fork(): %s", strerror(errno));
freopen("/dev/null", "r", stdin);
freopen("/dev/null", "w", stdout);
freopen("/dev/null", "w", stderr);
setvbuf(stdout, (char *)0, _IONBF, 0);
setvbuf(stderr, (char *)0, _IONBF, 0);
sprintf(errpbuf, "[%ld]", (long) pid);
strcat(ident, errpbuf);
/* Initialization */
hash_init(); /* Actually for crc16 only */
mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
/* Hope 12 days is enough :-/ */
start_time_offset = 1 << 20;
/* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
gettime(&start_time);
/*
Build static pending queue as circular buffer.
*/
if (!(pending_head = mem_alloc())) goto err_mem_alloc;
pending_tail = pending_head;
for (i = pending_queue_length - 1; i--;) {
if (!(pending_tail->next = mem_alloc())) {
my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
pending_tail = pending_tail->next;
pending_tail->next = pending_head;
pending_tail = pending_head;
/* Block SIGTERM/SIGUSR1 so worker threads inherit the mask; only the
   main loop (after SIG_UNBLOCK below) handles them. */
sigemptyset(&sig_mask);
sigact.sa_handler = &sighandler;
sigact.sa_mask = sig_mask;
sigact.sa_flags = 0;
sigaddset(&sig_mask, SIGTERM);
sigaction(SIGTERM, &sigact, 0);
#if ((DEBUG) & DEBUG_I)
sigaddset(&sig_mask, SIGUSR1);
sigaction(SIGUSR1, &sigact, 0);
if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
my_log(LOG_INFO, "Starting %s...", VERSION);
if (parms[cflag].count) {
if (chdir(parms[cflag].arg) || chroot(".")) {
my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* Spawn workers with ascending realtime priority when -r is set. */
schedp.sched_priority = schedp.sched_priority - THREADS + 2;
pthread_attr_init(&tattr);
for (i = 0; i < THREADS - 1; i++) {
if (schedp.sched_priority > 0) {
if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
(pthread_attr_setschedparam(&tattr, &schedp))) {
my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
if (pthread_create(&thid, &tattr, threads[i], 0)) {
my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
pthread_detach(thid);
schedp.sched_priority++;
/* Drop privileges (-u) after the chroot and thread creation. */
if (setgroups(0, NULL)) {
my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
if (setregid(pw->pw_gid, pw->pw_gid)) {
my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
if (setreuid(pw->pw_uid, pw->pw_uid)) {
my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
if (!(pidfile = fopen(pidfilepath, "w")))
my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
fprintf(pidfile, "%ld\n", (long) pid);
my_log(LOG_INFO, "pid: %d", pid);
my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
"M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
for (i = 0; i < nsnmp_rules; i++) {
my_log(LOG_INFO, "SNMP rule #%d %s:%d",
i + 1, snmp_rules[i].basename, snmp_rules[i].base);
for (i = 0; i < npeers; i++) {
switch (peers[i].type) {
snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
/* Main loop: wake on signals or periodically while work remains. */
pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
timeout.tv_usec = 0;
|| (total_elements - free_elements - pending_queue_length)
|| pending_tail->flags) {
timeout.tv_sec = scan_interval;
select(0, 0, 0, 0, &timeout);
/* SIGTERM: force-expire everything and let the workers drain. */
if (sigs & SIGTERM_MASK && !killed) {
sigs &= ~SIGTERM_MASK;
my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
active_lifetime = -1;
inactive_lifetime = -1;
unpending_timeout = 1;
pthread_cond_signal(&scan_cond);
pthread_cond_signal(&unpending_cond);
#if ((DEBUG) & DEBUG_I)
if (sigs & SIGUSR1_MASK) {
sigs &= ~SIGUSR1_MASK;
remove(pidfilepath);
#if ((DEBUG) & DEBUG_I)
my_log(LOG_INFO, "Done.");