2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
12 /* stdout, stderr, freopen() */
18 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <libipulog/libipulog.h>
28 struct ipulog_handle {
31 struct sockaddr_nl local;
32 struct sockaddr_nl peer;
33 struct nlmsghdr* last_nlhdr;
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
48 #include <sys/param.h>
73 #include <sys/select.h>
79 #include <fprobe-ulog.h>
81 #include <my_getopt.h>
110 static struct getopt_parms parms[] = {
/* Command-line option table consumed by my_getopt(); each entry here takes a
   required argument.  The meaning of every flag is documented in the usage
   text further down in this file.  NOTE(review): boolean flags (e.g. -h, -M)
   are presumably declared in sampled-out lines of this table — confirm. */
111 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
112 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
113 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
114 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
115 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
116 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
117 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
118 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
120 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
123 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 extern int optind, opterr, optopt;
/* NetFlow v1/v5/v7 export format descriptors, defined elsewhere. */
139 extern struct NetFlow NetFlow1;
140 extern struct NetFlow NetFlow5;
141 extern struct NetFlow NetFlow7;
/* -M: treat the netfilter mark value as the ToS byte (count>0 when given). */
143 #define mark_is_tos parms[Mflag].count
/* Timing knobs, all overridable from the command line (see usage text). */
144 static unsigned scan_interval = 5;
145 static int frag_lifetime = 30;
146 static int inactive_lifetime = 60;
147 static int active_lifetime = 300;
148 static int sockbufsize;
/* Upper bound for -b: the largest value a mem_index_t can hold. */
149 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
150 #if (MEM_BITS == 0) || (MEM_BITS == 16)
151 #define BULK_QUANTITY 10000
/* (the #else branch for other MEM_BITS values selects the smaller bulk) */
153 #define BULK_QUANTITY 200
155 static unsigned bulk_quantity = BULK_QUANTITY;
156 static unsigned pending_queue_length = 100;
/* Default export format is NetFlow v5; -n switches to v1 or v7. */
157 static struct NetFlow *netflow = &NetFlow5;
158 static unsigned verbosity = 6;
159 static unsigned log_dest = MY_LOG_SYSLOG;
/* Probe start time; FIRST/LAST_SWITCHED fields are expressed relative to it. */
160 static struct Time start_time;
161 static long start_time_offset;
/* Statistics counters exported by the memory allocator module. */
164 extern unsigned total_elements;
165 extern unsigned free_elements;
166 extern unsigned total_memory;
167 #if ((DEBUG) & DEBUG_I)
/* Debug-build instrumentation counters (dumped on SIGUSR1). */
168 static unsigned emit_pkts, emit_queue;
169 static uint64_t size_total;
170 static unsigned pkts_total, pkts_total_fragmented;
171 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
172 static unsigned pkts_pending, pkts_pending_done;
173 static unsigned pending_queue_trace, pending_queue_trace_candidate;
174 static unsigned flows_total, flows_fragmented;
/* Emitter state: flows accumulated in the current PDU, sequence counter,
   optional byte-rate pacing (-t), and the outgoing packet buffer. */
176 static unsigned emit_count;
177 static uint32_t emit_sequence;
178 static unsigned emit_rate_bytes, emit_rate_delay;
179 static struct Time emit_time;
180 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
181 static pthread_t thid;
182 static sigset_t sig_mask;
/* Optional real-time scheduling (-r). */
183 static struct sched_param schedp;
184 static int sched_min, sched_max;
/* Collector peers parsed from the command line. */
185 static int npeers, npeers_rot;
186 static struct peer *peers;
/* Flow cache: one chain (and one mutex) per hash bucket. */
189 static struct Flow *flows[1 << HASH_BITS];
190 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization between capture, unpending, scan and emit threads. */
192 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
193 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
195 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
196 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Circular pending queue: capture fills at head, unpending drains at tail. */
197 static struct Flow *pending_head, *pending_tail;
/* Chain of expired fragmented flows awaiting re-insertion by scan_thread. */
198 static struct Flow *scan_frag_dreg;
200 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
202 static struct Flow *flows_emit;
/* Process identity for logging and the pid file. */
204 static char ident[256] = "fprobe-ulog";
205 static FILE *pidfile;
206 static char *pidfilepath;
209 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture state: netlink handle, group mask (-U), and read buffer. */
210 static struct ipulog_handle *ulog_handle;
211 static uint32_t ulog_gmask = 1;
212 static char *cap_buf;
/* Interface-name -> SNMP-index conversion rules (-X). */
213 static int nsnmp_rules;
214 static struct snmp_rule *snmp_rules;
/* Target user for privilege dropping (-u); NULL means stay as invoker. */
215 static struct passwd *pw = 0;
220 "fprobe-ulog: a NetFlow probe. Version %s\n"
221 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
223 "-h\t\tDisplay this help\n"
224 "-U <mask>\tULOG group bitwise mask [1]\n"
225 "-s <seconds>\tHow often scan for expired flows [5]\n"
226 "-g <seconds>\tFragmented flow lifetime [30]\n"
227 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
228 "-f <filename>\tLog flow data in a file\n"
229 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
230 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
231 "-a <address>\tUse <address> as source for NetFlow flow\n"
232 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
233 "-M\t\tUse netfilter mark value as ToS flag\n"
234 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
235 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
236 "-q <flows>\tPending queue length [100]\n"
237 "-B <kilobytes>\tKernel capture buffer size [0]\n"
238 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
239 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
240 "-c <directory>\tDirectory to chroot to\n"
241 "-u <user>\tUser to run as\n"
242 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
243 "-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
/* BUGFIX: the comma previously sat after the "-y" line, terminating the
   format string there.  The "-y"/"-f" strings and VERSION then concatenated
   into a stray %s argument and shifted every following conversion — UB per
   C11 fprintf.  The comma now follows the last string literal so the whole
   help text is one format string and the argument list lines up with
   %s %u %u %d %d. */
244 "-y <remote:port>\tAddress of the NetFlow collector\n"
245 "-f <writable file>\tFile to write data into\n",
246 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
250 #if ((DEBUG) & DEBUG_I)
/* Debug-build statistics dump (triggered via SIGUSR1 in the main loop):
   logs capture, pending-queue, cache, emit and allocator counters. */
253 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
254 pkts_total, pkts_total_fragmented, size_total,
255 pkts_pending - pkts_pending_done, pending_queue_trace);
256 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
257 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
258 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
259 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
260 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
261 total_elements, free_elements, total_memory);
/* Async-signal handler: records which signal arrived by setting a bit in a
   global mask ("sigs", declared in a sampled-out line); the main loop polls
   and clears these bits.  SIGTERM triggers cache flush/shutdown; SIGUSR1
   (debug builds) triggers the statistics dump. */
265 void sighandler(int sig)
269 sigs |= SIGTERM_MASK;
271 #if ((DEBUG) & DEBUG_I)
273 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (sec/usec pair); the usec
   field is copied from a struct timeval obtained in a sampled-out line,
   presumably via gettimeofday() — confirm against the full source. */
279 void gettime(struct Time *now)
285 now->usec = t.tv_usec;
/* Compare two timestamps: returns (t1 - t2) in milliseconds
   (positive when t1 is later than t2). */
288 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
290 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
293 /* Uptime in milliseconds: t relative to (offset-adjusted) start_time. */
294 uint32_t getuptime(struct Time *t)
296 /* Maximum uptime is about 49/2 days (32-bit millisecond counter). */
297 return cmpmtime(t, &start_time);
/* Bucket hash for a flow.  Fragments are hashed over the fragment key only
   (struct Flow_F prefix: no ports, since non-first fragments lack them);
   complete flows hash over the full transport tuple (struct Flow_TL). */
300 hash_t hash_flow(struct Flow *flow)
302 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
303 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to the SNMP index reported in NetFlow records.
   Resolution order: 1) empty name -> 0; 2) first matching -X rule
   (basename prefix match, remainder parsed as a number and offset by the
   rule base); 3) the kernel's if_nametoindex(); 4) fall through (return
   value for no match is in a sampled-out line). */
306 uint16_t snmp_index(char *name) {
309 if (!*name) return 0;
311 for (i = 0; (int) i < nsnmp_rules; i++) {
312 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
313 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
316 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a flow record (addresses/ports/interface fields
   are copied in sampled-out lines).  Used when inserting a pending-queue
   slot into the cache, since pending slots are reused in place. */
321 inline void copy_flow(struct Flow *src, struct Flow *dst)
328 dst->proto = src->proto;
329 dst->tcp_flags = src->tcp_flags;
333 dst->pkts = src->pkts;
334 dst->size = src->size;
335 dst->sizeF = src->sizeF;
336 dst->sizeP = src->sizeP;
337 dst->ctime = src->ctime;
338 dst->mtime = src->mtime;
339 dst->flags = src->flags;
/* Walk the hash chain starting at `where` looking for a flow matching
   `what`.  Match requires same src/dst IP and protocol, then:
   - both unfragmented: ports must match too;
   - both fragmented:   the IP datagram id must match.
   (Mixed fragmented/unfragmented pairs fall through to no-match.)
   If `prev` is non-NULL it is used to return the address of the pointer
   that links the found node, so the caller can unlink it in O(1). */
342 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
344 struct Flow **flowpp;
350 if (prev) flowpp = *prev;
353 if (where->sip.s_addr == what->sip.s_addr
354 && where->dip.s_addr == what->dip.s_addr
355 && where->proto == what->proto) {
356 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
358 /* Both unfragmented */
359 if ((what->sp == where->sp)
360 && (what->dp == where->dp)) goto done;
363 /* Both fragmented */
364 if (where->id == what->id) goto done;
368 flowpp = &where->next;
372 if (prev) *prev = flowpp;
/* Insert `flow` into the flow cache, merging with an existing entry when one
   matches.  `flag` selects ownership: COPY_INTO duplicates the record into a
   freshly allocated cache node (source stays with the caller, e.g. a pending
   queue slot); MOVE_INTO links/frees the passed node itself.  When the last
   fragment of a datagram arrives and all fragment bytes are accounted for,
   the reassembled flow is unlinked and re-inserted as a normal (unfragmented)
   flow via a recursive MOVE_INTO call.  Thread-safe per hash bucket. */
376 int put_into(struct Flow *flow, int flag
377 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
384 struct Flow *flown, **flowpp;
385 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
390 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
391 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
/* Lock this flow's hash bucket for the whole insert/merge. */
394 pthread_mutex_lock(&flows_mutex[h]);
396 if (!(flown = find(flows[h], flow, &flowpp))) {
397 /* No suitable flow found - add */
398 if (flag == COPY_INTO) {
399 if ((flown = mem_alloc())) {
400 copy_flow(flow, flown);
/* Allocation failure: the packet's data is dropped (logged below). */
403 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
404 my_log(LOG_ERR, "%s %s. %s",
405 "mem_alloc():", strerror(errno), "packet lost");
/* Link the (new or moved) node at the head of the bucket chain. */
410 flow->next = flows[h];
412 #if ((DEBUG) & DEBUG_I)
414 if (flow->flags & FLOW_FRAG) flows_fragmented++;
416 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
418 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
423 /* Found suitable flow - update */
424 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
425 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the widest time window: latest mtime, earliest ctime. */
428 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
429 flown->mtime = flow->mtime;
430 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
431 flown->ctime = flow->ctime;
432 flown->tcp_flags |= flow->tcp_flags;
433 flown->size += flow->size;
434 flown->pkts += flow->pkts;
435 if (flow->flags & FLOW_FRAG) {
436 /* Fragmented flow require some additional work */
437 if (flow->flags & FLOW_TL) {
/* Note: duplicate FLOW_TL fragments may indicate an attack. */
440 Several packets with FLOW_TL (attack)
442 flown->sp = flow->sp;
443 flown->dp = flow->dp;
445 if (flow->flags & FLOW_LASTFRAG) {
/* Note: duplicate last-fragments may likewise indicate an attack. */
448 Several packets with FLOW_LASTFRAG (attack)
/* sizeP is the total datagram payload announced by the last fragment. */
450 flown->sizeP = flow->sizeP;
452 flown->flags |= flow->flags;
453 flown->sizeF += flow->sizeF;
454 if ((flown->flags & FLOW_LASTFRAG)
455 && (flown->sizeF >= flown->sizeP)) {
456 /* All fragments received - flow reassembled */
457 *flowpp = flown->next;
458 pthread_mutex_unlock(&flows_mutex[h]);
459 #if ((DEBUG) & DEBUG_I)
/* Reinsert the reassembled flow as a normal (non-fragment) flow. */
464 flown->flags &= ~FLOW_FRAG;
465 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
468 ret = put_into(flown, MOVE_INTO
469 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
475 if (flag == MOVE_INTO) mem_free(flow);
477 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize a NetFlow header or flow record into the packet buffer at `p`,
   field by field, driven by the version's format template (`format`,
   `fields` entries).  `flow` is NULL when emitting header fields (those
   read the emit_* globals instead).  Returns the advanced write pointer.
   All multi-byte fields are written in network byte order; ports are kept
   as captured (already network order).  Unsupported template fields are
   zero-padded to their nominal width. */
481 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
485 for (i = 0; i < fields; i++) {
486 #if ((DEBUG) & DEBUG_F)
487 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
490 case NETFLOW_IPV4_SRC_ADDR:
491 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
492 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
495 case NETFLOW_IPV4_DST_ADDR:
496 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
497 p += NETFLOW_IPV4_DST_ADDR_SIZE;
500 case NETFLOW_INPUT_SNMP:
501 *((uint16_t *) p) = htons(flow->iif);
502 p += NETFLOW_INPUT_SNMP_SIZE;
505 case NETFLOW_OUTPUT_SNMP:
506 *((uint16_t *) p) = htons(flow->oif);
507 p += NETFLOW_OUTPUT_SNMP_SIZE;
510 case NETFLOW_PKTS_32:
511 *((uint32_t *) p) = htonl(flow->pkts);
512 p += NETFLOW_PKTS_32_SIZE;
515 case NETFLOW_BYTES_32:
516 *((uint32_t *) p) = htonl(flow->size);
517 p += NETFLOW_BYTES_32_SIZE;
/* First/last switched: milliseconds of uptime at flow create/update. */
520 case NETFLOW_FIRST_SWITCHED:
521 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
522 p += NETFLOW_FIRST_SWITCHED_SIZE;
525 case NETFLOW_LAST_SWITCHED:
526 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
527 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are stored in network order already — no htons() here. */
530 case NETFLOW_L4_SRC_PORT:
531 *((uint16_t *) p) = flow->sp;
532 p += NETFLOW_L4_SRC_PORT_SIZE;
535 case NETFLOW_L4_DST_PORT:
536 *((uint16_t *) p) = flow->dp;
537 p += NETFLOW_L4_DST_PORT_SIZE;
541 *((uint8_t *) p) = flow->proto;
542 p += NETFLOW_PROT_SIZE;
545 case NETFLOW_SRC_TOS:
546 *((uint8_t *) p) = flow->tos;
547 p += NETFLOW_SRC_TOS_SIZE;
550 case NETFLOW_TCP_FLAGS:
551 *((uint8_t *) p) = flow->tcp_flags;
552 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header-only fields below (flow == NULL for these). */
555 case NETFLOW_VERSION:
556 *((uint16_t *) p) = htons(netflow->Version);
557 p += NETFLOW_VERSION_SIZE;
561 *((uint16_t *) p) = htons(emit_count);
562 p += NETFLOW_COUNT_SIZE;
566 *((uint32_t *) p) = htonl(getuptime(&emit_time));
567 p += NETFLOW_UPTIME_SIZE;
570 case NETFLOW_UNIX_SECS:
571 *((uint32_t *) p) = htonl(emit_time.sec);
572 p += NETFLOW_UNIX_SECS_SIZE;
575 case NETFLOW_UNIX_NSECS:
576 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
577 p += NETFLOW_UNIX_NSECS_SIZE;
580 case NETFLOW_FLOW_SEQUENCE:
/* Sequence deliberately zeroed here; per-peer sequence numbers are
   patched in later via netflow->SeqOffset in the emit path. */
581 //*((uint32_t *) p) = htonl(emit_sequence);
582 *((uint32_t *) p) = 0;
583 p += NETFLOW_FLOW_SEQUENCE_SIZE;
587 /* Unsupported (uint8_t) */
588 case NETFLOW_ENGINE_TYPE:
589 case NETFLOW_ENGINE_ID:
590 case NETFLOW_FLAGS7_1:
591 case NETFLOW_SRC_MASK:
592 case NETFLOW_DST_MASK:
593 *((uint8_t *) p) = 0;
594 p += NETFLOW_PAD8_SIZE;
598 /* Unsupported (uint16_t) */
601 case NETFLOW_FLAGS7_2:
602 *((uint16_t *) p) = 0;
603 p += NETFLOW_PAD16_SIZE;
607 /* Unsupported (uint32_t) */
608 case NETFLOW_IPV4_NEXT_HOP:
609 case NETFLOW_ROUTER_SC:
610 *((uint32_t *) p) = 0;
611 p += NETFLOW_PAD32_SIZE;
/* Unknown template entry: programming error in the format tables. */
615 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
616 format, i, format[i]);
620 #if ((DEBUG) & DEBUG_F)
621 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Emitter thread: drains the flows_emit list, packs flow records into
   emit_packet with fill(), and once the PDU is full (MaxFlows) writes it to
   every configured peer — file peers via write(), network peers via send(),
   with PEER_ROTATE peers taking turns round-robin.  Optional -t pacing
   sleeps after every emit_rate_bytes bytes sent. */
628 Workaround for clone()-based threads
629 Try to change EUID independently of main thread
/* Drop privileges in this thread too (LinuxThreads don't share EUID). */
633 setregid(pw->pw_gid, pw->pw_gid);
634 setreuid(pw->pw_uid, pw->pw_uid);
643 struct timespec timeout;
644 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
/* Start writing flow records just past the header area. */
646 p = (void *) &emit_packet + netflow->HeaderSize;
652 pthread_mutex_lock(&emit_mutex);
653 while (!flows_emit) {
654 gettimeofday(&now, 0);
655 timeout.tv_sec = now.tv_sec + emit_timeout;
656 /* Do not wait until emit_packet will filled - it may be too long */
657 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
658 pthread_mutex_unlock(&emit_mutex);
/* Pop one flow off the emit list while holding emit_mutex. */
663 flows_emit = flows_emit->next;
664 #if ((DEBUG) & DEBUG_I)
667 pthread_mutex_unlock(&emit_mutex);
/* Re-anchor uptime so FIRST/LAST_SWITCHED stay in range. */
671 gettime(&start_time);
672 start_time.sec -= start_time_offset;
675 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
679 printf("Emit count = %d\n", emit_count);
682 if (emit_count == netflow->MaxFlows) {
/* PDU full: build the header and ship it to all peers. */
685 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
686 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
687 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
688 if (size < 1464) size = 1464;
690 for (i = 0; i < npeers; i++) {
/* NOTE(review): the file-peer branch indexes peers[0] rather than
   peers[i] — apparently the file peer is always slot 0; confirm
   against the un-sampled source before changing. */
691 if (peers[0].type == PEER_FILE) {
692 if (netflow->SeqOffset)
693 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
695 ret = write(peers[0].write_fd, emit_packet, size);
697 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
698 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
699 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
703 #if ((DEBUG) & DEBUG_E)
705 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
706 emit_count, i + 1, peers[i].seq);
709 peers[0].seq += emit_count;
/* Optional -t rate limiting: delay proportional to bytes sent. */
712 if (emit_rate_bytes) {
714 delay = sent / emit_rate_bytes;
716 sent %= emit_rate_bytes;
718 timeout.tv_nsec = emit_rate_delay * delay;
719 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Network peers: MIRROR peers always send; ROTATE peers only when it
   is their turn in the round-robin. */
724 if (peers[i].type == PEER_MIRROR) goto sendreal;
726 if (peers[i].type == PEER_ROTATE)
727 if (peer_rot_cur++ == peer_rot_work) {
/* Patch this peer's sequence number into the already-built PDU. */
729 if (netflow->SeqOffset)
730 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
731 ret = send(peers[i].write_fd, emit_packet, size, 0);
733 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
734 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
735 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
738 #if ((DEBUG) & DEBUG_E)
740 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
741 emit_count, i + 1, peers[i].seq);
744 peers[i].seq += emit_count;
747 if (emit_rate_bytes) {
749 delay = sent / emit_rate_bytes;
751 sent %= emit_rate_bytes;
753 timeout.tv_nsec = emit_rate_delay * delay;
754 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Advance the rotate pointer and the global sequence counter. */
759 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
760 emit_sequence += emit_count;
762 #if ((DEBUG) & DEBUG_I)
/* Unpending thread: consumes completed slots from the circular pending
   queue (filled by cap_thread) and copies each into the flow cache via
   put_into(COPY_INTO).  The slot is then cleared and handed back to the
   capture side by advancing pending_tail. */
769 void *unpending_thread()
772 struct timespec timeout;
773 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
780 pthread_mutex_lock(&unpending_mutex);
/* Sleep until cap_thread marks the tail slot FLOW_PENDING. */
783 while (!(pending_tail->flags & FLOW_PENDING)) {
784 gettimeofday(&now, 0);
785 timeout.tv_sec = now.tv_sec + unpending_timeout;
786 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
789 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
792 if (put_into(pending_tail, COPY_INTO
793 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* put_into failed (allocation): the packet is counted as lost. */
797 #if ((DEBUG) & DEBUG_I)
798 pkts_lost_unpending++;
802 #if ((DEBUG) & DEBUG_U)
803 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the slot back to the capture thread. */
806 pending_tail->flags = 0;
807 pending_tail = pending_tail->next;
808 #if ((DEBUG) & DEBUG_I)
/* Scan thread: every scan_interval seconds walks all hash buckets and
   expires flows.  Expired fragmented flows are parked on scan_frag_dreg
   and re-inserted afterwards as normal flows (their fragments never
   completed); expired normal flows are moved onto the flows_emit list
   and emit_thread is woken. */
816 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
820 struct Flow *flow, **flowpp;
822 struct timespec timeout;
827 pthread_mutex_lock(&scan_mutex);
/* Interruptible sleep; main() signals scan_cond on SIGTERM to force an
   immediate full flush (lifetimes are set to -1 there). */
831 timeout.tv_sec = now.sec + scan_interval;
832 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
835 #if ((DEBUG) & DEBUG_S)
836 my_log(LOG_DEBUG, "S: %d", now.sec);
838 for (i = 0; i < 1 << HASH_BITS ; i++) {
839 pthread_mutex_lock(&flows_mutex[i]);
843 if (flow->flags & FLOW_FRAG) {
844 /* Process fragmented flow */
845 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
846 /* Fragmented flow expired - put it into special chain */
847 #if ((DEBUG) & DEBUG_I)
851 *flowpp = flow->next;
/* Strip the fragment flag so re-insertion treats it as complete. */
853 flow->flags &= ~FLOW_FRAG;
854 flow->next = scan_frag_dreg;
855 scan_frag_dreg = flow;
860 /* Flow is not fragmented */
/* Expire on inactivity OR when the active timer elapses. */
861 if ((now.sec - flow->mtime.sec) > inactive_lifetime
862 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
864 #if ((DEBUG) & DEBUG_S)
865 my_log(LOG_DEBUG, "S: E %x", flow);
867 #if ((DEBUG) & DEBUG_I)
/* Unlink from the bucket and push onto the emit list. */
870 *flowpp = flow->next;
871 pthread_mutex_lock(&emit_mutex);
872 flow->next = flows_emit;
874 #if ((DEBUG) & DEBUG_I)
877 pthread_mutex_unlock(&emit_mutex);
882 flowpp = &flow->next;
885 pthread_mutex_unlock(&flows_mutex[i]);
887 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragment flows as if reassembled. */
889 while (scan_frag_dreg) {
890 flow = scan_frag_dreg;
891 scan_frag_dreg = flow->next;
892 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
895 put_into(flow, MOVE_INTO
896 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
900 #if ((DEBUG) & DEBUG_S)
901 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture thread: reads batches of packets from the netlink ULOG socket,
   parses each IP header into the next free pending-queue slot, and marks
   the slot FLOW_PENDING for unpending_thread.  Non-IPv4, truncated, or
   overflow packets are dropped (and counted in debug builds). */
909 struct ulog_packet_msg *ulog_msg;
913 int len, off_frag, psize;
914 #if ((DEBUG) & DEBUG_C)
922 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
924 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
/* One netlink read may carry several packets; iterate them all. */
927 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
929 #if ((DEBUG) & DEBUG_C)
930 sprintf(logbuf, "C: %d", ulog_msg->data_len);
933 nl = (void *) &ulog_msg->payload;
934 psize = ulog_msg->data_len;
/* Drop anything that is not a complete IPv4 header. */
937 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
938 #if ((DEBUG) & DEBUG_C)
939 strcat(logbuf, " U");
940 my_log(LOG_DEBUG, "%s", logbuf);
942 #if ((DEBUG) & DEBUG_I)
/* Head slot still in use => pending queue is full; packet lost. */
948 if (pending_head->flags) {
949 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
951 # if ((DEBUG) & DEBUG_C)
956 "pending queue full:", "packet lost");
958 #if ((DEBUG) & DEBUG_I)
964 #if ((DEBUG) & DEBUG_I)
/* Fill the slot from the IP header. */
970 /* ?FIXME? Add sanity check for ip_len? */
971 flow->size = ntohs(nl->ip_len);
972 #if ((DEBUG) & DEBUG_I)
973 size_total += flow->size;
976 flow->sip = nl->ip_src;
977 flow->dip = nl->ip_dst;
978 flow->iif = snmp_index(ulog_msg->indev_name);
979 flow->oif = snmp_index(ulog_msg->outdev_name);
/* -M: report the netfilter mark in the ToS field instead of IP ToS. */
980 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
981 flow->proto = nl->ip_p;
987 /* Packets captured from OUTPUT table don't contain a valid timestamp */
988 if (ulog_msg->timestamp_sec) {
989 flow->ctime.sec = ulog_msg->timestamp_sec;
990 flow->ctime.usec = ulog_msg->timestamp_usec;
991 } else gettime(&flow->ctime);
992 flow->mtime = flow->ctime;
/* Fragment byte offset (the 13-bit field is in 8-byte units). */
994 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
997 Offset (from network layer) to transport layer header/IP data
998 IOW IP header size ;-)
1001 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1003 off_tl = nl->ip_hl << 2;
1004 tl = (void *) nl + off_tl;
1006 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1007 flow->sizeF = ntohs(nl->ip_len) - off_tl;
/* Clamp against malformed headers / short captures. */
1009 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1010 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1012 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1013 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1014 #if ((DEBUG) & DEBUG_C)
1015 strcat(logbuf, " F");
1017 #if ((DEBUG) & DEBUG_I)
1018 pkts_total_fragmented++;
1020 flow->flags |= FLOW_FRAG;
1021 flow->id = nl->ip_id;
1023 if (!(ntohs(nl->ip_off) & IP_MF)) {
1024 /* Last fragment (IP_MF clear) tells us the whole datagram size */
1025 flow->flags |= FLOW_LASTFRAG;
1026 /* size = frag_offset*8 + data_size */
1027 flow->sizeP = off_frag + flow->sizeF;
1031 #if ((DEBUG) & DEBUG_C)
1032 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1033 strcat(logbuf, buf);
1034 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1035 strcat(logbuf, buf);
1039 Fortunately most interesting transport layer information fit
1040 into first 8 bytes of IP data field (minimal nonzero size).
1041 Thus we don't need actual packet reassembling to build whole
1042 transport layer data. We only check the fragment offset for
1043 zero value to find packet with this information.
1045 if (!off_frag && psize >= 8) {
1046 switch (flow->proto) {
/* UDP/TCP: ports sit in the first 4 bytes of the transport header. */
1049 flow->sp = ((struct udphdr *)tl)->uh_sport;
1050 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code into the port fields (common convention). */
1055 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1056 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1059 #ifdef ICMP_TRICK_CISCO
1061 flow->dp = *((int32_t *) tl);
1066 /* Unknown transport layer */
1067 #if ((DEBUG) & DEBUG_C)
1068 strcat(logbuf, " U");
1075 #if ((DEBUG) & DEBUG_C)
1076 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1077 strcat(logbuf, buf);
/* FLOW_TL: transport-layer info (ports) captured for this flow. */
1079 flow->flags |= FLOW_TL;
1083 /* Check for tcp flags presence (including CWR and ECE). */
1084 if (flow->proto == IPPROTO_TCP
1086 && psize >= 16 - off_frag) {
/* TCP flags byte lives at offset 13 of the TCP header. */
1087 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1088 #if ((DEBUG) & DEBUG_C)
1089 sprintf(buf, " TCP:%x", flow->tcp_flags);
1090 strcat(logbuf, buf);
1094 #if ((DEBUG) & DEBUG_C)
1095 sprintf(buf, " => %x", (unsigned) flow);
1096 strcat(logbuf, buf);
1097 my_log(LOG_DEBUG, "%s", logbuf);
1100 #if ((DEBUG) & DEBUG_I)
1102 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1103 if (pending_queue_trace < pending_queue_trace_candidate)
1104 pending_queue_trace = pending_queue_trace_candidate;
1107 /* Flow complete - inform unpending_thread() about it */
1108 pending_head->flags |= FLOW_PENDING;
1109 pending_head = pending_head->next;
1111 pthread_cond_signal(&unpending_cond);
/* Program entry point: parse options, configure collector peers (network
   or file), open the ULOG capture handle, daemonize, drop privileges,
   start the worker threads, then loop handling SIGTERM (flush + exit) and
   SIGUSR1 (debug statistics). */
1117 int main(int argc, char **argv)
1120 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1121 int c, i, write_fd, memory_limit = 0;
1122 struct addrinfo hints, *res;
1123 struct sockaddr_in saddr;
1124 pthread_attr_t tattr;
1125 struct sigaction sigact;
/* Worker entry points, started below in this order. */
1126 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1127 struct timeval timeout;
1129 sched_min = sched_get_priority_min(SCHED);
1130 sched_max = sched_get_priority_max(SCHED);
1132 memset(&saddr, 0 , sizeof(saddr));
1133 memset(&hints, 0 , sizeof(hints));
1134 hints.ai_flags = AI_PASSIVE;
1135 hints.ai_family = AF_INET;
1136 hints.ai_socktype = SOCK_DGRAM;
1138 /* Process command line options */
1141 while ((c = my_getopt(argc, argv, parms)) != -1) {
1151 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1152 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1153 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1154 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1155 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1156 if (parms[nflag].count) {
1157 switch (atoi(parms[nflag].arg)) {
1159 netflow = &NetFlow1;
1166 netflow = &NetFlow7;
1170 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1174 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
/* -l [dst][:id] — optional identifier suffix is appended to ident. */
1175 if (parms[lflag].count) {
1176 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1179 sprintf(errpbuf, "[%s]", log_suffix);
1180 strcat(ident, errpbuf);
1183 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1184 if (log_suffix) *--log_suffix = ':';
1186 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1188 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1191 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1192 if (parms[qflag].count) {
1193 pending_queue_length = atoi(parms[qflag].arg);
1194 if (pending_queue_length < 1) {
1195 fprintf(stderr, "Illegal %s\n", "pending queue length");
1199 if (parms[rflag].count) {
1200 schedp.sched_priority = atoi(parms[rflag].arg);
1201 if (schedp.sched_priority
1202 && (schedp.sched_priority < sched_min
1203 || schedp.sched_priority > sched_max)) {
1204 fprintf(stderr, "Illegal %s\n", "realtime priority");
1208 if (parms[Bflag].count) {
1209 sockbufsize = atoi(parms[Bflag].arg) << 10;
1211 if (parms[bflag].count) {
1212 bulk_quantity = atoi(parms[bflag].arg);
1213 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1214 fprintf(stderr, "Illegal %s\n", "bulk size");
1218 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X: parse "name:base" rules separated by ':' (names by ','). */
1219 if (parms[Xflag].count) {
1220 for(i = 0; parms[Xflag].arg[i]; i++)
1221 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1222 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1224 rule = strtok(parms[Xflag].arg, ":");
1225 for (i = 0; rule; i++) {
1226 snmp_rules[i].len = strlen(rule);
1227 if (snmp_rules[i].len > IFNAMSIZ) {
1228 fprintf(stderr, "Illegal %s\n", "interface basename");
/* NOTE(review): strncpy with len == strlen(rule) does not write a
   NUL terminator; basename may be unterminated if the buffer is not
   pre-zeroed — confirm against struct snmp_rule's definition. */
1231 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1232 if (!*(rule - 1)) *(rule - 1) = ',';
1233 rule = strtok(NULL, ",");
1235 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1238 snmp_rules[i].base = atoi(rule);
1240 rule = strtok(NULL, ":");
1244 if (parms[tflag].count)
1245 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
/* -a: resolve the local source address used for collector sockets. */
1246 if (parms[aflag].count) {
1247 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1249 fprintf(stderr, "Illegal %s\n", "source address");
1252 saddr = *((struct sockaddr_in *) res->ai_addr);
1256 if (parms[uflag].count)
1257 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1258 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1263 /* Process collectors parameters. Brrrr... :-[ */
1265 npeers = argc - optind;
1267 /* Send to remote Netflow collector */
1268 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1269 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
/* Each argument has the form remote:port[/[local][/type]]. */
1271 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1273 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1274 fprintf(stderr, "socket(): %s\n", strerror(errno));
1277 peers[npeers].write_fd = write_fd;
1278 peers[npeers].type = PEER_MIRROR;
1279 peers[npeers].laddr = saddr;
1280 peers[npeers].seq = 0;
1281 if ((lhost = strchr(dport, '/'))) {
1283 if ((type = strchr(lhost, '/'))) {
1291 peers[npeers].type = PEER_ROTATE;
1300 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1301 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1305 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1306 sizeof(struct sockaddr_in))) {
1307 fprintf(stderr, "bind(): %s\n", strerror(errno));
1310 if (getaddrinfo(dhost, dport, &hints, &res)) {
1312 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1315 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1317 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1318 sizeof(struct sockaddr_in))) {
1319 fprintf(stderr, "connect(): %s\n", strerror(errno));
1323 /* Restore command line */
1324 if (type) *--type = '/';
1325 if (lhost) *--lhost = '/';
/* -f without collectors: write raw PDUs to a file instead. */
1329 else if (parms[fflag].count) {
1332 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1333 fname = parms[fflag].arg;
/* NOTE(review): open() with O_CREAT but no mode argument passes
   garbage as the new file's permissions (UB per POSIX) — should be
   open(fname, O_WRONLY|O_CREAT, 0644) or similar. */
1334 if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
1335 fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
1338 peers[0].write_fd = write_fd;
1339 peers[0].type = PEER_FILE;
/* Allocate the capture buffer and attach to the ULOG netlink groups. */
1347 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1348 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1350 fprintf(stderr, "libipulog initialization error: %s",
1351 ipulog_strerror(ipulog_errno));
/* -B: enlarge the kernel-side receive buffer (best-effort). */
1355 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1356 &sockbufsize, sizeof(sockbufsize)) < 0)
1357 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1359 /* Daemonize (if log destination stdout-free) */
1361 my_log_open(ident, verbosity, log_dest);
1362 if (!(log_dest & 2)) {
1365 fprintf(stderr, "fork(): %s", strerror(errno));
1370 freopen("/dev/null", "r", stdin);
1371 freopen("/dev/null", "w", stdout);
1372 freopen("/dev/null", "w", stderr);
/* Staying in foreground: make stdio unbuffered for timely logs. */
1379 setvbuf(stdout, (char *)0, _IONBF, 0);
1380 setvbuf(stderr, (char *)0, _IONBF, 0);
1384 sprintf(errpbuf, "[%ld]", (long) pid);
1385 strcat(ident, errpbuf);
1387 /* Initialization */
1389 hash_init(); /* Actually for crc16 only */
1390 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1391 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1394 /* Hope 12 days is enough :-/ */
1395 start_time_offset = 1 << 20;
1397 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1399 gettime(&start_time);
1402 Build static pending queue as circular buffer.
1404 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1405 pending_tail = pending_head;
1406 for (i = pending_queue_length - 1; i--;) {
1407 if (!(pending_tail->next = mem_alloc())) {
1409 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1412 pending_tail = pending_tail->next;
/* Close the ring and reset the consumer pointer to the start. */
1414 pending_tail->next = pending_head;
1415 pending_tail = pending_head;
/* Install SIGTERM (and SIGUSR1 in debug builds), blocked in workers. */
1417 sigemptyset(&sig_mask);
1418 sigact.sa_handler = &sighandler;
1419 sigact.sa_mask = sig_mask;
1420 sigact.sa_flags = 0;
1421 sigaddset(&sig_mask, SIGTERM);
1422 sigaction(SIGTERM, &sigact, 0);
1423 #if ((DEBUG) & DEBUG_I)
1424 sigaddset(&sig_mask, SIGUSR1);
1425 sigaction(SIGUSR1, &sigact, 0);
1427 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1428 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1432 my_log(LOG_INFO, "Starting %s...", VERSION);
1434 if (parms[cflag].count) {
1435 if (chdir(parms[cflag].arg) || chroot(".")) {
1436 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* Launch workers; with -r, give each a stepped real-time priority. */
1441 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1442 pthread_attr_init(&tattr);
1443 for (i = 0; i < THREADS - 1; i++) {
1444 if (schedp.sched_priority > 0) {
1445 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1446 (pthread_attr_setschedparam(&tattr, &schedp))) {
1447 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1451 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1452 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1455 pthread_detach(thid);
1456 schedp.sched_priority++;
/* -u: drop supplementary groups, then gid and uid. */
1460 if (setgroups(0, NULL)) {
1461 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1464 if (setregid(pw->pw_gid, pw->pw_gid)) {
1465 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1468 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1469 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1474 if (!(pidfile = fopen(pidfilepath, "w")))
1475 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1477 fprintf(pidfile, "%ld\n", (long) pid);
1481 my_log(LOG_INFO, "pid: %d", pid);
/* Log the effective configuration once at startup. */
1482 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1483 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1484 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1485 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1486 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1487 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1488 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1489 for (i = 0; i < nsnmp_rules; i++) {
1490 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1491 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1493 for (i = 0; i < npeers; i++) {
1494 switch (peers[i].type) {
1502 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1503 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1504 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
/* Workers are running: unblock signals and enter the supervisor loop. */
1507 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1509 timeout.tv_usec = 0;
/* Shutdown drain condition: keep looping while flows remain cached or
   the pending ring still holds unconsumed slots. */
1511 || (total_elements - free_elements - pending_queue_length)
1513 || pending_tail->flags) {
1516 timeout.tv_sec = scan_interval;
/* select() with no fds: portable sub-loop sleep. */
1517 select(0, 0, 0, 0, &timeout);
1520 if (sigs & SIGTERM_MASK && !killed) {
1521 sigs &= ~SIGTERM_MASK;
1522 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
/* Force everything to expire immediately and wake the workers. */
1525 active_lifetime = -1;
1526 inactive_lifetime = -1;
1528 unpending_timeout = 1;
1530 pthread_cond_signal(&scan_cond);
1531 pthread_cond_signal(&unpending_cond);
1534 #if ((DEBUG) & DEBUG_I)
1535 if (sigs & SIGUSR1_MASK) {
1536 sigs &= ~SIGUSR1_MASK;
1541 remove(pidfilepath);
1542 #if ((DEBUG) & DEBUG_I)
1545 my_log(LOG_INFO, "Done.");