2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
12 /* stdout, stderr, freopen() */
18 /* getopt(), alarm(), getpid(), setsid(), chdir() */
27 #include <libipulog/libipulog.h>
28 struct ipulog_handle {
31 struct sockaddr_nl local;
32 struct sockaddr_nl peer;
33 struct nlmsghdr* last_nlhdr;
36 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
37 #include <sys/types.h>
38 #include <netinet/in_systm.h>
39 #include <sys/socket.h>
40 #include <netinet/in.h>
41 #include <arpa/inet.h>
42 #include <netinet/ip.h>
43 #include <netinet/tcp.h>
44 #include <netinet/udp.h>
45 #include <netinet/ip_icmp.h>
48 #include <sys/param.h>
73 #include <sys/select.h>
79 #include <fprobe-ulog.h>
81 #include <my_getopt.h>
109 static struct getopt_parms parms[] = {
110 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
111 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
112 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
113 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
114 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
115 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
116 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
118 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
119 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
122 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
123 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 extern int optind, opterr, optopt;
137 extern struct NetFlow NetFlow1;
138 extern struct NetFlow NetFlow5;
139 extern struct NetFlow NetFlow7;
141 #define mark_is_tos parms[Mflag].count
142 static unsigned scan_interval = 5;
143 static int frag_lifetime = 30;
144 static int inactive_lifetime = 60;
145 static int active_lifetime = 300;
146 static int sockbufsize;
147 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
148 #if (MEM_BITS == 0) || (MEM_BITS == 16)
149 #define BULK_QUANTITY 10000
151 #define BULK_QUANTITY 200
153 static unsigned bulk_quantity = BULK_QUANTITY;
154 static unsigned pending_queue_length = 100;
155 static struct NetFlow *netflow = &NetFlow5;
156 static unsigned verbosity = 6;
157 static unsigned log_dest = MY_LOG_SYSLOG;
158 static struct Time start_time;
159 static long start_time_offset;
162 extern unsigned total_elements;
163 extern unsigned free_elements;
164 extern unsigned total_memory;
165 #if ((DEBUG) & DEBUG_I)
166 static unsigned emit_pkts, emit_queue;
167 static uint64_t size_total;
168 static unsigned pkts_total, pkts_total_fragmented;
169 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
170 static unsigned pkts_pending, pkts_pending_done;
171 static unsigned pending_queue_trace, pending_queue_trace_candidate;
172 static unsigned flows_total, flows_fragmented;
174 static unsigned emit_count;
175 static uint32_t emit_sequence;
176 static unsigned emit_rate_bytes, emit_rate_delay;
177 static struct Time emit_time;
178 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
179 static pthread_t thid;
180 static sigset_t sig_mask;
181 static struct sched_param schedp;
182 static int sched_min, sched_max;
183 static int npeers, npeers_rot;
184 static struct peer *peers;
187 static struct Flow *flows[1 << HASH_BITS];
188 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
190 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
193 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
194 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
195 static struct Flow *pending_head, *pending_tail;
196 static struct Flow *scan_frag_dreg;
198 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
199 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
200 static struct Flow *flows_emit;
202 static char ident[256] = "fprobe-ulog";
203 static FILE *pidfile;
204 static char *pidfilepath;
207 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
208 static struct ipulog_handle *ulog_handle;
209 static uint32_t ulog_gmask = 1;
210 static char *cap_buf;
211 static int nsnmp_rules;
212 static struct snmp_rule *snmp_rules;
213 static struct passwd *pw = 0;
218 "fprobe-ulog: a NetFlow probe. Version %s\n"
219 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
221 "-h\t\tDisplay this help\n"
222 "-U <mask>\tULOG group bitwise mask [1]\n"
223 "-s <seconds>\tHow often scan for expired flows [5]\n"
224 "-g <seconds>\tFragmented flow lifetime [30]\n"
225 "-d <seconds>\tIdle flow lifetime (inactive timer) [60]\n"
226 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
227 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
228 "-a <address>\tUse <address> as source for NetFlow flow\n"
229 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
230 "-M\t\tUse netfilter mark value as ToS flag\n"
231 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
232 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
233 "-q <flows>\tPending queue length [100]\n"
234 "-B <kilobytes>\tKernel capture buffer size [0]\n"
235 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
236 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
237 "-c <directory>\tDirectory to chroot to\n"
238 "-u <user>\tUser to run as\n"
239 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
240 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
241 "remote:port\tAddress of the NetFlow collector\n",
242 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
246 #if ((DEBUG) & DEBUG_I)
249 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
250 pkts_total, pkts_total_fragmented, size_total,
251 pkts_pending - pkts_pending_done, pending_queue_trace);
252 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
253 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
254 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
255 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
256 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
257 total_elements, free_elements, total_memory);
261 void sighandler(int sig)
265 sigs |= SIGTERM_MASK;
267 #if ((DEBUG) & DEBUG_I)
269 sigs |= SIGUSR1_MASK;
275 void gettime(struct Time *now)
281 now->usec = t.tv_usec;
284 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
286 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
289 /* Uptime in milliseconds */
290 uint32_t getuptime(struct Time *t)
292 /* Maximum uptime is about 49/2 days */
293 return cmpmtime(t, &start_time);
296 hash_t hash_flow(struct Flow *flow)
298 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
299 else return hash(flow, sizeof(struct Flow_TL));
302 uint16_t snmp_index(char *name) {
305 if (!*name) return 0;
307 for (i = 0; (int) i < nsnmp_rules; i++) {
308 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
309 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
312 if ((i = if_nametoindex(name))) return i;
317 inline void copy_flow(struct Flow *src, struct Flow *dst)
324 dst->proto = src->proto;
325 dst->tcp_flags = src->tcp_flags;
329 dst->pkts = src->pkts;
330 dst->size = src->size;
331 dst->sizeF = src->sizeF;
332 dst->sizeP = src->sizeP;
333 dst->ctime = src->ctime;
334 dst->mtime = src->mtime;
335 dst->flags = src->flags;
338 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
340 struct Flow **flowpp;
346 if (prev) flowpp = *prev;
349 if (where->sip.s_addr == what->sip.s_addr
350 && where->dip.s_addr == what->dip.s_addr
351 && where->proto == what->proto) {
352 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
354 /* Both unfragmented */
355 if ((what->sp == where->sp)
356 && (what->dp == where->dp)) goto done;
359 /* Both fragmented */
360 if (where->id == what->id) goto done;
364 flowpp = &where->next;
368 if (prev) *prev = flowpp;
372 int put_into(struct Flow *flow, int flag
373 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
380 struct Flow *flown, **flowpp;
381 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
386 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
387 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
390 pthread_mutex_lock(&flows_mutex[h]);
392 if (!(flown = find(flows[h], flow, &flowpp))) {
393 /* No suitable flow found - add */
394 if (flag == COPY_INTO) {
395 if ((flown = mem_alloc())) {
396 copy_flow(flow, flown);
399 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
400 my_log(LOG_ERR, "%s %s. %s",
401 "mem_alloc():", strerror(errno), "packet lost");
406 flow->next = flows[h];
408 #if ((DEBUG) & DEBUG_I)
410 if (flow->flags & FLOW_FRAG) flows_fragmented++;
412 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
414 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
419 /* Found suitable flow - update */
420 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
421 sprintf(buf, " +> %x", (unsigned) flown);
424 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
425 flown->mtime = flow->mtime;
426 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
427 flown->ctime = flow->ctime;
428 flown->tcp_flags |= flow->tcp_flags;
429 flown->size += flow->size;
430 flown->pkts += flow->pkts;
431 if (flow->flags & FLOW_FRAG) {
432 /* Fragmented flow requires some additional work */
433 if (flow->flags & FLOW_TL) {
436 Several packets with FLOW_TL (attack)
438 flown->sp = flow->sp;
439 flown->dp = flow->dp;
441 if (flow->flags & FLOW_LASTFRAG) {
444 Several packets with FLOW_LASTFRAG (attack)
446 flown->sizeP = flow->sizeP;
448 flown->flags |= flow->flags;
449 flown->sizeF += flow->sizeF;
450 if ((flown->flags & FLOW_LASTFRAG)
451 && (flown->sizeF >= flown->sizeP)) {
452 /* All fragments received - flow reassembled */
453 *flowpp = flown->next;
454 pthread_mutex_unlock(&flows_mutex[h]);
455 #if ((DEBUG) & DEBUG_I)
460 flown->flags &= ~FLOW_FRAG;
461 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
464 ret = put_into(flown, MOVE_INTO
465 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
471 if (flag == MOVE_INTO) mem_free(flow);
473 pthread_mutex_unlock(&flows_mutex[h]);
477 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
481 for (i = 0; i < fields; i++) {
482 #if ((DEBUG) & DEBUG_F)
483 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
486 case NETFLOW_IPV4_SRC_ADDR:
487 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
488 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
491 case NETFLOW_IPV4_DST_ADDR:
492 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
493 p += NETFLOW_IPV4_DST_ADDR_SIZE;
496 case NETFLOW_INPUT_SNMP:
497 *((uint16_t *) p) = htons(flow->iif);
498 p += NETFLOW_INPUT_SNMP_SIZE;
501 case NETFLOW_OUTPUT_SNMP:
502 *((uint16_t *) p) = htons(flow->oif);
503 p += NETFLOW_OUTPUT_SNMP_SIZE;
506 case NETFLOW_PKTS_32:
507 *((uint32_t *) p) = htonl(flow->pkts);
508 p += NETFLOW_PKTS_32_SIZE;
511 case NETFLOW_BYTES_32:
512 *((uint32_t *) p) = htonl(flow->size);
513 p += NETFLOW_BYTES_32_SIZE;
516 case NETFLOW_FIRST_SWITCHED:
517 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
518 p += NETFLOW_FIRST_SWITCHED_SIZE;
521 case NETFLOW_LAST_SWITCHED:
522 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
523 p += NETFLOW_LAST_SWITCHED_SIZE;
526 case NETFLOW_L4_SRC_PORT:
527 *((uint16_t *) p) = flow->sp;
528 p += NETFLOW_L4_SRC_PORT_SIZE;
531 case NETFLOW_L4_DST_PORT:
532 *((uint16_t *) p) = flow->dp;
533 p += NETFLOW_L4_DST_PORT_SIZE;
537 *((uint8_t *) p) = flow->proto;
538 p += NETFLOW_PROT_SIZE;
541 case NETFLOW_SRC_TOS:
542 *((uint8_t *) p) = flow->tos;
543 p += NETFLOW_SRC_TOS_SIZE;
546 case NETFLOW_TCP_FLAGS:
547 *((uint8_t *) p) = flow->tcp_flags;
548 p += NETFLOW_TCP_FLAGS_SIZE;
551 case NETFLOW_VERSION:
552 *((uint16_t *) p) = htons(netflow->Version);
553 p += NETFLOW_VERSION_SIZE;
557 *((uint16_t *) p) = htons(emit_count);
558 p += NETFLOW_COUNT_SIZE;
562 *((uint32_t *) p) = htonl(getuptime(&emit_time));
563 p += NETFLOW_UPTIME_SIZE;
566 case NETFLOW_UNIX_SECS:
567 *((uint32_t *) p) = htonl(emit_time.sec);
568 p += NETFLOW_UNIX_SECS_SIZE;
571 case NETFLOW_UNIX_NSECS:
572 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
573 p += NETFLOW_UNIX_NSECS_SIZE;
576 case NETFLOW_FLOW_SEQUENCE:
577 //*((uint32_t *) p) = htonl(emit_sequence);
578 *((uint32_t *) p) = 0;
579 p += NETFLOW_FLOW_SEQUENCE_SIZE;
583 /* Unsupported (uint8_t) */
584 case NETFLOW_ENGINE_TYPE:
585 case NETFLOW_ENGINE_ID:
586 case NETFLOW_FLAGS7_1:
587 case NETFLOW_SRC_MASK:
588 case NETFLOW_DST_MASK:
589 *((uint8_t *) p) = 0;
590 p += NETFLOW_PAD8_SIZE;
594 /* Unsupported (uint16_t) */
597 case NETFLOW_FLAGS7_2:
598 *((uint16_t *) p) = 0;
599 p += NETFLOW_PAD16_SIZE;
603 /* Unsupported (uint32_t) */
604 case NETFLOW_IPV4_NEXT_HOP:
605 case NETFLOW_ROUTER_SC:
606 *((uint32_t *) p) = 0;
607 p += NETFLOW_PAD32_SIZE;
611 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
612 format, i, format[i]);
616 #if ((DEBUG) & DEBUG_F)
617 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
624 Workaround for clone()-based threads
625 Try to change EUID independently of main thread
629 setregid(pw->pw_gid, pw->pw_gid);
630 setreuid(pw->pw_uid, pw->pw_uid);
639 struct timespec timeout;
640 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
642 p = (void *) &emit_packet + netflow->HeaderSize;
648 pthread_mutex_lock(&emit_mutex);
649 while (!flows_emit) {
650 gettimeofday(&now, 0);
651 timeout.tv_sec = now.tv_sec + emit_timeout;
652 /* Do not wait until emit_packet is filled - it may take too long */
653 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
654 pthread_mutex_unlock(&emit_mutex);
659 flows_emit = flows_emit->next;
660 #if ((DEBUG) & DEBUG_I)
663 pthread_mutex_unlock(&emit_mutex);
667 gettime(&start_time);
668 start_time.sec -= start_time_offset;
671 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
674 if (emit_count == netflow->MaxFlows) {
677 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
678 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
680 for (i = 0; i < npeers; i++) {
681 if (peers[i].type == PEER_MIRROR) goto sendreal;
682 if (peers[i].type == PEER_ROTATE)
683 if (peer_rot_cur++ == peer_rot_work) {
685 if (netflow->SeqOffset)
686 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
687 ret = send(peers[i].sock, emit_packet, size, 0);
689 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
690 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
691 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
694 #if ((DEBUG) & DEBUG_E)
696 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
697 emit_count, i + 1, peers[i].seq);
700 peers[i].seq += emit_count;
703 if (emit_rate_bytes) {
705 delay = sent / emit_rate_bytes;
707 sent %= emit_rate_bytes;
709 timeout.tv_nsec = emit_rate_delay * delay;
710 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
715 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
716 emit_sequence += emit_count;
718 #if ((DEBUG) & DEBUG_I)
725 void *unpending_thread()
728 struct timespec timeout;
729 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
736 pthread_mutex_lock(&unpending_mutex);
739 while (!(pending_tail->flags & FLOW_PENDING)) {
740 gettimeofday(&now, 0);
741 timeout.tv_sec = now.tv_sec + unpending_timeout;
742 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
745 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
748 if (put_into(pending_tail, COPY_INTO
749 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
753 #if ((DEBUG) & DEBUG_I)
754 pkts_lost_unpending++;
758 #if ((DEBUG) & DEBUG_U)
759 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
762 pending_tail->flags = 0;
763 pending_tail = pending_tail->next;
764 #if ((DEBUG) & DEBUG_I)
772 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
776 struct Flow *flow, **flowpp;
778 struct timespec timeout;
783 pthread_mutex_lock(&scan_mutex);
787 timeout.tv_sec = now.sec + scan_interval;
788 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
791 #if ((DEBUG) & DEBUG_S)
792 my_log(LOG_DEBUG, "S: %d", now.sec);
794 for (i = 0; i < 1 << HASH_BITS ; i++) {
795 pthread_mutex_lock(&flows_mutex[i]);
799 if (flow->flags & FLOW_FRAG) {
800 /* Process fragmented flow */
801 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
802 /* Fragmented flow expired - put it into special chain */
803 #if ((DEBUG) & DEBUG_I)
807 *flowpp = flow->next;
809 flow->flags &= ~FLOW_FRAG;
810 flow->next = scan_frag_dreg;
811 scan_frag_dreg = flow;
816 /* Flow is not fragmented */
817 if ((now.sec - flow->mtime.sec) > inactive_lifetime
818 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
820 #if ((DEBUG) & DEBUG_S)
821 my_log(LOG_DEBUG, "S: E %x", flow);
823 #if ((DEBUG) & DEBUG_I)
826 *flowpp = flow->next;
827 pthread_mutex_lock(&emit_mutex);
828 flow->next = flows_emit;
830 #if ((DEBUG) & DEBUG_I)
833 pthread_mutex_unlock(&emit_mutex);
838 flowpp = &flow->next;
841 pthread_mutex_unlock(&flows_mutex[i]);
843 if (flows_emit) pthread_cond_signal(&emit_cond);
845 while (scan_frag_dreg) {
846 flow = scan_frag_dreg;
847 scan_frag_dreg = flow->next;
848 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
851 put_into(flow, MOVE_INTO
852 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
856 #if ((DEBUG) & DEBUG_S)
857 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
865 struct ulog_packet_msg *ulog_msg;
869 int len, off_frag, psize;
870 #if ((DEBUG) & DEBUG_C)
878 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
880 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
883 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
885 #if ((DEBUG) & DEBUG_C)
886 sprintf(logbuf, "C: %d", ulog_msg->data_len);
889 nl = (void *) &ulog_msg->payload;
890 psize = ulog_msg->data_len;
893 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
894 #if ((DEBUG) & DEBUG_C)
895 strcat(logbuf, " U");
896 my_log(LOG_DEBUG, "%s", logbuf);
898 #if ((DEBUG) & DEBUG_I)
904 if (pending_head->flags) {
905 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
907 # if ((DEBUG) & DEBUG_C)
912 "pending queue full:", "packet lost");
914 #if ((DEBUG) & DEBUG_I)
920 #if ((DEBUG) & DEBUG_I)
926 /* ?FIXME? Add sanity check for ip_len? */
927 flow->size = ntohs(nl->ip_len);
928 #if ((DEBUG) & DEBUG_I)
929 size_total += flow->size;
932 flow->sip = nl->ip_src;
933 flow->dip = nl->ip_dst;
934 flow->iif = snmp_index(ulog_msg->indev_name);
935 flow->oif = snmp_index(ulog_msg->outdev_name);
936 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
937 flow->proto = nl->ip_p;
943 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
944 if (ulog_msg->timestamp_sec) {
945 flow->ctime.sec = ulog_msg->timestamp_sec;
946 flow->ctime.usec = ulog_msg->timestamp_usec;
947 } else gettime(&flow->ctime);
948 flow->mtime = flow->ctime;
950 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
953 Offset (from network layer) to transport layer header/IP data
954 IOW IP header size ;-)
957 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
959 off_tl = nl->ip_hl << 2;
960 tl = (void *) nl + off_tl;
962 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
963 flow->sizeF = ntohs(nl->ip_len) - off_tl;
965 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
966 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
968 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
969 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
970 #if ((DEBUG) & DEBUG_C)
971 strcat(logbuf, " F");
973 #if ((DEBUG) & DEBUG_I)
974 pkts_total_fragmented++;
976 flow->flags |= FLOW_FRAG;
977 flow->id = nl->ip_id;
979 if (!(ntohs(nl->ip_off) & IP_MF)) {
980 /* Packet without IP_MF (the last fragment) carries the information needed to compute the whole datagram size */
981 flow->flags |= FLOW_LASTFRAG;
982 /* size = frag_offset*8 + data_size */
983 flow->sizeP = off_frag + flow->sizeF;
987 #if ((DEBUG) & DEBUG_C)
988 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
990 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
995 Fortunately most interesting transport layer information fit
996 into first 8 bytes of IP data field (minimal nonzero size).
997 Thus we don't need actual packet reassembling to build whole
998 transport layer data. We only check the fragment offset for
999 zero value to find packet with this information.
1001 if (!off_frag && psize >= 8) {
1002 switch (flow->proto) {
1005 flow->sp = ((struct udphdr *)tl)->uh_sport;
1006 flow->dp = ((struct udphdr *)tl)->uh_dport;
1011 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1012 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1015 #ifdef ICMP_TRICK_CISCO
1017 flow->dp = *((int32_t *) tl);
1022 /* Unknown transport layer */
1023 #if ((DEBUG) & DEBUG_C)
1024 strcat(logbuf, " U");
1031 #if ((DEBUG) & DEBUG_C)
1032 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1033 strcat(logbuf, buf);
1035 flow->flags |= FLOW_TL;
1039 /* Check for tcp flags presence (including CWR and ECE). */
1040 if (flow->proto == IPPROTO_TCP
1042 && psize >= 16 - off_frag) {
1043 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1044 #if ((DEBUG) & DEBUG_C)
1045 sprintf(buf, " TCP:%x", flow->tcp_flags);
1046 strcat(logbuf, buf);
1050 #if ((DEBUG) & DEBUG_C)
1051 sprintf(buf, " => %x", (unsigned) flow);
1052 strcat(logbuf, buf);
1053 my_log(LOG_DEBUG, "%s", logbuf);
1056 #if ((DEBUG) & DEBUG_I)
1058 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1059 if (pending_queue_trace < pending_queue_trace_candidate)
1060 pending_queue_trace = pending_queue_trace_candidate;
1063 /* Flow complete - inform unpending_thread() about it */
1064 pending_head->flags |= FLOW_PENDING;
1065 pending_head = pending_head->next;
1067 pthread_cond_signal(&unpending_cond);
1073 int main(int argc, char **argv)
1076 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1077 int c, i, sock, memory_limit = 0;
1078 struct addrinfo hints, *res;
1079 struct sockaddr_in saddr;
1080 pthread_attr_t tattr;
1081 struct sigaction sigact;
1082 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1083 struct timeval timeout;
1085 sched_min = sched_get_priority_min(SCHED);
1086 sched_max = sched_get_priority_max(SCHED);
1088 memset(&saddr, 0 , sizeof(saddr));
1089 memset(&hints, 0 , sizeof(hints));
1090 hints.ai_flags = AI_PASSIVE;
1091 hints.ai_family = AF_INET;
1092 hints.ai_socktype = SOCK_DGRAM;
1094 /* Process command line options */
1097 while ((c = my_getopt(argc, argv, parms)) != -1) {
1107 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1108 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1109 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1110 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1111 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1112 if (parms[nflag].count) {
1113 switch (atoi(parms[nflag].arg)) {
1115 netflow = &NetFlow1;
1122 netflow = &NetFlow7;
1126 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1130 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1131 if (parms[lflag].count) {
1132 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1135 sprintf(errpbuf, "[%s]", log_suffix);
1136 strcat(ident, errpbuf);
1139 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1140 if (log_suffix) *--log_suffix = ':';
1142 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1144 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1147 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1148 if (parms[qflag].count) {
1149 pending_queue_length = atoi(parms[qflag].arg);
1150 if (pending_queue_length < 1) {
1151 fprintf(stderr, "Illegal %s\n", "pending queue length");
1155 if (parms[rflag].count) {
1156 schedp.sched_priority = atoi(parms[rflag].arg);
1157 if (schedp.sched_priority
1158 && (schedp.sched_priority < sched_min
1159 || schedp.sched_priority > sched_max)) {
1160 fprintf(stderr, "Illegal %s\n", "realtime priority");
1164 if (parms[Bflag].count) {
1165 sockbufsize = atoi(parms[Bflag].arg) << 10;
1167 if (parms[bflag].count) {
1168 bulk_quantity = atoi(parms[bflag].arg);
1169 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1170 fprintf(stderr, "Illegal %s\n", "bulk size");
1174 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1175 if (parms[Xflag].count) {
1176 for(i = 0; parms[Xflag].arg[i]; i++)
1177 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1178 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1180 rule = strtok(parms[Xflag].arg, ":");
1181 for (i = 0; rule; i++) {
1182 snmp_rules[i].len = strlen(rule);
1183 if (snmp_rules[i].len > IFNAMSIZ) {
1184 fprintf(stderr, "Illegal %s\n", "interface basename");
1187 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1188 if (!*(rule - 1)) *(rule - 1) = ',';
1189 rule = strtok(NULL, ",");
1191 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1194 snmp_rules[i].base = atoi(rule);
1196 rule = strtok(NULL, ":");
1200 if (parms[tflag].count)
1201 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1202 if (parms[aflag].count) {
1203 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1205 fprintf(stderr, "Illegal %s\n", "source address");
1208 saddr = *((struct sockaddr_in *) res->ai_addr);
1212 if (parms[uflag].count)
1213 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1214 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1219 /* Process collectors parameters. Brrrr... :-[ */
1221 npeers = argc - optind;
1222 if (npeers < 1) usage();
1223 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1224 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1226 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1228 if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1229 fprintf(stderr, "socket(): %s\n", strerror(errno));
1232 peers[npeers].sock = sock;
1233 peers[npeers].type = PEER_MIRROR;
1234 peers[npeers].laddr = saddr;
1235 peers[npeers].seq = 0;
1236 if ((lhost = strchr(dport, '/'))) {
1238 if ((type = strchr(lhost, '/'))) {
1246 peers[npeers].type = PEER_ROTATE;
1255 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1256 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1260 if (bind(sock, (struct sockaddr *) &peers[npeers].laddr,
1261 sizeof(struct sockaddr_in))) {
1262 fprintf(stderr, "bind(): %s\n", strerror(errno));
1265 if (getaddrinfo(dhost, dport, &hints, &res)) {
1267 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1270 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1272 if (connect(sock, (struct sockaddr *) &peers[npeers].addr,
1273 sizeof(struct sockaddr_in))) {
1274 fprintf(stderr, "connect(): %s\n", strerror(errno));
1278 /* Restore command line */
1279 if (type) *--type = '/';
1280 if (lhost) *--lhost = '/';
1284 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1285 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1287 fprintf(stderr, "libipulog initialization error: %s",
1288 ipulog_strerror(ipulog_errno));
1292 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1293 &sockbufsize, sizeof(sockbufsize)) < 0)
1294 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1296 /* Daemonize (if log destination stdout-free) */
1298 my_log_open(ident, verbosity, log_dest);
1299 if (!(log_dest & 2)) {
1302 fprintf(stderr, "fork(): %s", strerror(errno));
1307 freopen("/dev/null", "r", stdin);
1308 freopen("/dev/null", "w", stdout);
1309 freopen("/dev/null", "w", stderr);
1316 setvbuf(stdout, (char *)0, _IONBF, 0);
1317 setvbuf(stderr, (char *)0, _IONBF, 0);
1321 sprintf(errpbuf, "[%ld]", (long) pid);
1322 strcat(ident, errpbuf);
1324 /* Initialization */
1326 hash_init(); /* Actually for crc16 only */
1327 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1328 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1331 /* Hope 12 days is enough :-/ */
1332 start_time_offset = 1 << 20;
1334 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1336 gettime(&start_time);
1339 Build static pending queue as circular buffer.
1341 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1342 pending_tail = pending_head;
1343 for (i = pending_queue_length - 1; i--;) {
1344 if (!(pending_tail->next = mem_alloc())) {
1346 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1349 pending_tail = pending_tail->next;
1351 pending_tail->next = pending_head;
1352 pending_tail = pending_head;
1354 sigemptyset(&sig_mask);
1355 sigact.sa_handler = &sighandler;
1356 sigact.sa_mask = sig_mask;
1357 sigact.sa_flags = 0;
1358 sigaddset(&sig_mask, SIGTERM);
1359 sigaction(SIGTERM, &sigact, 0);
1360 #if ((DEBUG) & DEBUG_I)
1361 sigaddset(&sig_mask, SIGUSR1);
1362 sigaction(SIGUSR1, &sigact, 0);
1364 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1365 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1369 my_log(LOG_INFO, "Starting %s...", VERSION);
1371 if (parms[cflag].count) {
1372 if (chdir(parms[cflag].arg) || chroot(".")) {
1373 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1378 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1379 pthread_attr_init(&tattr);
1380 for (i = 0; i < THREADS - 1; i++) {
1381 if (schedp.sched_priority > 0) {
1382 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1383 (pthread_attr_setschedparam(&tattr, &schedp))) {
1384 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1388 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1389 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1392 pthread_detach(thid);
1393 schedp.sched_priority++;
1397 if (setgroups(0, NULL)) {
1398 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1401 if (setregid(pw->pw_gid, pw->pw_gid)) {
1402 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1405 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1406 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1411 if (!(pidfile = fopen(pidfilepath, "w")))
1412 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1414 fprintf(pidfile, "%ld\n", (long) pid);
1418 my_log(LOG_INFO, "pid: %d", pid);
1419 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1420 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1421 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1422 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1423 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1424 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1425 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1426 for (i = 0; i < nsnmp_rules; i++) {
1427 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1428 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1430 for (i = 0; i < npeers; i++) {
1431 switch (peers[i].type) {
1439 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1440 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1441 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1444 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1446 timeout.tv_usec = 0;
1448 || (total_elements - free_elements - pending_queue_length)
1450 || pending_tail->flags) {
1453 timeout.tv_sec = scan_interval;
1454 select(0, 0, 0, 0, &timeout);
1457 if (sigs & SIGTERM_MASK && !killed) {
1458 sigs &= ~SIGTERM_MASK;
1459 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1462 active_lifetime = -1;
1463 inactive_lifetime = -1;
1465 unpending_timeout = 1;
1467 pthread_cond_signal(&scan_cond);
1468 pthread_cond_signal(&unpending_cond);
1471 #if ((DEBUG) & DEBUG_I)
1472 if (sigs & SIGUSR1_MASK) {
1473 sigs &= ~SIGUSR1_MASK;
1478 remove(pidfilepath);
1479 #if ((DEBUG) & DEBUG_I)
1482 my_log(LOG_INFO, "Done.");