2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
12 /* stdout, stderr, freopen() */
18 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <libipulog/libipulog.h>
/* Local mirror of libipulog's otherwise-opaque handle layout, declared here
 * so main() can reach the netlink socket fd for setsockopt(SO_RCVBUF).
 * NOTE(review): several members (including the fd) and the closing brace are
 * elided from this listing; the layout must match the libipulog ABI exactly. */
28 struct ipulog_handle {
31 struct sockaddr_nl local;   /* our netlink address */
32 struct sockaddr_nl peer;    /* kernel-side netlink address */
33 struct nlmsghdr* last_nlhdr; /* iteration cursor for ipulog_get_packet() */
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
48 #include <sys/param.h>
73 #include <sys/select.h>
79 #include <fprobe-ulog.h>
81 #include <my_getopt.h>
/* my_getopt() option table; indexed via the <X>flag enum constants used
 * throughout (Uflag, sflag, ...).  Every option listed here takes a required
 * argument.  NOTE(review): flag-style options (-h, -M) and the closing "};"
 * are elided from this listing. */
110 static struct getopt_parms parms[] = {
111 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
112 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
113 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
114 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
115 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
116 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
117 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
118 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
120 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
123 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* getopt state provided by the C library / my_getopt. */
136 extern int optind, opterr, optopt;
/* NetFlow export format descriptors (defined in a sibling translation unit);
 * -n selects which one `netflow` points at. */
139 extern struct NetFlow NetFlow1;
140 extern struct NetFlow NetFlow5;
141 extern struct NetFlow NetFlow7;
/* -M: nonzero when the netfilter mark value should be exported as ToS. */
143 #define mark_is_tos parms[Mflag].count
/* Timers (seconds): expiry scan period, fragment/inactive/active lifetimes. */
144 static unsigned scan_interval = 5;
145 static int frag_lifetime = 30;
146 static int inactive_lifetime = 60;
147 static int active_lifetime = 300;
/* -B: kernel capture socket receive-buffer size in bytes (0 = kernel default). */
148 static int sockbufsize;
/* Largest representable bulk size given the memory-index type width. */
149 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
150 #if (MEM_BITS == 0) || (MEM_BITS == 16)
151 #define BULK_QUANTITY 10000
153 #define BULK_QUANTITY 200
/* Flow-allocator bulk size (-b) and pending-queue length (-q). */
155 static unsigned bulk_quantity = BULK_QUANTITY;
156 static unsigned pending_queue_length = 100;
/* Selected export format; NetFlow v5 by default. */
157 static struct NetFlow *netflow = &NetFlow5;
158 static unsigned verbosity = 6;
159 static unsigned log_dest = MY_LOG_SYSLOG;
/* Probe start time (offset back to keep SysUptime positive for old flows). */
160 static struct Time start_time;
161 static long start_time_offset;
/* Flow-memory pool counters maintained by mem_alloc()/mem_free() elsewhere. */
164 extern unsigned total_elements;
165 extern unsigned free_elements;
166 extern unsigned total_memory;
/* Instrumentation counters, compiled in only with DEBUG_I. */
167 #if ((DEBUG) & DEBUG_I)
168 static unsigned emit_pkts, emit_queue;
169 static uint64_t size_total;
170 static unsigned pkts_total, pkts_total_fragmented;
171 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
172 static unsigned pkts_pending, pkts_pending_done;
173 static unsigned pending_queue_trace, pending_queue_trace_candidate;
174 static unsigned flows_total, flows_fragmented;
/* Export-side state: flows in the current packet, sequence counter,
 * optional rate limiting (-t B:N), packet build buffer and timestamp. */
176 static unsigned emit_count;
177 static uint32_t emit_sequence;
178 static unsigned emit_rate_bytes, emit_rate_delay;
179 static struct Time emit_time;
180 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
/* Worker-thread bookkeeping and realtime scheduling parameters. */
181 static pthread_t thid;
182 static sigset_t sig_mask;
183 static struct sched_param schedp;
184 static int sched_min, sched_max;
/* Collector peers (-y style args); npeers_rot counts PEER_ROTATE entries. */
185 static int npeers, npeers_rot;
186 static struct peer *peers;
/* Flow hash table with one mutex per bucket. */
189 static struct Flow *flows[1 << HASH_BITS];
190 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization between capture, unpending, scan and emit threads. */
192 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
193 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
195 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
196 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Circular pending queue (producer: cap_thread, consumer: unpending_thread)
 * and the chain of expired fragmented flows collected by scan_thread. */
197 static struct Flow *pending_head, *pending_tail;
198 static struct Flow *scan_frag_dreg;
200 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
/* Singly-linked list of flows queued for export. */
202 static struct Flow *flows_emit;
/* Syslog/pidfile identity (-l suffix appended later). */
204 static char ident[256] = "fprobe-ulog";
205 static FILE *pidfile;
206 static char *pidfilepath;
209 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture handle, group mask (-U) and read buffer. */
210 static struct ipulog_handle *ulog_handle;
211 static uint32_t ulog_gmask = 1;
212 static char *cap_buf;
/* -X interface-name -> SNMP-index conversion rules. */
213 static int nsnmp_rules;
214 static struct snmp_rule *snmp_rules;
/* -u: target user for privilege dropping (NULL = stay as invoker). */
215 static struct passwd *pw = 0;
/* usage() help text (enclosing function header and fprintf call are elided
 * from this listing).
 * NOTE(review): the "-y ..." literal on line 243 ends with a trailing comma,
 * which terminates the format string there and passes the "-f ..." literal as
 * the first variadic argument — shifting VERSION, BULK_QUANTITY_MAX, etc. off
 * their %-specifiers.  Confirm against the unelided source; the comma should
 * almost certainly be removed so the two literals concatenate. */
220 "fprobe-ulog: a NetFlow probe. Version %s\n"
221 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
223 "-h\t\tDisplay this help\n"
224 "-U <mask>\tULOG group bitwise mask [1]\n"
225 "-s <seconds>\tHow often scan for expired flows [5]\n"
226 "-g <seconds>\tFragmented flow lifetime [30]\n"
227 "-d <seconds>\tIdle flow lifetime (inactive timer) [60]\n"
228 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
229 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
230 "-a <address>\tUse <address> as source for NetFlow flow\n"
231 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
232 "-M\t\tUse netfilter mark value as ToS flag\n"
233 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
234 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
235 "-q <flows>\tPending queue length [100]\n"
236 "-B <kilobytes>\tKernel capture buffer size [0]\n"
237 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
238 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
239 "-c <directory>\tDirectory to chroot to\n"
240 "-u <user>\tUser to run as\n"
241 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
242 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
243 "-y <remote:port>\tAddress of the NetFlow collector\n",
244 "-f <writable file>\tFile to write data into\n"
244 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Debug statistics dump (the enclosing function's header is elided from this
 * listing); compiled only when DEBUG includes DEBUG_I.  Logs the DEBUG_I
 * counters: capture totals, pending-queue depth/high-water mark, loss
 * counters, cache/emit state and memory-pool usage. */
249 #if ((DEBUG) & DEBUG_I)
252 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
253 pkts_total, pkts_total_fragmented, size_total,
254 pkts_pending - pkts_pending_done, pending_queue_trace);
255 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
256 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
257 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
258 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
259 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
260 total_elements, free_elements, total_memory);
/* Async signal handler: records the received signal as a bit in the global
 * `sigs` mask for the main loop to act on.  The switch/case dispatch on `sig`
 * (SIGTERM vs SIGUSR1) is elided from this listing. */
264 void sighandler(int sig)
268 sigs |= SIGTERM_MASK;
270 #if ((DEBUG) & DEBUG_I)
272 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time.  The gettimeofday() call into
 * the local `t` and the now->sec assignment are elided from this listing. */
278 void gettime(struct Time *now)
284 now->usec = t.tv_usec;
/* Difference t1 - t2 in milliseconds (positive when t1 is later). */
287 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
289 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
292 /* Uptime in milliseconds: elapsed time since (offset-adjusted) start_time,
 * used for the NetFlow SysUptime/First/Last fields. */
293 uint32_t getuptime(struct Time *t)
295 /* Maximum uptime is about 49/2 days (uint32_t milliseconds, halved by the
 * start_time_offset headroom). */
296 return cmpmtime(t, &start_time);
/* Bucket hash for a flow.  Fragmented flows hash only the Flow_F prefix
 * (no ports — they are unknown until the first fragment arrives); complete
 * flows hash the larger Flow_TL prefix including transport-layer fields. */
299 hash_t hash_flow(struct Flow *flow)
301 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
302 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP index: first try the -X rules (basename
 * prefix match, then numeric suffix + base), then fall back to the kernel's
 * if_nametoindex().  Empty name yields 0; the final fallback return for an
 * unknown name is elided from this listing. */
305 uint16_t snmp_index(char *name) {
308 if (!*name) return 0;
310 for (i = 0; (int) i < nsnmp_rules; i++) {
311 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
312 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
315 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a flow record (deliberately not memcpy: the `next`
 * link must not be copied).  The copies of sip/dip/ports/iif/oif/tos/id are
 * elided from this listing. */
320 inline void copy_flow(struct Flow *src, struct Flow *dst)
327 dst->proto = src->proto;
328 dst->tcp_flags = src->tcp_flags;
332 dst->pkts = src->pkts;
333 dst->size = src->size;
334 dst->sizeF = src->sizeF;
335 dst->sizeP = src->sizeP;
336 dst->ctime = src->ctime;
337 dst->mtime = src->mtime;
338 dst->flags = src->flags;
/* Walk the bucket chain starting at `where` looking for a flow matching
 * `what` (same src/dst IP and protocol; then either matching ports for two
 * unfragmented flows, or matching IP id for two fragmented ones).  On return,
 * *prev (if given) is the link that points at the match — or at the chain end
 * when no match — so the caller can unlink/insert in place.  The loop
 * statement, `done:` label and return are elided from this listing. */
341 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
343 struct Flow **flowpp;
349 if (prev) flowpp = *prev;
352 if (where->sip.s_addr == what->sip.s_addr
353 && where->dip.s_addr == what->dip.s_addr
354 && where->proto == what->proto) {
/* Sum of the two FLOW_FRAG bits distinguishes both-plain / both-frag /
 * mixed pairings. */
355 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
357 /* Both unfragmented */
358 if ((what->sp == where->sp)
359 && (what->dp == where->dp)) goto done;
362 /* Both fragmented */
363 if (where->id == what->id) goto done;
367 flowpp = &where->next;
371 if (prev) *prev = flowpp;
/* Insert `flow` into the hash cache, or merge it into an existing matching
 * entry.  `flag` is COPY_INTO (source record belongs to the caller, e.g. a
 * pending-queue slot: copy it) or MOVE_INTO (record comes from the allocator:
 * link it in / free it after merging).  When the last fragment of a datagram
 * completes reassembly, the entry is unlinked and re-inserted recursively as
 * an unfragmented flow.  Holds flows_mutex[h] for the bucket. */
375 int put_into(struct Flow *flow, int flag
376 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
383 struct Flow *flown, **flowpp;
384 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
389 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
390 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
393 pthread_mutex_lock(&flows_mutex[h]);
395 if (!(flown = find(flows[h], flow, &flowpp))) {
396 /* No suitable flow found - add */
397 if (flag == COPY_INTO) {
398 if ((flown = mem_alloc())) {
399 copy_flow(flow, flown);
402 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
/* Allocation failure: the packet's flow data is dropped (counted). */
403 my_log(LOG_ERR, "%s %s. %s",
404 "mem_alloc():", strerror(errno), "packet lost");
409 flow->next = flows[h];
411 #if ((DEBUG) & DEBUG_I)
413 if (flow->flags & FLOW_FRAG) flows_fragmented++;
415 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
417 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
422 /* Found suitable flow - update */
423 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
424 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the earliest creation time and the latest modification time. */
427 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
428 flown->mtime = flow->mtime;
429 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
430 flown->ctime = flow->ctime;
431 flown->tcp_flags |= flow->tcp_flags;
432 flown->size += flow->size;
433 flown->pkts += flow->pkts;
434 if (flow->flags & FLOW_FRAG) {
435 /* Fragmented flow requires some additional work */
436 if (flow->flags & FLOW_TL) {
/* Transport-layer header seen: remember the ports.  Duplicate arrivals
 * would mean: */
439 Several packets with FLOW_TL (attack)
441 flown->sp = flow->sp;
442 flown->dp = flow->dp;
444 if (flow->flags & FLOW_LASTFRAG) {
/* Last fragment carries the whole-datagram size; duplicates mean: */
447 Several packets with FLOW_LASTFRAG (attack)
449 flown->sizeP = flow->sizeP;
451 flown->flags |= flow->flags;
452 flown->sizeF += flow->sizeF;
453 if ((flown->flags & FLOW_LASTFRAG)
454 && (flown->sizeF >= flown->sizeP)) {
455 /* All fragments received - flow reassembled */
456 *flowpp = flown->next;
457 pthread_mutex_unlock(&flows_mutex[h]);
458 #if ((DEBUG) & DEBUG_I)
/* Re-insert as a normal (unfragmented) flow; recursion depth is 1. */
463 flown->flags &= ~FLOW_FRAG;
464 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
467 ret = put_into(flown, MOVE_INTO
468 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
474 if (flag == MOVE_INTO) mem_free(flow);
476 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize one record into the export packet at `p`, driven by the format
 * table: for each of `fields` entries in `format`, write the corresponding
 * value (from `flow`, or from emit-global state for header fields where
 * `flow` is NULL) and advance `p` by that field's size.  Returns the advanced
 * pointer.  Port fields are already network order; counters/addresses are
 * converted here.  NOTE(review): writes through casted pointers assume the
 * format tables keep each field naturally aligned within emit_packet —
 * confirm against the NetFlow format definitions. */
480 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
484 for (i = 0; i < fields; i++) {
485 #if ((DEBUG) & DEBUG_F)
486 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
489 case NETFLOW_IPV4_SRC_ADDR:
490 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
491 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
494 case NETFLOW_IPV4_DST_ADDR:
495 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
496 p += NETFLOW_IPV4_DST_ADDR_SIZE;
499 case NETFLOW_INPUT_SNMP:
500 *((uint16_t *) p) = htons(flow->iif);
501 p += NETFLOW_INPUT_SNMP_SIZE;
504 case NETFLOW_OUTPUT_SNMP:
505 *((uint16_t *) p) = htons(flow->oif);
506 p += NETFLOW_OUTPUT_SNMP_SIZE;
509 case NETFLOW_PKTS_32:
510 *((uint32_t *) p) = htonl(flow->pkts);
511 p += NETFLOW_PKTS_32_SIZE;
514 case NETFLOW_BYTES_32:
515 *((uint32_t *) p) = htonl(flow->size);
516 p += NETFLOW_BYTES_32_SIZE;
519 case NETFLOW_FIRST_SWITCHED:
520 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
521 p += NETFLOW_FIRST_SWITCHED_SIZE;
524 case NETFLOW_LAST_SWITCHED:
525 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
526 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports were captured straight from the packet and are already
 * network byte order — no htons() here. */
529 case NETFLOW_L4_SRC_PORT:
530 *((uint16_t *) p) = flow->sp;
531 p += NETFLOW_L4_SRC_PORT_SIZE;
534 case NETFLOW_L4_DST_PORT:
535 *((uint16_t *) p) = flow->dp;
536 p += NETFLOW_L4_DST_PORT_SIZE;
540 *((uint8_t *) p) = flow->proto;
541 p += NETFLOW_PROT_SIZE;
544 case NETFLOW_SRC_TOS:
545 *((uint8_t *) p) = flow->tos;
546 p += NETFLOW_SRC_TOS_SIZE;
549 case NETFLOW_TCP_FLAGS:
550 *((uint8_t *) p) = flow->tcp_flags;
551 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header fields below use emit-global state; `flow` is NULL here. */
554 case NETFLOW_VERSION:
555 *((uint16_t *) p) = htons(netflow->Version);
556 p += NETFLOW_VERSION_SIZE;
560 *((uint16_t *) p) = htons(emit_count);
561 p += NETFLOW_COUNT_SIZE;
565 *((uint32_t *) p) = htonl(getuptime(&emit_time));
566 p += NETFLOW_UPTIME_SIZE;
569 case NETFLOW_UNIX_SECS:
570 *((uint32_t *) p) = htonl(emit_time.sec);
571 p += NETFLOW_UNIX_SECS_SIZE;
574 case NETFLOW_UNIX_NSECS:
575 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
576 p += NETFLOW_UNIX_NSECS_SIZE;
579 case NETFLOW_FLOW_SEQUENCE:
/* Sequence is zeroed here and patched per-peer via SeqOffset just
 * before sending (each peer keeps its own sequence counter). */
580 //*((uint32_t *) p) = htonl(emit_sequence);
581 *((uint32_t *) p) = 0;
582 p += NETFLOW_FLOW_SEQUENCE_SIZE;
586 /* Unsupported (uint8_t) */
587 case NETFLOW_ENGINE_TYPE:
588 case NETFLOW_ENGINE_ID:
589 case NETFLOW_FLAGS7_1:
590 case NETFLOW_SRC_MASK:
591 case NETFLOW_DST_MASK:
592 *((uint8_t *) p) = 0;
593 p += NETFLOW_PAD8_SIZE;
597 /* Unsupported (uint16_t) */
600 case NETFLOW_FLAGS7_2:
601 *((uint16_t *) p) = 0;
602 p += NETFLOW_PAD16_SIZE;
606 /* Unsupported (uint32_t) */
607 case NETFLOW_IPV4_NEXT_HOP:
608 case NETFLOW_ROUTER_SC:
609 *((uint32_t *) p) = 0;
610 p += NETFLOW_PAD32_SIZE;
/* Unknown field id: format table and this switch are out of sync. */
614 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
615 format, i, format[i]);
619 #if ((DEBUG) & DEBUG_F)
620 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* emit_thread (function header elided from this listing): drains flows_emit,
 * packs records into emit_packet, and on a full packet (or timeout with data
 * buffered) writes it to every peer — write() for PEER_FILE, send() for
 * PEER_MIRROR, and one selected PEER_ROTATE peer per packet (round-robin).
 * Optionally paces output per -t via nanosleep. */
627 Workaround for clone()-based threads
628 Try to change EUID independently of main thread
/* Drop privileges in this thread too (pre-NPTL threads don't share uid). */
632 setregid(pw->pw_gid, pw->pw_gid);
633 setreuid(pw->pw_uid, pw->pw_uid);
642 struct timespec timeout;
643 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
/* Records are appended after the export header. */
645 p = (void *) &emit_packet + netflow->HeaderSize;
651 pthread_mutex_lock(&emit_mutex);
652 while (!flows_emit) {
653 gettimeofday(&now, 0);
654 timeout.tv_sec = now.tv_sec + emit_timeout;
655 /* Do not wait until emit_packet will filled - it may be too long */
656 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
657 pthread_mutex_unlock(&emit_mutex);
/* Pop one flow from the emit list while still holding emit_mutex. */
662 flows_emit = flows_emit->next;
663 #if ((DEBUG) & DEBUG_I)
666 pthread_mutex_unlock(&emit_mutex);
/* Uptime counter wrapped: rebase start_time (keeps getuptime() small). */
670 gettime(&start_time);
671 start_time.sec -= start_time_offset;
674 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
677 if (emit_count == netflow->MaxFlows) {
/* Packet full (or emit label reached on timeout): build header, send. */
680 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
681 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
683 for (i = 0; i < npeers; i++) {
684 if (peers[i].type == PEER_FILE) {
/* Patch this peer's private sequence number into the packet. */
686 if (netflow->SeqOffset)
687 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
689 ret = write(peers[i].write_fd, emit_packet, size);
691 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
692 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
693 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
697 #if ((DEBUG) & DEBUG_E)
699 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
700 emit_count, i + 1, peers[i].seq);
703 peers[i].seq += emit_count;
/* -t rate limiting: sleep proportionally to bytes sent. */
706 if (emit_rate_bytes) {
708 delay = sent / emit_rate_bytes;
710 sent %= emit_rate_bytes;
712 timeout.tv_nsec = emit_rate_delay * delay;
713 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
718 if (peers[i].type == PEER_MIRROR) goto sendreal;
/* ROTATE peers: only the currently selected one sends this packet. */
720 if (peers[i].type == PEER_ROTATE)
721 if (peer_rot_cur++ == peer_rot_work) {
723 if (netflow->SeqOffset)
724 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
725 ret = send(peers[i].write_fd, emit_packet, size, 0);
727 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
728 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
729 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
732 #if ((DEBUG) & DEBUG_E)
734 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
735 emit_count, i + 1, peers[i].seq);
738 peers[i].seq += emit_count;
741 if (emit_rate_bytes) {
743 delay = sent / emit_rate_bytes;
745 sent %= emit_rate_bytes;
747 timeout.tv_nsec = emit_rate_delay * delay;
748 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Advance round-robin pointer; account the packet. */
753 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
754 emit_sequence += emit_count;
756 #if ((DEBUG) & DEBUG_I)
/* Consumer of the circular pending queue: waits until the slot at
 * pending_tail is marked FLOW_PENDING by cap_thread, copies it into the hash
 * cache via put_into(COPY_INTO), clears the slot's flags (making it reusable)
 * and advances pending_tail.  Runs under unpending_mutex. */
763 void *unpending_thread()
766 struct timespec timeout;
767 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
774 pthread_mutex_lock(&unpending_mutex);
777 while (!(pending_tail->flags & FLOW_PENDING)) {
778 gettimeofday(&now, 0);
779 timeout.tv_sec = now.tv_sec + unpending_timeout;
780 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
783 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
786 if (put_into(pending_tail, COPY_INTO
787 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* put_into failed (allocation): count the flow as lost. */
791 #if ((DEBUG) & DEBUG_I)
792 pkts_lost_unpending++;
796 #if ((DEBUG) & DEBUG_U)
797 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the slot back to cap_thread and step around the ring. */
800 pending_tail->flags = 0;
801 pending_tail = pending_tail->next;
802 #if ((DEBUG) & DEBUG_I)
/* scan_thread (function header elided from this listing): every
 * scan_interval seconds walks all hash buckets; expired fragmented flows go
 * to the scan_frag_dreg chain and are later re-inserted unfragmented, while
 * flows past the inactive or active lifetime are unlinked and moved onto
 * flows_emit for the emit thread. */
810 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
814 struct Flow *flow, **flowpp;
816 struct timespec timeout;
821 pthread_mutex_lock(&scan_mutex);
825 timeout.tv_sec = now.sec + scan_interval;
826 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
829 #if ((DEBUG) & DEBUG_S)
830 my_log(LOG_DEBUG, "S: %d", now.sec);
832 for (i = 0; i < 1 << HASH_BITS ; i++) {
833 pthread_mutex_lock(&flows_mutex[i]);
837 if (flow->flags & FLOW_FRAG) {
838 /* Process fragmented flow */
839 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
840 /* Fragmented flow expired - put it into special chain */
841 #if ((DEBUG) & DEBUG_I)
845 *flowpp = flow->next;
847 flow->flags &= ~FLOW_FRAG;
848 flow->next = scan_frag_dreg;
849 scan_frag_dreg = flow;
854 /* Flow is not fragmented */
855 if ((now.sec - flow->mtime.sec) > inactive_lifetime
856 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
/* Expired: unlink from the bucket and queue for export. */
858 #if ((DEBUG) & DEBUG_S)
859 my_log(LOG_DEBUG, "S: E %x", flow);
861 #if ((DEBUG) & DEBUG_I)
864 *flowpp = flow->next;
865 pthread_mutex_lock(&emit_mutex);
866 flow->next = flows_emit;
868 #if ((DEBUG) & DEBUG_I)
871 pthread_mutex_unlock(&emit_mutex);
876 flowpp = &flow->next;
879 pthread_mutex_unlock(&flows_mutex[i]);
881 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Re-insert expired fragments as ordinary flows (FLOW_FRAG was cleared). */
883 while (scan_frag_dreg) {
884 flow = scan_frag_dreg;
885 scan_frag_dreg = flow->next;
886 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
889 put_into(flow, MOVE_INTO
890 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
894 #if ((DEBUG) & DEBUG_S)
895 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* cap_thread (function header elided from this listing): reads ULOG netlink
 * messages, parses each captured IPv4 packet into the next free slot of the
 * circular pending queue at pending_head, marks the slot FLOW_PENDING and
 * signals unpending_thread.  Drops non-IPv4/short packets, and counts a lost
 * packet when the queue is full (head caught up with an unconsumed slot). */
903 struct ulog_packet_msg *ulog_msg;
907 int len, off_frag, psize;
908 #if ((DEBUG) & DEBUG_C)
916 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
918 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
921 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
923 #if ((DEBUG) & DEBUG_C)
924 sprintf(logbuf, "C: %d", ulog_msg->data_len);
927 nl = (void *) &ulog_msg->payload;
928 psize = ulog_msg->data_len;
/* Sanity: need at least an IP header and IP version 4. */
931 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
932 #if ((DEBUG) & DEBUG_C)
933 strcat(logbuf, " U");
934 my_log(LOG_DEBUG, "%s", logbuf);
936 #if ((DEBUG) & DEBUG_I)
/* Queue full: current head slot still pending — packet is lost. */
942 if (pending_head->flags) {
943 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
945 # if ((DEBUG) & DEBUG_C)
950 "pending queue full:", "packet lost");
952 #if ((DEBUG) & DEBUG_I)
958 #if ((DEBUG) & DEBUG_I)
964 /* ?FIXME? Add sanity check for ip_len? */
965 flow->size = ntohs(nl->ip_len);
966 #if ((DEBUG) & DEBUG_I)
967 size_total += flow->size;
970 flow->sip = nl->ip_src;
971 flow->dip = nl->ip_dst;
972 flow->iif = snmp_index(ulog_msg->indev_name);
973 flow->oif = snmp_index(ulog_msg->outdev_name);
/* -M: export the netfilter mark instead of the IP ToS byte. */
974 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
975 flow->proto = nl->ip_p;
981 /* Packets captured from the OUTPUT table do not contain a valid timestamp */
982 if (ulog_msg->timestamp_sec) {
983 flow->ctime.sec = ulog_msg->timestamp_sec;
984 flow->ctime.usec = ulog_msg->timestamp_usec;
985 } else gettime(&flow->ctime);
986 flow->mtime = flow->ctime;
/* Fragment offset in bytes (header field is in 8-byte units). */
988 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
991 Offset (from network layer) to transport layer header/IP data
992 IOW IP header size ;-)
995 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
997 off_tl = nl->ip_hl << 2;
998 tl = (void *) nl + off_tl;
1000 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1001 flow->sizeF = ntohs(nl->ip_len) - off_tl;
/* Clamp against corrupted ip_len / truncated capture. */
1003 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1004 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1006 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1007 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1008 #if ((DEBUG) & DEBUG_C)
1009 strcat(logbuf, " F");
1011 #if ((DEBUG) & DEBUG_I)
1012 pkts_total_fragmented++;
1014 flow->flags |= FLOW_FRAG;
1015 flow->id = nl->ip_id;
1017 if (!(ntohs(nl->ip_off) & IP_MF)) {
1019 /* Last fragment (IP_MF unset): offset + this fragment's size gives the
 * whole datagram size */
1019 flow->flags |= FLOW_LASTFRAG;
1020 /* size = frag_offset*8 + data_size */
1021 flow->sizeP = off_frag + flow->sizeF;
1025 #if ((DEBUG) & DEBUG_C)
1026 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1027 strcat(logbuf, buf);
1028 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1029 strcat(logbuf, buf);
1033 Fortunately most interesting transport layer information fit
1034 into first 8 bytes of IP data field (minimal nonzero size).
1035 Thus we don't need actual packet reassembling to build whole
1036 transport layer data. We only check the fragment offset for
1037 zero value to find packet with this information.
1039 if (!off_frag && psize >= 8) {
1040 switch (flow->proto) {
/* TCP/UDP port fields share the same offsets; stored as captured
 * (network byte order). */
1043 flow->sp = ((struct udphdr *)tl)->uh_sport;
1044 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code into the port fields (collector convention). */
1049 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1050 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1053 #ifdef ICMP_TRICK_CISCO
1055 flow->dp = *((int32_t *) tl);
1060 /* Unknown transport layer */
1061 #if ((DEBUG) & DEBUG_C)
1062 strcat(logbuf, " U");
1069 #if ((DEBUG) & DEBUG_C)
1070 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1071 strcat(logbuf, buf);
/* Transport-layer info captured for this flow. */
1073 flow->flags |= FLOW_TL;
1077 /* Check for tcp flags presence (including CWR and ECE). */
1078 if (flow->proto == IPPROTO_TCP
1080 && psize >= 16 - off_frag) {
/* TCP flags byte is at offset 13 in the TCP header. */
1081 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1082 #if ((DEBUG) & DEBUG_C)
1083 sprintf(buf, " TCP:%x", flow->tcp_flags);
1084 strcat(logbuf, buf);
1088 #if ((DEBUG) & DEBUG_C)
1089 sprintf(buf, " => %x", (unsigned) flow);
1090 strcat(logbuf, buf);
1091 my_log(LOG_DEBUG, "%s", logbuf);
1094 #if ((DEBUG) & DEBUG_I)
1096 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1097 if (pending_queue_trace < pending_queue_trace_candidate)
1098 pending_queue_trace = pending_queue_trace_candidate;
1101 /* Flow complete - inform unpending_thread() about it */
1102 pending_head->flags |= FLOW_PENDING;
1103 pending_head = pending_head->next;
1105 pthread_cond_signal(&unpending_cond);
/* Program entry point: parse options, set up collector peers (or the -f
 * output file), open the ULOG capture handle, daemonize, build the pending
 * queue, install signal handling, start the worker threads, drop privileges,
 * then loop until SIGTERM and a drained cache. */
1111 int main(int argc, char **argv)
1114 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1115 int c, i, write_fd, memory_limit = 0;
1116 struct addrinfo hints, *res;
1117 struct sockaddr_in saddr;
1118 pthread_attr_t tattr;
1119 struct sigaction sigact;
/* Worker entry points, started in this order below. */
1120 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1121 struct timeval timeout;
1123 sched_min = sched_get_priority_min(SCHED);
1124 sched_max = sched_get_priority_max(SCHED);
1126 memset(&saddr, 0 , sizeof(saddr));
1127 memset(&hints, 0 , sizeof(hints));
1128 hints.ai_flags = AI_PASSIVE;
1129 hints.ai_family = AF_INET;
1130 hints.ai_socktype = SOCK_DGRAM;
1132 /* Process command line options */
1135 while ((c = my_getopt(argc, argv, parms)) != -1) {
/* Numeric options are parsed with atoi(): invalid input silently
 * becomes 0 — NOTE(review): strtol() with validation would be safer. */
1145 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1146 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1147 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1148 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1149 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1150 if (parms[nflag].count) {
1151 switch (atoi(parms[nflag].arg)) {
1153 netflow = &NetFlow1;
1160 netflow = &NetFlow7;
1164 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1168 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
/* -l [dst][:id]: optional ":suffix" is appended to ident in brackets;
 * the ':' is temporarily NUL'ed (elided) and restored below. */
1169 if (parms[lflag].count) {
1170 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1173 sprintf(errpbuf, "[%s]", log_suffix);
1174 strcat(ident, errpbuf);
1177 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1178 if (log_suffix) *--log_suffix = ':';
/* PID_DIR + '/' + ident + ".pid" + NUL. */
1180 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1182 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1185 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1186 if (parms[qflag].count) {
1187 pending_queue_length = atoi(parms[qflag].arg);
1188 if (pending_queue_length < 1) {
1189 fprintf(stderr, "Illegal %s\n", "pending queue length");
1193 if (parms[rflag].count) {
1194 schedp.sched_priority = atoi(parms[rflag].arg);
1195 if (schedp.sched_priority
1196 && (schedp.sched_priority < sched_min
1197 || schedp.sched_priority > sched_max)) {
1198 fprintf(stderr, "Illegal %s\n", "realtime priority");
1202 if (parms[Bflag].count) {
1203 sockbufsize = atoi(parms[Bflag].arg) << 10;
1205 if (parms[bflag].count) {
1206 bulk_quantity = atoi(parms[bflag].arg);
1207 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1208 fprintf(stderr, "Illegal %s\n", "bulk size");
1212 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X "ifbase:IDX,ifbase:IDX,...": count ':' occurrences to size the
 * table, then alternate strtok() on ":" and "," to split pairs. */
1213 if (parms[Xflag].count) {
1214 for(i = 0; parms[Xflag].arg[i]; i++)
1215 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1216 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1218 rule = strtok(parms[Xflag].arg, ":");
1219 for (i = 0; rule; i++) {
1220 snmp_rules[i].len = strlen(rule);
1221 if (snmp_rules[i].len > IFNAMSIZ) {
1222 fprintf(stderr, "Illegal %s\n", "interface basename");
/* NOTE(review): strncpy with len == strlen(rule) copies no NUL;
 * safe only if basename is sized IFNAMSIZ+1 and pre-zeroed —
 * confirm against the struct snmp_rule definition. */
1225 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1226 if (!*(rule - 1)) *(rule - 1) = ',';
1227 rule = strtok(NULL, ",");
1229 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1232 snmp_rules[i].base = atoi(rule);
1234 rule = strtok(NULL, ":");
1238 if (parms[tflag].count)
1239 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1240 if (parms[aflag].count) {
1241 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1243 fprintf(stderr, "Illegal %s\n", "source address");
1246 saddr = *((struct sockaddr_in *) res->ai_addr);
1250 if (parms[uflag].count)
1251 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1252 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1257 /* Process collectors parameters. Brrrr... :-[ */
1259 npeers = argc - optind;
1261 /* Send to remote Netflow collector */
1262 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
/* Each remaining argv entry is remote:port[/[local][/type]]. */
1263 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1265 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1267 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1268 fprintf(stderr, "socket(): %s\n", strerror(errno));
1271 peers[npeers].write_fd = write_fd;
1272 peers[npeers].type = PEER_MIRROR;
1273 peers[npeers].laddr = saddr;
1274 peers[npeers].seq = 0;
1275 if ((lhost = strchr(dport, '/'))) {
1277 if ((type = strchr(lhost, '/'))) {
1285 peers[npeers].type = PEER_ROTATE;
1294 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1295 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1299 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1300 sizeof(struct sockaddr_in))) {
1301 fprintf(stderr, "bind(): %s\n", strerror(errno));
1304 if (getaddrinfo(dhost, dport, &hints, &res)) {
1306 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1309 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
/* connect() the UDP socket so emit_thread can use plain send(). */
1311 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1312 sizeof(struct sockaddr_in))) {
1313 fprintf(stderr, "connect(): %s\n", strerror(errno));
1317 /* Restore command line */
1318 if (type) *--type = '/';
1319 if (lhost) *--lhost = '/';
/* -f: write export packets to a file instead of collectors. */
1323 else if (parms[fflag].count) {
1326 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1327 fname = parms[fflag].arg;
/* NOTE(review): open() with O_CREAT but no mode argument reads
 * garbage permissions from the stack — should pass e.g. 0644. */
1328 if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
1329 fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
1332 peers[0].write_fd = write_fd;
1333 peers[0].type = PEER_FILE;
/* Open the ULOG capture handle for the -U group mask. */
1341 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1342 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1344 fprintf(stderr, "libipulog initialization error: %s",
1345 ipulog_strerror(ipulog_errno));
/* -B: enlarge the netlink socket receive buffer (reaches into the
 * handle via the struct mirror declared at the top of this file). */
1349 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1350 &sockbufsize, sizeof(sockbufsize)) < 0)
1351 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1353 /* Daemonize (if log destination stdout-free) */
1355 my_log_open(ident, verbosity, log_dest);
1356 if (!(log_dest & 2)) {
1359 fprintf(stderr, "fork(): %s", strerror(errno));
1364 freopen("/dev/null", "r", stdin);
1365 freopen("/dev/null", "w", stdout);
1366 freopen("/dev/null", "w", stderr);
/* Foreground: keep stdio unbuffered so log lines appear promptly. */
1373 setvbuf(stdout, (char *)0, _IONBF, 0);
1374 setvbuf(stderr, (char *)0, _IONBF, 0);
1378 sprintf(errpbuf, "[%ld]", (long) pid);
1379 strcat(ident, errpbuf);
1381 /* Initialization */
1383 hash_init(); /* Actually for crc16 only */
1384 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1385 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1388 /* Hope 12 days is enough :-/ */
1389 start_time_offset = 1 << 20;
1391 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1393 gettime(&start_time);
1396 Build static pending queue as circular buffer.
1398 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1399 pending_tail = pending_head;
1400 for (i = pending_queue_length - 1; i--;) {
1401 if (!(pending_tail->next = mem_alloc())) {
1403 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1406 pending_tail = pending_tail->next;
/* Close the ring; producer and consumer both start at the same slot. */
1408 pending_tail->next = pending_head;
1409 pending_tail = pending_head;
/* Block SIGTERM (and SIGUSR1 with DEBUG_I) in all threads; the main
 * loop unblocks them for itself after thread creation. */
1411 sigemptyset(&sig_mask);
1412 sigact.sa_handler = &sighandler;
1413 sigact.sa_mask = sig_mask;
1414 sigact.sa_flags = 0;
1415 sigaddset(&sig_mask, SIGTERM);
1416 sigaction(SIGTERM, &sigact, 0);
1417 #if ((DEBUG) & DEBUG_I)
1418 sigaddset(&sig_mask, SIGUSR1);
1419 sigaction(SIGUSR1, &sigact, 0);
1421 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1422 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1426 my_log(LOG_INFO, "Starting %s...", VERSION);
1428 if (parms[cflag].count) {
1429 if (chdir(parms[cflag].arg) || chroot(".")) {
1430 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* Start workers with ascending RT priority (capture thread highest). */
1435 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1436 pthread_attr_init(&tattr);
1437 for (i = 0; i < THREADS - 1; i++) {
1438 if (schedp.sched_priority > 0) {
1439 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1440 (pthread_attr_setschedparam(&tattr, &schedp))) {
1441 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1445 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1446 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1449 pthread_detach(thid);
1450 schedp.sched_priority++;
/* -u: drop supplementary groups, then gid, then uid (order matters). */
1454 if (setgroups(0, NULL)) {
1455 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1458 if (setregid(pw->pw_gid, pw->pw_gid)) {
1459 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1462 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1463 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1468 if (!(pidfile = fopen(pidfilepath, "w")))
1469 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1471 fprintf(pidfile, "%ld\n", (long) pid);
1475 my_log(LOG_INFO, "pid: %d", pid);
1476 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1477 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1478 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1479 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1480 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1481 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1482 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1483 for (i = 0; i < nsnmp_rules; i++) {
1484 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1485 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1487 for (i = 0; i < npeers; i++) {
1488 switch (peers[i].type) {
1496 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1497 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1498 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
/* Main loop: sleep in select(); on SIGTERM start a graceful drain. */
1501 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1503 timeout.tv_usec = 0;
/* Keep running while the cache still holds flows or a slot is pending. */
1505 || (total_elements - free_elements - pending_queue_length)
1507 || pending_tail->flags) {
1510 timeout.tv_sec = scan_interval;
1511 select(0, 0, 0, 0, &timeout);
1514 if (sigs & SIGTERM_MASK && !killed) {
1515 sigs &= ~SIGTERM_MASK;
1516 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
/* Force-expire everything and wake the scan/unpending threads. */
1519 active_lifetime = -1;
1520 inactive_lifetime = -1;
1522 unpending_timeout = 1;
1524 pthread_cond_signal(&scan_cond);
1525 pthread_cond_signal(&unpending_cond);
1528 #if ((DEBUG) & DEBUG_I)
1529 if (sigs & SIGUSR1_MASK) {
1530 sigs &= ~SIGUSR1_MASK;
1535 remove(pidfilepath);
1536 #if ((DEBUG) & DEBUG_I)
1539 my_log(LOG_INFO, "Done.");