2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
12 /* stdout, stderr, freopen() */
18 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
27 #include <libipulog/libipulog.h>
28 struct ipulog_handle {
31 struct sockaddr_nl local;
32 struct sockaddr_nl peer;
33 struct nlmsghdr* last_nlhdr;
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
48 #include <sys/param.h>
73 #include <sys/select.h>
79 #include <fprobe-ulog.h>
81 #include <my_getopt.h>
/* Command-line option table consumed by my_getopt(); every flag visible
   here takes a required argument.  The flag letters match the usage text
   further down (e.g. 'U' = ULOG group mask, 's' = scan interval).
   NOTE(review): this listing is a sampled extraction — original line
   numbers are fused at the start of each line and some entries (and the
   table terminator) are elided; confirm against the full source. */
112 static struct getopt_parms parms[] = {
113 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
114 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
115 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
116 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
117 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
118 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
119 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
120 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
123 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* File-scope state.  Defaults below match the bracketed values printed in
   the usage text (scan 5s, frag 30s, inactive 60s, active 300s, queue 100,
   NetFlow v5, verbosity 6). */
140 extern int optind, opterr, optopt;
/* NetFlow v1/v5/v7 format descriptors, defined elsewhere. */
143 extern struct NetFlow NetFlow1;
144 extern struct NetFlow NetFlow5;
145 extern struct NetFlow NetFlow7;
/* -M: treat the netfilter mark value as the ToS byte (see usage text). */
147 #define mark_is_tos parms[Mflag].count
148 static unsigned scan_interval = 5;
149 static int frag_lifetime = 30;
150 static int inactive_lifetime = 60;
151 static int active_lifetime = 300;
152 static int sockbufsize;
/* Largest representable bulk size: all-ones in mem_index_t. */
153 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
154 #if (MEM_BITS == 0) || (MEM_BITS == 16)
155 #define BULK_QUANTITY 10000
157 #define BULK_QUANTITY 200
/* Epoch-based log rotation state (-E epoch length, -T epochs kept). */
160 static unsigned epoch_length=60, log_epochs=1;
161 static unsigned cur_epoch=0,prev_uptime=0;
163 static unsigned bulk_quantity = BULK_QUANTITY;
164 static unsigned pending_queue_length = 100;
165 static struct NetFlow *netflow = &NetFlow5;
166 static unsigned verbosity = 6;
167 static unsigned log_dest = MY_LOG_SYSLOG;
168 static struct Time start_time;
169 static long start_time_offset;
/* Allocator statistics exported by the mem_* module. */
172 extern unsigned total_elements;
173 extern unsigned free_elements;
174 extern unsigned total_memory;
/* Instrumentation counters, compiled in only with DEBUG_I. */
175 #if ((DEBUG) & DEBUG_I)
176 static unsigned emit_pkts, emit_queue;
177 static uint64_t size_total;
178 static unsigned pkts_total, pkts_total_fragmented;
179 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
180 static unsigned pkts_pending, pkts_pending_done;
181 static unsigned pending_queue_trace, pending_queue_trace_candidate;
182 static unsigned flows_total, flows_fragmented;
/* Emitter state: flows currently packed into emit_packet. */
184 static unsigned emit_count;
185 static uint32_t emit_sequence;
186 static unsigned emit_rate_bytes, emit_rate_delay;
187 static struct Time emit_time;
188 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
189 static pthread_t thid;
190 static sigset_t sig_mask;
191 static struct sched_param schedp;
192 static int sched_min, sched_max;
193 static int npeers, npeers_rot;
194 static struct peer *peers;
/* Flow hash table; one mutex per bucket. */
197 static struct Flow *flows[1 << HASH_BITS];
198 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* capture -> unpending_thread handoff. */
200 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
/* scan-thread wakeup. */
203 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
204 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Circular pending queue (head = producer, tail = consumer). */
205 static struct Flow *pending_head, *pending_tail;
/* Expired-fragment chain drained by the scan thread. */
206 static struct Flow *scan_frag_dreg;
/* scan -> emit_thread handoff list. */
208 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
209 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
210 static struct Flow *flows_emit;
212 static char ident[256] = "fprobe-ulog";
213 static FILE *pidfile;
214 static char *pidfilepath;
217 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture handle and group mask (-U, default group 1). */
218 static struct ipulog_handle *ulog_handle;
219 static uint32_t ulog_gmask = 1;
220 static char *cap_buf;
/* -X interface-name -> SNMP-index conversion rules. */
221 static int nsnmp_rules;
222 static struct snmp_rule *snmp_rules;
223 static struct passwd *pw = 0;
/* Help text (argument of the usage printout; the enclosing call is not
   visible in this sampled listing). */
228 "fprobe-ulog: a NetFlow probe. Version %s\n"
229 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
231 "-h\t\tDisplay this help\n"
232 "-U <mask>\tULOG group bitwise mask [1]\n"
233 "-s <seconds>\tHow often scan for expired flows [5]\n"
234 "-g <seconds>\tFragmented flow lifetime [30]\n"
235 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
236 "-f <filename>\tLog flow data in a file\n"
237 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
238 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
239 "-a <address>\tUse <address> as source for NetFlow flow\n"
240 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
241 "-M\t\tUse netfilter mark value as ToS flag\n"
242 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
243 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
244 "-q <flows>\tPending queue length [100]\n"
245 "-B <kilobytes>\tKernel capture buffer size [0]\n"
246 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
247 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
248 "-c <directory>\tDirectory to chroot to\n"
249 "-u <user>\tUser to run as\n"
250 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
251 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
252 "-y <remote:port>\tAddress of the NetFlow collector\n",
/* NOTE(review): the trailing `",` on the line above ends the format-string
   argument, so the three "-f"/"-T"/"-E" literals below are passed as
   separate extra arguments to the enclosing call rather than concatenated
   into the help text — almost certainly a stray comma.  Confirm against
   the enclosing fprintf/printf call (elided from this view) and drop the
   comma so the literals concatenate before VERSION et al. */
253 "-f <writable file>\tFile to write data into\n"
254 "-T <n>\tRotate log file every n epochs\n"
255 "-E <[1..60]>\tSize of an epoch in minutes\n"
256 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Debug statistics dump (DEBUG_I builds only): logs capture, cache, emit
   and allocator counters.  Fragment of a larger function in the original
   source; surrounding lines are elided here. */
260 #if ((DEBUG) & DEBUG_I)
263 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
264 pkts_total, pkts_total_fragmented, size_total,
265 pkts_pending - pkts_pending_done, pending_queue_trace);
266 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
267 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
268 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
269 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
270 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
271 total_elements, free_elements, total_memory);
/* Signal handler: records received signals as bits in `sigs` for the main
   loop to act on (SIGTERM = flush and exit; SIGUSR1, DEBUG_I builds only,
   = dump statistics).  Dispatch on `sig` is elided in this view. */
275 void sighandler(int sig)
279 sigs |= SIGTERM_MASK;
281 #if ((DEBUG) & DEBUG_I)
283 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (sec/usec); presumably wraps
   gettimeofday() — the elided lines would confirm. */
289 void gettime(struct Time *now)
295 now->usec = t.tv_usec;
/* Difference t1 - t2 in milliseconds (positive when t1 is later).
   NOTE(review): the multiply can overflow for very large second
   differences — acceptable given the documented ~24.5-day uptime cap. */
298 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
300 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
303 /* Uptime in miliseconds */
/* Milliseconds elapsed between *t and start_time (uint32_t wraps at
   ~49.7 days, hence the "49/2 days" note below). */
304 uint32_t getuptime(struct Time *t)
306 /* Maximum uptime is about 49/2 days */
307 return cmpmtime(t, &start_time);
/* Bucket hash for a flow.  Fragmented flows hash only the fragment key
   prefix (Flow_F: addresses/proto/id); unfragmented flows also include the
   transport-layer fields (Flow_TL: ports). */
310 hash_t hash_flow(struct Flow *flow)
312 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
313 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP index.  First tries the -X rules
   (basename prefix match; numeric suffix added to the rule's base), then
   falls back to the kernel's if_nametoindex().  Empty name -> 0. */
316 uint16_t snmp_index(char *name) {
319 if (!*name) return 0;
321 for (i = 0; (int) i < nsnmp_rules; i++) {
/* Prefix mismatch -> try next rule. */
322 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
323 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
326 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record (some field copies — addresses,
   ports, interfaces — are elided in this sampled view). */
331 inline void copy_flow(struct Flow *src, struct Flow *dst)
338 dst->proto = src->proto;
339 dst->tcp_flags = src->tcp_flags;
343 dst->pkts = src->pkts;
344 dst->size = src->size;
345 dst->sizeF = src->sizeF;
346 dst->sizeP = src->sizeP;
347 dst->ctime = src->ctime;
348 dst->mtime = src->mtime;
349 dst->flags = src->flags;
/* Return the fd to write flow data to, rotating to "<fname>.<epoch>" when
   the current epoch has elapsed (uptime delta measured in ms, compared
   against 1000 * epoch_length).
   NOTE(review): usage text says -E is "size of an epoch in minutes", but
   1000*epoch_length ms equals epoch_length SECONDS — confirm intended
   units against the elided conversion code. */
352 unsigned get_log_fd(char *fname, unsigned cur_fd) {
357 cur_uptime = getuptime(&now);
358 if ((cur_uptime - prev_uptime) > (1000 * epoch_length)) {
359 char nextname[MAX_PATH_LEN];
361 prev_uptime = cur_uptime;
362 cur_epoch = (cur_epoch + 1) % log_epochs;
364 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* NOTE(review): open() with O_CREAT but no mode argument — POSIX requires
   a third (mode_t) argument when O_CREAT is set; the created file gets
   unspecified permissions.  Should be e.g. open(nextname,
   O_WRONLY|O_CREAT, 0644). */
365 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
366 fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
/* Walk the bucket chain starting at `where` looking for a flow matching
   `what`: same src/dst address and protocol, plus same ports (both
   unfragmented) or same IP id (both fragmented).  On return, *prev (if
   requested) points at the link that references the match — used by
   callers to unlink the entry.  Mixed frag/non-frag pairs never match
   (flags sum lands on neither FLOW_FRAGMASK case). */
376 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
378 struct Flow **flowpp;
384 if (prev) flowpp = *prev;
387 if (where->sip.s_addr == what->sip.s_addr
388 && where->dip.s_addr == what->dip.s_addr
389 && where->proto == what->proto) {
390 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
392 /* Both unfragmented */
393 if ((what->sp == where->sp)
394 && (what->dp == where->dp)) goto done;
397 /* Both fragmented */
398 if (where->id == what->id) goto done;
402 flowpp = &where->next;
406 if (prev) *prev = flowpp;
/* Insert `flow` into the hash cache under the bucket mutex.
   flag == COPY_INTO: allocate a new record and copy (caller keeps `flow`);
   flag == MOVE_INTO: link/merge `flow` itself and free it after a merge.
   If a matching flow exists, accumulate counters/timestamps into it; for
   fragments, also merge transport info and — once sizeF covers sizeP —
   clear FLOW_FRAG and recursively re-insert the reassembled flow. */
410 int put_into(struct Flow *flow, int flag
411 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
418 struct Flow *flown, **flowpp;
419 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
424 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
425 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
428 pthread_mutex_lock(&flows_mutex[h]);
430 if (!(flown = find(flows[h], flow, &flowpp))) {
431 /* No suitable flow found - add */
432 if (flag == COPY_INTO) {
433 if ((flown = mem_alloc())) {
434 copy_flow(flow, flown);
437 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
438 my_log(LOG_ERR, "%s %s. %s",
439 "mem_alloc():", strerror(errno), "packet lost");
/* Link (the possibly newly copied) flow at the head of the bucket. */
444 flow->next = flows[h];
446 #if ((DEBUG) & DEBUG_I)
448 if (flow->flags & FLOW_FRAG) flows_fragmented++;
450 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
452 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
457 /* Found suitable flow - update */
458 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
459 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the latest mtime and the earliest ctime of the pair. */
462 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
463 flown->mtime = flow->mtime;
464 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
465 flown->ctime = flow->ctime;
466 flown->tcp_flags |= flow->tcp_flags;
467 flown->size += flow->size;
468 flown->pkts += flow->pkts;
469 if (flow->flags & FLOW_FRAG) {
470 /* Fragmented flow require some additional work */
471 if (flow->flags & FLOW_TL) {
474 Several packets with FLOW_TL (attack)
476 flown->sp = flow->sp;
477 flown->dp = flow->dp;
479 if (flow->flags & FLOW_LASTFRAG) {
482 Several packets with FLOW_LASTFRAG (attack)
484 flown->sizeP = flow->sizeP;
486 flown->flags |= flow->flags;
487 flown->sizeF += flow->sizeF;
488 if ((flown->flags & FLOW_LASTFRAG)
489 && (flown->sizeF >= flown->sizeP)) {
490 /* All fragments received - flow reassembled */
491 *flowpp = flown->next;
492 pthread_mutex_unlock(&flows_mutex[h]);
493 #if ((DEBUG) & DEBUG_I)
498 flown->flags &= ~FLOW_FRAG;
499 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert under the non-fragment key now that ports are known. */
502 ret = put_into(flown, MOVE_INTO
503 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
509 if (flag == MOVE_INTO) mem_free(flow);
511 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize `fields` NetFlow fields described by `format` into the packet
   buffer at `p`, advancing p by each field's size; returns the advanced
   pointer.  `flow` may be NULL for header fields (which read only the
   emit_* globals).  Multi-byte values are converted to network order
   except sp/dp, which are stored already in network byte order by the
   capture path (they are copied raw from the packet headers). */
515 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
519 for (i = 0; i < fields; i++) {
520 #if ((DEBUG) & DEBUG_F)
521 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
524 case NETFLOW_IPV4_SRC_ADDR:
525 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
526 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
529 case NETFLOW_IPV4_DST_ADDR:
530 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
531 p += NETFLOW_IPV4_DST_ADDR_SIZE;
534 case NETFLOW_INPUT_SNMP:
535 *((uint16_t *) p) = htons(flow->iif);
536 p += NETFLOW_INPUT_SNMP_SIZE;
539 case NETFLOW_OUTPUT_SNMP:
540 *((uint16_t *) p) = htons(flow->oif);
541 p += NETFLOW_OUTPUT_SNMP_SIZE;
544 case NETFLOW_PKTS_32:
545 *((uint32_t *) p) = htonl(flow->pkts);
546 p += NETFLOW_PKTS_32_SIZE;
549 case NETFLOW_BYTES_32:
550 *((uint32_t *) p) = htonl(flow->size);
551 p += NETFLOW_BYTES_32_SIZE;
554 case NETFLOW_FIRST_SWITCHED:
555 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
556 p += NETFLOW_FIRST_SWITCHED_SIZE;
559 case NETFLOW_LAST_SWITCHED:
560 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
561 p += NETFLOW_LAST_SWITCHED_SIZE;
564 case NETFLOW_L4_SRC_PORT:
565 *((uint16_t *) p) = flow->sp;
566 p += NETFLOW_L4_SRC_PORT_SIZE;
569 case NETFLOW_L4_DST_PORT:
570 *((uint16_t *) p) = flow->dp;
571 p += NETFLOW_L4_DST_PORT_SIZE;
575 *((uint8_t *) p) = flow->proto;
576 p += NETFLOW_PROT_SIZE;
579 case NETFLOW_SRC_TOS:
580 *((uint8_t *) p) = flow->tos;
581 p += NETFLOW_SRC_TOS_SIZE;
584 case NETFLOW_TCP_FLAGS:
585 *((uint8_t *) p) = flow->tcp_flags;
586 p += NETFLOW_TCP_FLAGS_SIZE;
589 case NETFLOW_VERSION:
590 *((uint16_t *) p) = htons(netflow->Version);
591 p += NETFLOW_VERSION_SIZE;
595 *((uint16_t *) p) = htons(emit_count);
596 p += NETFLOW_COUNT_SIZE;
600 *((uint32_t *) p) = htonl(getuptime(&emit_time));
601 p += NETFLOW_UPTIME_SIZE;
604 case NETFLOW_UNIX_SECS:
605 *((uint32_t *) p) = htonl(emit_time.sec);
606 p += NETFLOW_UNIX_SECS_SIZE;
609 case NETFLOW_UNIX_NSECS:
610 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
611 p += NETFLOW_UNIX_NSECS_SIZE;
614 case NETFLOW_FLOW_SEQUENCE:
/* Sequence deliberately zeroed; the original htonl(emit_sequence) write
   is kept commented out below. */
615 //*((uint32_t *) p) = htonl(emit_sequence);
616 *((uint32_t *) p) = 0;
617 p += NETFLOW_FLOW_SEQUENCE_SIZE;
621 /* Unsupported (uint8_t) */
622 case NETFLOW_ENGINE_TYPE:
623 case NETFLOW_ENGINE_ID:
624 case NETFLOW_FLAGS7_1:
625 case NETFLOW_SRC_MASK:
626 case NETFLOW_DST_MASK:
627 *((uint8_t *) p) = 0;
628 p += NETFLOW_PAD8_SIZE;
632 /* Unsupported (uint16_t) */
635 case NETFLOW_FLAGS7_2:
636 *((uint16_t *) p) = 0;
637 p += NETFLOW_PAD16_SIZE;
641 /* Unsupported (uint32_t) */
642 case NETFLOW_IPV4_NEXT_HOP:
643 case NETFLOW_ROUTER_SC:
644 *((uint32_t *) p) = 0;
645 p += NETFLOW_PAD32_SIZE;
649 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
650 format, i, format[i]);
654 #if ((DEBUG) & DEBUG_F)
655 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
662 Workaround for clone()-based threads
663 Try to change EUID independently of main thread
/* NOTE(review): return values of setregid/setreuid are ignored here; a
   failed privilege drop goes unnoticed (compare main(), which checks both). */
667 setregid(pw->pw_gid, pw->pw_gid);
668 setreuid(pw->pw_uid, pw->pw_uid);
/* Emitter thread: drains flows_emit, packs flows into emit_packet, and
   when the packet is full (or the emit timeout fires with flows pending)
   writes/sends it to every configured peer. */
677 struct timespec timeout;
678 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
680 p = (void *) &emit_packet + netflow->HeaderSize;
686 pthread_mutex_lock(&emit_mutex);
687 while (!flows_emit) {
688 gettimeofday(&now, 0);
689 timeout.tv_sec = now.tv_sec + emit_timeout;
/* NOTE(review): timeout.tv_nsec is not visibly initialized before this
   timedwait — confirm the elided lines set it (EINVAL/unspecified wait
   otherwise). */
690 /* Do not wait until emit_packet will filled - it may be too long */
691 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
692 pthread_mutex_unlock(&emit_mutex);
697 flows_emit = flows_emit->next;
698 #if ((DEBUG) & DEBUG_I)
701 pthread_mutex_unlock(&emit_mutex);
/* Rebase the uptime epoch so FIRST/LAST_SWITCHED stay positive. */
705 gettime(&start_time);
706 start_time.sec -= start_time_offset;
709 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
713 printf("Emit count = %d\n", emit_count);
716 if (emit_count == netflow->MaxFlows) {
/* Packet full: write header, then deliver. */
719 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
720 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
721 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
722 if (size < 1464) size = 1464;
724 for (i = 0; i < npeers; i++) {
/* NOTE(review): this branch tests and writes peers[0] while the loop
   index is i and the log lines below use peers[i] — inconsistent
   indexing; likely PEER_FILE is assumed to be the sole peer.  Confirm
   before touching. */
725 if (peers[0].type == PEER_FILE) {
726 if (netflow->SeqOffset)
727 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
729 peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
730 ret = write(peers[0].write_fd, emit_packet, size);
732 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
733 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
734 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
738 #if ((DEBUG) & DEBUG_E)
740 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
741 emit_count, i + 1, peers[i].seq);
744 peers[0].seq += emit_count;
/* -t rate limiting: sleep emit_rate_delay ns per emit_rate_bytes sent. */
747 if (emit_rate_bytes) {
749 delay = sent / emit_rate_bytes;
751 sent %= emit_rate_bytes;
753 timeout.tv_nsec = emit_rate_delay * delay;
754 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
759 if (peers[i].type == PEER_MIRROR) goto sendreal;
/* ROTATE peers take turns: only the current rotation slot sends. */
761 if (peers[i].type == PEER_ROTATE)
762 if (peer_rot_cur++ == peer_rot_work) {
764 if (netflow->SeqOffset)
765 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
766 ret = send(peers[i].write_fd, emit_packet, size, 0);
768 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
769 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
770 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
773 #if ((DEBUG) & DEBUG_E)
775 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
776 emit_count, i + 1, peers[i].seq);
779 peers[i].seq += emit_count;
782 if (emit_rate_bytes) {
784 delay = sent / emit_rate_bytes;
786 sent %= emit_rate_bytes;
788 timeout.tv_nsec = emit_rate_delay * delay;
789 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
794 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
795 emit_sequence += emit_count;
797 #if ((DEBUG) & DEBUG_I)
/* Consumer of the circular pending queue: waits (with unpending_timeout)
   for the capture thread to mark pending_tail FLOW_PENDING, copies each
   completed entry into the flow cache via put_into(COPY_INTO), then clears
   the slot and advances the tail. */
804 void *unpending_thread()
807 struct timespec timeout;
808 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
815 pthread_mutex_lock(&unpending_mutex);
818 while (!(pending_tail->flags & FLOW_PENDING)) {
819 gettimeofday(&now, 0);
820 timeout.tv_sec = now.tv_sec + unpending_timeout;
821 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
824 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
827 if (put_into(pending_tail, COPY_INTO
828 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* put_into failure (allocation) -> packet counted as lost. */
832 #if ((DEBUG) & DEBUG_I)
833 pkts_lost_unpending++;
837 #if ((DEBUG) & DEBUG_U)
838 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Slot recycled: clear flags and hand it back to the producer. */
841 pending_tail->flags = 0;
842 pending_tail = pending_tail->next;
843 #if ((DEBUG) & DEBUG_I)
/* Scan-thread fragment: every scan_interval seconds walks all hash
   buckets; expired fragmented flows move to the scan_frag_dreg chain
   (re-inserted below without FLOW_FRAG), expired normal flows move to
   the flows_emit list for the emitter. */
851 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
855 struct Flow *flow, **flowpp;
857 struct timespec timeout;
862 pthread_mutex_lock(&scan_mutex);
866 timeout.tv_sec = now.sec + scan_interval;
867 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
870 #if ((DEBUG) & DEBUG_S)
871 my_log(LOG_DEBUG, "S: %d", now.sec);
873 for (i = 0; i < 1 << HASH_BITS ; i++) {
874 pthread_mutex_lock(&flows_mutex[i]);
878 if (flow->flags & FLOW_FRAG) {
879 /* Process fragmented flow */
880 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
881 /* Fragmented flow expired - put it into special chain */
882 #if ((DEBUG) & DEBUG_I)
886 *flowpp = flow->next;
888 flow->flags &= ~FLOW_FRAG;
889 flow->next = scan_frag_dreg;
890 scan_frag_dreg = flow;
895 /* Flow is not frgamented */
/* Expire on inactivity or on exceeding the active-timer lifetime. */
896 if ((now.sec - flow->mtime.sec) > inactive_lifetime
897 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
899 #if ((DEBUG) & DEBUG_S)
900 my_log(LOG_DEBUG, "S: E %x", flow);
902 #if ((DEBUG) & DEBUG_I)
905 *flowpp = flow->next;
906 pthread_mutex_lock(&emit_mutex);
907 flow->next = flows_emit;
909 #if ((DEBUG) & DEBUG_I)
912 pthread_mutex_unlock(&emit_mutex);
917 flowpp = &flow->next;
920 pthread_mutex_unlock(&flows_mutex[i]);
922 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragments as normal flows (ports may be partial). */
924 while (scan_frag_dreg) {
925 flow = scan_frag_dreg;
926 scan_frag_dreg = flow->next;
927 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
930 put_into(flow, MOVE_INTO
931 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
935 #if ((DEBUG) & DEBUG_S)
936 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture-thread fragment: reads batches of ULOG messages, parses each
   IPv4 packet into the next free slot of the circular pending queue
   (pending_head), and signals unpending_thread when a slot is complete. */
944 struct ulog_packet_msg *ulog_msg;
948 int len, off_frag, psize;
949 #if ((DEBUG) & DEBUG_C)
957 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
959 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
962 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
964 #if ((DEBUG) & DEBUG_C)
965 sprintf(logbuf, "C: %d", ulog_msg->data_len);
968 nl = (void *) &ulog_msg->payload;
969 psize = ulog_msg->data_len;
/* Drop anything that is not a full IPv4 header. */
972 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
973 #if ((DEBUG) & DEBUG_C)
974 strcat(logbuf, " U");
975 my_log(LOG_DEBUG, "%s", logbuf);
977 #if ((DEBUG) & DEBUG_I)
/* Producer slot still in use -> queue full, packet lost. */
983 if (pending_head->flags) {
984 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
986 # if ((DEBUG) & DEBUG_C)
991 "pending queue full:", "packet lost");
993 #if ((DEBUG) & DEBUG_I)
999 #if ((DEBUG) & DEBUG_I)
1003 flow = pending_head;
1005 /* ?FIXME? Add sanity check for ip_len? */
1006 flow->size = ntohs(nl->ip_len);
1007 #if ((DEBUG) & DEBUG_I)
1008 size_total += flow->size;
1011 flow->sip = nl->ip_src;
1012 flow->dip = nl->ip_dst;
1013 flow->iif = snmp_index(ulog_msg->indev_name);
1014 flow->oif = snmp_index(ulog_msg->outdev_name);
/* -M: netfilter mark doubles as ToS byte. */
1015 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1016 flow->proto = nl->ip_p;
1018 flow->tcp_flags = 0;
1022 /* Packets captured from OUTPUT table didn't contains valid timestamp */
1023 if (ulog_msg->timestamp_sec) {
1024 flow->ctime.sec = ulog_msg->timestamp_sec;
1025 flow->ctime.usec = ulog_msg->timestamp_usec;
1026 } else gettime(&flow->ctime);
1027 flow->mtime = flow->ctime;
/* Fragment offset in bytes (13-bit field, units of 8 bytes). */
1029 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1032 Offset (from network layer) to transport layer header/IP data
1033 IOW IP header size ;-)
1036 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1038 off_tl = nl->ip_hl << 2;
1039 tl = (void *) nl + off_tl;
1041 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1042 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1044 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1045 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1047 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1048 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1049 #if ((DEBUG) & DEBUG_C)
1050 strcat(logbuf, " F");
1052 #if ((DEBUG) & DEBUG_I)
1053 pkts_total_fragmented++;
1055 flow->flags |= FLOW_FRAG;
1056 flow->id = nl->ip_id;
/* Last fragment (MF clear, offset nonzero) fixes the datagram size. */
1058 if (!(ntohs(nl->ip_off) & IP_MF)) {
1059 /* Packet whith IP_MF contains information about whole datagram size */
1060 flow->flags |= FLOW_LASTFRAG;
1061 /* size = frag_offset*8 + data_size */
1062 flow->sizeP = off_frag + flow->sizeF;
1066 #if ((DEBUG) & DEBUG_C)
1067 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1068 strcat(logbuf, buf);
1069 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1070 strcat(logbuf, buf);
1074 Fortunately most interesting transport layer information fit
1075 into first 8 bytes of IP data field (minimal nonzero size).
1076 Thus we don't need actual packet reassembling to build whole
1077 transport layer data. We only check the fragment offset for
1078 zero value to find packet with this information.
/* Ports are copied raw, i.e. kept in network byte order. */
1080 if (!off_frag && psize >= 8) {
1081 switch (flow->proto) {
1084 flow->sp = ((struct udphdr *)tl)->uh_sport;
1085 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code into the port fields (Cisco convention). */
1090 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1091 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1094 #ifdef ICMP_TRICK_CISCO
1096 flow->dp = *((int32_t *) tl);
1101 /* Unknown transport layer */
1102 #if ((DEBUG) & DEBUG_C)
1103 strcat(logbuf, " U");
1110 #if ((DEBUG) & DEBUG_C)
1111 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1112 strcat(logbuf, buf);
1114 flow->flags |= FLOW_TL;
1118 /* Check for tcp flags presence (including CWR and ECE). */
1119 if (flow->proto == IPPROTO_TCP
1121 && psize >= 16 - off_frag) {
1122 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1123 #if ((DEBUG) & DEBUG_C)
1124 sprintf(buf, " TCP:%x", flow->tcp_flags);
1125 strcat(logbuf, buf);
1129 #if ((DEBUG) & DEBUG_C)
1130 sprintf(buf, " => %x", (unsigned) flow);
1131 strcat(logbuf, buf);
1132 my_log(LOG_DEBUG, "%s", logbuf);
1135 #if ((DEBUG) & DEBUG_I)
1137 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1138 if (pending_queue_trace < pending_queue_trace_candidate)
1139 pending_queue_trace = pending_queue_trace_candidate;
1142 /* Flow complete - inform unpending_thread() about it */
1143 pending_head->flags |= FLOW_PENDING;
1144 pending_head = pending_head->next;
1146 pthread_cond_signal(&unpending_cond);
/* Entry point: parse options, configure collectors/file output, open the
   ULOG capture handle, daemonize, initialize the flow cache and pending
   queue, start the worker threads, drop privileges, then loop handling
   SIGTERM (flush and exit) and SIGUSR1 (stats, DEBUG_I builds). */
1152 int main(int argc, char **argv)
1155 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1156 int c, i, write_fd, memory_limit = 0;
1157 struct addrinfo hints, *res;
1158 struct sockaddr_in saddr;
1159 pthread_attr_t tattr;
1160 struct sigaction sigact;
1161 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1162 struct timeval timeout;
1164 sched_min = sched_get_priority_min(SCHED);
1165 sched_max = sched_get_priority_max(SCHED);
1167 memset(&saddr, 0 , sizeof(saddr));
1168 memset(&hints, 0 , sizeof(hints));
1169 hints.ai_flags = AI_PASSIVE;
1170 hints.ai_family = AF_INET;
1171 hints.ai_socktype = SOCK_DGRAM;
1173 /* Process command line options */
1176 while ((c = my_getopt(argc, argv, parms)) != -1) {
/* Numeric options parsed with atoi(): invalid input silently becomes 0. */
1186 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1187 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1188 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1189 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1190 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1191 if (parms[nflag].count) {
1192 switch (atoi(parms[nflag].arg)) {
1194 netflow = &NetFlow1;
1201 netflow = &NetFlow7;
1205 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1209 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
/* -l [dst][:id]: optional suffix is appended to ident in brackets. */
1210 if (parms[lflag].count) {
1211 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1214 sprintf(errpbuf, "[%s]", log_suffix);
1215 strcat(ident, errpbuf);
1218 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1219 if (log_suffix) *--log_suffix = ':';
1221 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1223 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1226 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1227 if (parms[qflag].count) {
1228 pending_queue_length = atoi(parms[qflag].arg);
1229 if (pending_queue_length < 1) {
1230 fprintf(stderr, "Illegal %s\n", "pending queue length");
1234 if (parms[rflag].count) {
1235 schedp.sched_priority = atoi(parms[rflag].arg);
1236 if (schedp.sched_priority
1237 && (schedp.sched_priority < sched_min
1238 || schedp.sched_priority > sched_max)) {
1239 fprintf(stderr, "Illegal %s\n", "realtime priority");
1243 if (parms[Bflag].count) {
1244 sockbufsize = atoi(parms[Bflag].arg) << 10;
1246 if (parms[bflag].count) {
1247 bulk_quantity = atoi(parms[bflag].arg);
1248 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1249 fprintf(stderr, "Illegal %s\n", "bulk size");
1253 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X rules: "base1name:base1,base2name:base2,..." tokenized in place. */
1254 if (parms[Xflag].count) {
1255 for(i = 0; parms[Xflag].arg[i]; i++)
1256 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1257 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1259 rule = strtok(parms[Xflag].arg, ":");
1260 for (i = 0; rule; i++) {
1261 snmp_rules[i].len = strlen(rule);
1262 if (snmp_rules[i].len > IFNAMSIZ) {
1263 fprintf(stderr, "Illegal %s\n", "interface basename");
1266 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
/* Restore the separator strtok zeroed, one byte before this token. */
1267 if (!*(rule - 1)) *(rule - 1) = ',';
1268 rule = strtok(NULL, ",");
1270 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1273 snmp_rules[i].base = atoi(rule);
1275 rule = strtok(NULL, ":");
1279 if (parms[tflag].count)
1280 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1281 if (parms[aflag].count) {
1282 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1284 fprintf(stderr, "Illegal %s\n", "source address");
1287 saddr = *((struct sockaddr_in *) res->ai_addr);
1291 if (parms[uflag].count)
1292 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1293 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1298 /* Process collectors parameters. Brrrr... :-[ */
1300 npeers = argc - optind;
1302 /* Send to remote Netflow collector */
1303 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1304 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1306 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1308 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1309 fprintf(stderr, "socket(): %s\n", strerror(errno));
1312 peers[npeers].write_fd = write_fd;
1313 peers[npeers].type = PEER_MIRROR;
1314 peers[npeers].laddr = saddr;
1315 peers[npeers].seq = 0;
/* Optional "/local[/type]" suffix after the port. */
1316 if ((lhost = strchr(dport, '/'))) {
1318 if ((type = strchr(lhost, '/'))) {
1326 peers[npeers].type = PEER_ROTATE;
1335 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1336 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1340 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1341 sizeof(struct sockaddr_in))) {
1342 fprintf(stderr, "bind(): %s\n", strerror(errno));
1345 if (getaddrinfo(dhost, dport, &hints, &res)) {
1347 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1350 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1352 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1353 sizeof(struct sockaddr_in))) {
1354 fprintf(stderr, "connect(): %s\n", strerror(errno));
1358 /* Restore command line */
1359 if (type) *--type = '/';
1360 if (lhost) *--lhost = '/';
/* -f: single file peer instead of network collectors. */
1364 else if (parms[fflag].count) {
1367 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
/* NOTE(review): strnlen() result is not +1'd for the terminator, and the
   strncpy below does not guarantee NUL-termination of fname when the
   argument reaches MAX_PATH_LEN — prefer snprintf and size+1 here. */
1368 if (!(fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1369 strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
1371 peers[0].write_fd = -1;
1372 peers[0].type = PEER_FILE;
1380 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1381 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1383 fprintf(stderr, "libipulog initialization error: %s",
1384 ipulog_strerror(ipulog_errno));
/* -B: enlarge the kernel capture socket buffer (non-fatal on failure). */
1388 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1389 &sockbufsize, sizeof(sockbufsize)) < 0)
1390 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1392 /* Daemonize (if log destination stdout-free) */
1394 my_log_open(ident, verbosity, log_dest);
1395 if (!(log_dest & 2)) {
1398 fprintf(stderr, "fork(): %s", strerror(errno));
1403 freopen("/dev/null", "r", stdin);
1404 freopen("/dev/null", "w", stdout);
1405 freopen("/dev/null", "w", stderr);
1412 setvbuf(stdout, (char *)0, _IONBF, 0);
1413 setvbuf(stderr, (char *)0, _IONBF, 0);
1417 sprintf(errpbuf, "[%ld]", (long) pid);
1418 strcat(ident, errpbuf);
1420 /* Initialization */
1422 hash_init(); /* Actually for crc16 only */
1423 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1424 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1427 /* Hope 12 days is enough :-/ */
1428 start_time_offset = 1 << 20;
1430 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1432 gettime(&start_time);
1435 Build static pending queue as circular buffer.
1437 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1438 pending_tail = pending_head;
1439 for (i = pending_queue_length - 1; i--;) {
1440 if (!(pending_tail->next = mem_alloc())) {
1442 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1445 pending_tail = pending_tail->next;
/* Close the ring and reset the consumer to the start. */
1447 pending_tail->next = pending_head;
1448 pending_tail = pending_head;
1450 sigemptyset(&sig_mask);
1451 sigact.sa_handler = &sighandler;
1452 sigact.sa_mask = sig_mask;
1453 sigact.sa_flags = 0;
1454 sigaddset(&sig_mask, SIGTERM);
1455 sigaction(SIGTERM, &sigact, 0);
1456 #if ((DEBUG) & DEBUG_I)
1457 sigaddset(&sig_mask, SIGUSR1);
1458 sigaction(SIGUSR1, &sigact, 0);
/* Block the handled signals in worker threads; main unblocks them later. */
1460 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1461 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1465 my_log(LOG_INFO, "Starting %s...", VERSION);
1467 if (parms[cflag].count) {
1468 if (chdir(parms[cflag].arg) || chroot(".")) {
1469 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* Workers get ascending RT priorities ending just below main's (-r). */
1474 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1475 pthread_attr_init(&tattr);
1476 for (i = 0; i < THREADS - 1; i++) {
1477 if (schedp.sched_priority > 0) {
1478 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1479 (pthread_attr_setschedparam(&tattr, &schedp))) {
1480 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1484 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1485 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1488 pthread_detach(thid);
1489 schedp.sched_priority++;
/* Drop privileges (-u): supplementary groups, then gid, then uid. */
1493 if (setgroups(0, NULL)) {
1494 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1497 if (setregid(pw->pw_gid, pw->pw_gid)) {
1498 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1501 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1502 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1507 if (!(pidfile = fopen(pidfilepath, "w")))
1508 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1510 fprintf(pidfile, "%ld\n", (long) pid);
1514 my_log(LOG_INFO, "pid: %d", pid);
1515 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1516 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1517 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1518 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1519 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1520 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1521 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1522 for (i = 0; i < nsnmp_rules; i++) {
1523 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1524 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1526 for (i = 0; i < npeers; i++) {
1527 switch (peers[i].type) {
1535 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1536 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1537 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1540 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
/* Main loop: sleep in select() while there is still work outstanding. */
1542 timeout.tv_usec = 0;
1544 || (total_elements - free_elements - pending_queue_length)
1546 || pending_tail->flags) {
1549 timeout.tv_sec = scan_interval;
1550 select(0, 0, 0, 0, &timeout);
/* SIGTERM: expire everything immediately so caches drain, then exit. */
1553 if (sigs & SIGTERM_MASK && !killed) {
1554 sigs &= ~SIGTERM_MASK;
1555 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1558 active_lifetime = -1;
1559 inactive_lifetime = -1;
1561 unpending_timeout = 1;
1563 pthread_cond_signal(&scan_cond);
1564 pthread_cond_signal(&unpending_cond);
1567 #if ((DEBUG) & DEBUG_I)
1568 if (sigs & SIGUSR1_MASK) {
1569 sigs &= ~SIGUSR1_MASK;
1574 remove(pidfilepath);
1575 #if ((DEBUG) & DEBUG_I)
1578 my_log(LOG_INFO, "Done.");