2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
13 /* stdout, stderr, freopen() */
19 /* getopt(), alarm(), getpid(), setsid(), chdir() */
28 #include <libipulog/libipulog.h>
29 struct ipulog_handle {
32 struct sockaddr_nl local;
33 struct sockaddr_nl peer;
34 struct nlmsghdr* last_nlhdr;
37 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
38 #include <sys/types.h>
39 #include <netinet/in_systm.h>
40 #include <sys/socket.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <netinet/ip.h>
44 #include <netinet/tcp.h>
45 #include <netinet/udp.h>
46 #include <netinet/ip_icmp.h>
49 #include <sys/param.h>
74 #include <sys/select.h>
80 #include <fprobe-ulog.h>
82 #include <my_getopt.h>
113 static struct getopt_parms parms[] = {
114 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
115 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
116 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
117 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
118 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
119 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
120 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
122 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 extern int optind, opterr, optopt;
144 extern struct NetFlow NetFlow1;
145 extern struct NetFlow NetFlow5;
146 extern struct NetFlow NetFlow7;
148 #define mark_is_tos parms[Mflag].count
149 static unsigned scan_interval = 5;
150 static int frag_lifetime = 30;
151 static int inactive_lifetime = 60;
152 static int active_lifetime = 300;
153 static int sockbufsize;
154 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
155 #if (MEM_BITS == 0) || (MEM_BITS == 16)
156 #define BULK_QUANTITY 10000
158 #define BULK_QUANTITY 200
161 static unsigned epoch_length=60, log_epochs=1;
162 static unsigned cur_epoch=0,prev_uptime=0;
164 static unsigned bulk_quantity = BULK_QUANTITY;
165 static unsigned pending_queue_length = 100;
166 static struct NetFlow *netflow = &NetFlow5;
167 static unsigned verbosity = 6;
168 static unsigned log_dest = MY_LOG_SYSLOG;
169 static struct Time start_time;
170 static long start_time_offset;
173 extern unsigned total_elements;
174 extern unsigned free_elements;
175 extern unsigned total_memory;
176 #if ((DEBUG) & DEBUG_I)
177 static unsigned emit_pkts, emit_queue;
178 static uint64_t size_total;
179 static unsigned pkts_total, pkts_total_fragmented;
180 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
181 static unsigned pkts_pending, pkts_pending_done;
182 static unsigned pending_queue_trace, pending_queue_trace_candidate;
183 static unsigned flows_total, flows_fragmented;
185 static unsigned emit_count;
186 static uint32_t emit_sequence;
187 static unsigned emit_rate_bytes, emit_rate_delay;
188 static struct Time emit_time;
189 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
190 static pthread_t thid;
191 static sigset_t sig_mask;
192 static struct sched_param schedp;
193 static int sched_min, sched_max;
194 static int npeers, npeers_rot;
195 static struct peer *peers;
/* Active flow cache: hash table of chains, one mutex per bucket. */
198 static struct Flow *flows[1 << HASH_BITS];
199 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* cap_thread -> unpending_thread handoff over the static circular
   pending queue (pending_head/pending_tail below). */
201 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
202 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
/* scan_thread wakeup; scan_frag_dreg collects expired fragmented flows
   that scan_thread later re-inserts unfragmented. */
204 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
205 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
206 static struct Flow *pending_head, *pending_tail;
207 static struct Flow *scan_frag_dreg;
/* scan_thread -> emit_thread handoff: list of flows ready for export. */
209 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
210 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
211 static struct Flow *flows_emit;
/* Process identity: syslog/pidfile identifier and the pid file itself. */
213 static char ident[256] = "fprobe-ulog";
214 static FILE *pidfile;
215 static char *pidfilepath;
218 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
219 static struct ipulog_handle *ulog_handle;
220 static uint32_t ulog_gmask = 1;
221 static char *cap_buf;
222 static int nsnmp_rules;
223 static struct snmp_rule *snmp_rules;
224 static struct passwd *pw = 0;
229 "fprobe-ulog: a NetFlow probe. Version %s\n"
230 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
232 "-h\t\tDisplay this help\n"
233 "-U <mask>\tULOG group bitwise mask [1]\n"
234 "-s <seconds>\tHow often scan for expired flows [5]\n"
235 "-g <seconds>\tFragmented flow lifetime [30]\n"
236 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
237 "-f <filename>\tLog flow data in a file\n"
238 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
239 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
240 "-a <address>\tUse <address> as source for NetFlow flow\n"
241 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
242 "-M\t\tUse netfilter mark value as ToS flag\n"
243 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
244 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
245 "-q <flows>\tPending queue length [100]\n"
246 "-B <kilobytes>\tKernel capture buffer size [0]\n"
247 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
248 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
249 "-c <directory>\tDirectory to chroot to\n"
250 "-u <user>\tUser to run as\n"
251 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
252 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
253 "-y <remote:port>\tAddress of the NetFlow collector\n"
254 "-f <writable file>\tFile to write data into\n"
255 "-T <n>\tRotate log file every n epochs\n"
256 "-E <[1..60]>\tSize of an epoch in minutes\n",
257 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
261 #if ((DEBUG) & DEBUG_I)
264 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
265 pkts_total, pkts_total_fragmented, size_total,
266 pkts_pending - pkts_pending_done, pending_queue_trace);
267 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
268 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
269 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
270 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
271 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
272 total_elements, free_elements, total_memory);
/* Async signal handler: only records the received signal in the global
   'sigs' bitmask for the main loop to act on (the dispatch on 'sig' and
   the 'sigs' declaration are outside this view — presumed).  SIGUSR1
   (statistics dump) is compiled in only with DEBUG_I. */
276 void sighandler(int sig)
280 sigs |= SIGTERM_MASK;
282 #if ((DEBUG) & DEBUG_I)
284 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time.  Only the microsecond
   assignment is visible here; the gettimeofday() call and the seconds
   assignment are presumably on the elided lines. */
290 void gettime(struct Time *now)
296 now->usec = t.tv_usec;
/* Millisecond difference t1 - t2 (positive when t1 is later).
   Sub-millisecond remainders are truncated by the integer division. */
299 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
301 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
304 /* Uptime in milliseconds */
305 uint32_t getuptime(struct Time *t)
307 /* Maximum uptime is about 49/2 days */
/* NOTE(review): 2^32 ms is ~49.7 days; the halved figure above presumably
   accounts for signed arithmetic inside cmpmtime() — confirm. */
308 return cmpmtime(t, &start_time);
/* Hash a flow for the flow cache.  Fragmented flows hash only the
   fragment key (struct Flow_F prefix — ports of non-first fragments are
   unknown); complete flows also cover the transport-layer fields
   (struct Flow_TL prefix).  Assumes Flow_F/Flow_TL are prefix-compatible
   layouts of struct Flow — their definitions are not visible here. */
311 hash_t hash_flow(struct Flow *flow)
313 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
314 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to the SNMP ifIndex reported in NetFlow records.
   User-supplied -X rules (basename + numeric suffix -> base + suffix) are
   tried first; otherwise fall back to the kernel's if_nametoindex().
   Returns 0 for an empty name (and presumably when nothing matches — the
   final return is outside this view). */
317 uint16_t snmp_index(char *name) {
320 if (!*name) return 0;
/* First matching rule wins: name must begin with the rule's basename;
   the remainder is parsed as a decimal suffix and offset by 'base'. */
322 for (i = 0; (int) i < nsnmp_rules; i++) {
323 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
324 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
327 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a flow record from src to dst (used when moving
   a captured packet from the static pending queue into an allocated
   cache entry).  Several assignments (addresses, ports, interfaces) fall
   on elided lines; the 'next' link does not appear among the visible
   assignments. */
332 inline void copy_flow(struct Flow *src, struct Flow *dst)
339 dst->proto = src->proto;
340 dst->tcp_flags = src->tcp_flags;
344 dst->pkts = src->pkts;
345 dst->size = src->size;
346 dst->sizeF = src->sizeF;
347 dst->sizeP = src->sizeP;
348 dst->ctime = src->ctime;
349 dst->mtime = src->mtime;
350 dst->flags = src->flags;
/* Return the fd to log flows to, rotating to the next epoch file
   ("<fname>.<epoch>") once epoch_length minutes of uptime have elapsed.
   Epoch numbers wrap modulo log_epochs (-T option). */
353 unsigned get_log_fd(char *fname, unsigned cur_fd) {
358 cur_uptime = getuptime(&now);
360 /* Epoch length in minutes */
361 if ((cur_uptime - prev_uptime) > (1000 * 60 * epoch_length)) {
362 char nextname[MAX_PATH_LEN];
364 prev_uptime = cur_uptime;
365 cur_epoch = (cur_epoch + 1) % log_epochs;
367 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* BUG(review): open() with O_CREAT requires a third (mode) argument;
   without it a newly created file gets indeterminate permissions.
   Should be open(nextname, O_WRONLY|O_CREAT, 0644) or similar.  Also
   note: no O_TRUNC/O_APPEND, so a reused epoch file is overwritten in
   place from offset 0. */
368 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
369 fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
/* Search the hash chain starting at 'where' for a flow matching 'what'.
   Match key: src/dst IP + protocol, then either both ports (when both
   flows are unfragmented) or the IP id (when both are fragmented).
   If 'prev' is non-NULL it is updated to the link pointing at the found
   flow so the caller can unlink it.  Return statements are on elided
   lines — presumably the match or NULL. */
379 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
381 struct Flow **flowpp;
387 if (prev) flowpp = *prev;
390 if (where->sip.s_addr == what->sip.s_addr
391 && where->dip.s_addr == what->dip.s_addr
392 && where->proto == what->proto) {
/* Summing the two flag words under FLOW_FRAGMASK presumably yields
   distinct case values for neither/one/both fragmented — FLOW_FRAGMASK
   is not visible here; confirm. */
393 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
395 /* Both unfragmented */
396 if ((what->sp == where->sp)
397 && (what->dp == where->dp)) goto done;
400 /* Both fragmented */
401 if (where->id == what->id) goto done;
405 flowpp = &where->next;
409 if (prev) *prev = flowpp;
413 int put_into(struct Flow *flow, int flag
414 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
421 struct Flow *flown, **flowpp;
422 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
427 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
428 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
431 pthread_mutex_lock(&flows_mutex[h]);
433 if (!(flown = find(flows[h], flow, &flowpp))) {
434 /* No suitable flow found - add */
435 if (flag == COPY_INTO) {
436 if ((flown = mem_alloc())) {
437 copy_flow(flow, flown);
440 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
441 my_log(LOG_ERR, "%s %s. %s",
442 "mem_alloc():", strerror(errno), "packet lost");
447 flow->next = flows[h];
449 #if ((DEBUG) & DEBUG_I)
451 if (flow->flags & FLOW_FRAG) flows_fragmented++;
453 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
455 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
460 /* Found suitable flow - update */
461 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
462 sprintf(buf, " +> %x", (unsigned) flown);
465 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
466 flown->mtime = flow->mtime;
467 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
468 flown->ctime = flow->ctime;
469 flown->tcp_flags |= flow->tcp_flags;
470 flown->size += flow->size;
471 flown->pkts += flow->pkts;
472 if (flow->flags & FLOW_FRAG) {
473 /* Fragmented flows require some additional work */
474 if (flow->flags & FLOW_TL) {
477 Several packets with FLOW_TL (attack)
479 flown->sp = flow->sp;
480 flown->dp = flow->dp;
482 if (flow->flags & FLOW_LASTFRAG) {
485 Several packets with FLOW_LASTFRAG (attack)
487 flown->sizeP = flow->sizeP;
489 flown->flags |= flow->flags;
490 flown->sizeF += flow->sizeF;
491 if ((flown->flags & FLOW_LASTFRAG)
492 && (flown->sizeF >= flown->sizeP)) {
493 /* All fragments received - flow reassembled */
494 *flowpp = flown->next;
495 pthread_mutex_unlock(&flows_mutex[h]);
496 #if ((DEBUG) & DEBUG_I)
501 flown->flags &= ~FLOW_FRAG;
502 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
505 ret = put_into(flown, MOVE_INTO
506 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
512 if (flag == MOVE_INTO) mem_free(flow);
514 pthread_mutex_unlock(&flows_mutex[h]);
518 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
522 for (i = 0; i < fields; i++) {
523 #if ((DEBUG) & DEBUG_F)
524 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
527 case NETFLOW_IPV4_SRC_ADDR:
528 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
529 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
532 case NETFLOW_IPV4_DST_ADDR:
533 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
534 p += NETFLOW_IPV4_DST_ADDR_SIZE;
537 case NETFLOW_INPUT_SNMP:
538 *((uint16_t *) p) = htons(flow->iif);
539 p += NETFLOW_INPUT_SNMP_SIZE;
542 case NETFLOW_OUTPUT_SNMP:
543 *((uint16_t *) p) = htons(flow->oif);
544 p += NETFLOW_OUTPUT_SNMP_SIZE;
547 case NETFLOW_PKTS_32:
548 *((uint32_t *) p) = htonl(flow->pkts);
549 p += NETFLOW_PKTS_32_SIZE;
552 case NETFLOW_BYTES_32:
553 *((uint32_t *) p) = htonl(flow->size);
554 p += NETFLOW_BYTES_32_SIZE;
557 case NETFLOW_FIRST_SWITCHED:
558 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
559 p += NETFLOW_FIRST_SWITCHED_SIZE;
562 case NETFLOW_LAST_SWITCHED:
563 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
564 p += NETFLOW_LAST_SWITCHED_SIZE;
567 case NETFLOW_L4_SRC_PORT:
568 *((uint16_t *) p) = flow->sp;
569 p += NETFLOW_L4_SRC_PORT_SIZE;
572 case NETFLOW_L4_DST_PORT:
573 *((uint16_t *) p) = flow->dp;
574 p += NETFLOW_L4_DST_PORT_SIZE;
578 *((uint8_t *) p) = flow->proto;
579 p += NETFLOW_PROT_SIZE;
582 case NETFLOW_SRC_TOS:
583 *((uint8_t *) p) = flow->tos;
584 p += NETFLOW_SRC_TOS_SIZE;
587 case NETFLOW_TCP_FLAGS:
588 *((uint8_t *) p) = flow->tcp_flags;
589 p += NETFLOW_TCP_FLAGS_SIZE;
592 case NETFLOW_VERSION:
593 *((uint16_t *) p) = htons(netflow->Version);
594 p += NETFLOW_VERSION_SIZE;
598 *((uint16_t *) p) = htons(emit_count);
599 p += NETFLOW_COUNT_SIZE;
603 *((uint32_t *) p) = htonl(getuptime(&emit_time));
604 p += NETFLOW_UPTIME_SIZE;
607 case NETFLOW_UNIX_SECS:
608 *((uint32_t *) p) = htonl(emit_time.sec);
609 p += NETFLOW_UNIX_SECS_SIZE;
612 case NETFLOW_UNIX_NSECS:
613 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
614 p += NETFLOW_UNIX_NSECS_SIZE;
617 case NETFLOW_FLOW_SEQUENCE:
618 //*((uint32_t *) p) = htonl(emit_sequence);
619 *((uint32_t *) p) = 0;
620 p += NETFLOW_FLOW_SEQUENCE_SIZE;
624 /* Unsupported (uint8_t) */
625 case NETFLOW_ENGINE_TYPE:
626 case NETFLOW_ENGINE_ID:
627 case NETFLOW_FLAGS7_1:
628 case NETFLOW_SRC_MASK:
629 case NETFLOW_DST_MASK:
630 *((uint8_t *) p) = 0;
631 p += NETFLOW_PAD8_SIZE;
633 case NETFLOW_PLANETLAB_XID:
634 *((uint16_t *) p) = flow->tos;
635 p += NETFLOW_PLANETLAB_XID_SIZE;
638 /* Unsupported (uint16_t) */
641 case NETFLOW_FLAGS7_2:
642 *((uint16_t *) p) = 0;
643 p += NETFLOW_PAD16_SIZE;
647 /* Unsupported (uint32_t) */
648 case NETFLOW_IPV4_NEXT_HOP:
649 case NETFLOW_ROUTER_SC:
650 *((uint32_t *) p) = 0;
651 p += NETFLOW_PAD32_SIZE;
655 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
656 format, i, format[i]);
660 #if ((DEBUG) & DEBUG_F)
661 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
668 Workaround for clone()-based threads
669 Try to change EUID independently of main thread
673 setregid(pw->pw_gid, pw->pw_gid);
674 setreuid(pw->pw_uid, pw->pw_uid);
683 struct timespec timeout;
684 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
686 p = (void *) &emit_packet + netflow->HeaderSize;
692 pthread_mutex_lock(&emit_mutex);
693 while (!flows_emit) {
694 gettimeofday(&now, 0);
695 timeout.tv_sec = now.tv_sec + emit_timeout;
696 /* Do not wait until emit_packet is filled - that may take too long */
697 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
698 pthread_mutex_unlock(&emit_mutex);
703 flows_emit = flows_emit->next;
704 #if ((DEBUG) & DEBUG_I)
707 pthread_mutex_unlock(&emit_mutex);
711 gettime(&start_time);
712 start_time.sec -= start_time_offset;
715 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
719 printf("Emit count = %d\n", emit_count);
722 if (emit_count == netflow->MaxFlows) {
725 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
726 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
727 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
728 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
730 for (i = 0; i < npeers; i++) {
731 if (peers[0].type == PEER_FILE) {
732 if (netflow->SeqOffset)
733 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
735 peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
736 ret = write(peers[0].write_fd, emit_packet, size);
738 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
739 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
740 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
744 #if ((DEBUG) & DEBUG_E)
746 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
747 emit_count, i + 1, peers[i].seq);
750 peers[0].seq += emit_count;
753 if (emit_rate_bytes) {
755 delay = sent / emit_rate_bytes;
757 sent %= emit_rate_bytes;
759 timeout.tv_nsec = emit_rate_delay * delay;
760 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
765 if (peers[i].type == PEER_MIRROR) goto sendreal;
767 if (peers[i].type == PEER_ROTATE)
768 if (peer_rot_cur++ == peer_rot_work) {
770 if (netflow->SeqOffset)
771 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
772 ret = send(peers[i].write_fd, emit_packet, size, 0);
774 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
775 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
776 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
779 #if ((DEBUG) & DEBUG_E)
781 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
782 emit_count, i + 1, peers[i].seq);
785 peers[i].seq += emit_count;
788 if (emit_rate_bytes) {
790 delay = sent / emit_rate_bytes;
792 sent %= emit_rate_bytes;
794 timeout.tv_nsec = emit_rate_delay * delay;
795 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
800 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
801 emit_sequence += emit_count;
803 #if ((DEBUG) & DEBUG_I)
810 void *unpending_thread()
813 struct timespec timeout;
814 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
821 pthread_mutex_lock(&unpending_mutex);
824 while (!(pending_tail->flags & FLOW_PENDING)) {
825 gettimeofday(&now, 0);
826 timeout.tv_sec = now.tv_sec + unpending_timeout;
827 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
830 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
833 if (put_into(pending_tail, COPY_INTO
834 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
838 #if ((DEBUG) & DEBUG_I)
839 pkts_lost_unpending++;
843 #if ((DEBUG) & DEBUG_U)
844 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
847 pending_tail->flags = 0;
848 pending_tail = pending_tail->next;
849 #if ((DEBUG) & DEBUG_I)
857 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
861 struct Flow *flow, **flowpp;
863 struct timespec timeout;
868 pthread_mutex_lock(&scan_mutex);
872 timeout.tv_sec = now.sec + scan_interval;
873 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
876 #if ((DEBUG) & DEBUG_S)
877 my_log(LOG_DEBUG, "S: %d", now.sec);
879 for (i = 0; i < 1 << HASH_BITS ; i++) {
880 pthread_mutex_lock(&flows_mutex[i]);
884 if (flow->flags & FLOW_FRAG) {
885 /* Process fragmented flow */
886 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
887 /* Fragmented flow expired - put it into special chain */
888 #if ((DEBUG) & DEBUG_I)
892 *flowpp = flow->next;
894 flow->flags &= ~FLOW_FRAG;
895 flow->next = scan_frag_dreg;
896 scan_frag_dreg = flow;
901 /* Flow is not fragmented */
902 if ((now.sec - flow->mtime.sec) > inactive_lifetime
903 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
905 #if ((DEBUG) & DEBUG_S)
906 my_log(LOG_DEBUG, "S: E %x", flow);
908 #if ((DEBUG) & DEBUG_I)
911 *flowpp = flow->next;
912 pthread_mutex_lock(&emit_mutex);
913 flow->next = flows_emit;
915 #if ((DEBUG) & DEBUG_I)
918 pthread_mutex_unlock(&emit_mutex);
923 flowpp = &flow->next;
926 pthread_mutex_unlock(&flows_mutex[i]);
928 if (flows_emit) pthread_cond_signal(&emit_cond);
930 while (scan_frag_dreg) {
931 flow = scan_frag_dreg;
932 scan_frag_dreg = flow->next;
933 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
936 put_into(flow, MOVE_INTO
937 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
941 #if ((DEBUG) & DEBUG_S)
942 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
950 struct ulog_packet_msg *ulog_msg;
954 int len, off_frag, psize;
955 #if ((DEBUG) & DEBUG_C)
963 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
965 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
968 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
970 #if ((DEBUG) & DEBUG_C)
971 sprintf(logbuf, "C: %d", ulog_msg->data_len);
974 nl = (void *) &ulog_msg->payload;
975 psize = ulog_msg->data_len;
978 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
979 #if ((DEBUG) & DEBUG_C)
980 strcat(logbuf, " U");
981 my_log(LOG_DEBUG, "%s", logbuf);
983 #if ((DEBUG) & DEBUG_I)
989 if (pending_head->flags) {
990 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
992 # if ((DEBUG) & DEBUG_C)
997 "pending queue full:", "packet lost");
999 #if ((DEBUG) & DEBUG_I)
1000 pkts_lost_capture++;
1005 #if ((DEBUG) & DEBUG_I)
1009 flow = pending_head;
1011 /* ?FIXME? Add sanity check for ip_len? */
1012 flow->size = ntohs(nl->ip_len);
1013 #if ((DEBUG) & DEBUG_I)
1014 size_total += flow->size;
1017 flow->sip = nl->ip_src;
1018 flow->dip = nl->ip_dst;
1019 flow->iif = snmp_index(ulog_msg->indev_name);
1020 flow->oif = snmp_index(ulog_msg->outdev_name);
1021 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1022 flow->proto = nl->ip_p;
1024 flow->tcp_flags = 0;
1028 /* Packets captured from the OUTPUT table do not contain a valid timestamp */
1029 if (ulog_msg->timestamp_sec) {
1030 flow->ctime.sec = ulog_msg->timestamp_sec;
1031 flow->ctime.usec = ulog_msg->timestamp_usec;
1032 } else gettime(&flow->ctime);
1033 flow->mtime = flow->ctime;
1035 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1038 Offset (from network layer) to transport layer header/IP data
1039 IOW IP header size ;-)
1042 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1044 off_tl = nl->ip_hl << 2;
1045 tl = (void *) nl + off_tl;
1047 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1048 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1050 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1051 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1053 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1054 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1055 #if ((DEBUG) & DEBUG_C)
1056 strcat(logbuf, " F");
1058 #if ((DEBUG) & DEBUG_I)
1059 pkts_total_fragmented++;
1061 flow->flags |= FLOW_FRAG;
1062 flow->id = nl->ip_id;
1064 if (!(ntohs(nl->ip_off) & IP_MF)) {
1065 /* The last fragment (IP_MF cleared) lets us compute the whole datagram size */
1066 flow->flags |= FLOW_LASTFRAG;
1067 /* size = frag_offset*8 + data_size */
1068 flow->sizeP = off_frag + flow->sizeF;
1072 #if ((DEBUG) & DEBUG_C)
1073 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1074 strcat(logbuf, buf);
1075 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1076 strcat(logbuf, buf);
1080 Fortunately most interesting transport layer information fit
1081 into first 8 bytes of IP data field (minimal nonzero size).
1082 Thus we don't need actual packet reassembling to build whole
1083 transport layer data. We only check the fragment offset for
1084 zero value to find packet with this information.
1086 if (!off_frag && psize >= 8) {
1087 switch (flow->proto) {
1090 flow->sp = ((struct udphdr *)tl)->uh_sport;
1091 flow->dp = ((struct udphdr *)tl)->uh_dport;
1096 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1097 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1100 #ifdef ICMP_TRICK_CISCO
1102 flow->dp = *((int32_t *) tl);
1107 /* Unknown transport layer */
1108 #if ((DEBUG) & DEBUG_C)
1109 strcat(logbuf, " U");
1116 #if ((DEBUG) & DEBUG_C)
1117 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1118 strcat(logbuf, buf);
1120 flow->flags |= FLOW_TL;
1124 /* Check for tcp flags presence (including CWR and ECE). */
1125 if (flow->proto == IPPROTO_TCP
1127 && psize >= 16 - off_frag) {
1128 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1129 #if ((DEBUG) & DEBUG_C)
1130 sprintf(buf, " TCP:%x", flow->tcp_flags);
1131 strcat(logbuf, buf);
1135 #if ((DEBUG) & DEBUG_C)
1136 sprintf(buf, " => %x", (unsigned) flow);
1137 strcat(logbuf, buf);
1138 my_log(LOG_DEBUG, "%s", logbuf);
1141 #if ((DEBUG) & DEBUG_I)
1143 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1144 if (pending_queue_trace < pending_queue_trace_candidate)
1145 pending_queue_trace = pending_queue_trace_candidate;
1148 /* Flow complete - inform unpending_thread() about it */
1149 pending_head->flags |= FLOW_PENDING;
1150 pending_head = pending_head->next;
1152 pthread_cond_signal(&unpending_cond);
1158 int main(int argc, char **argv)
1161 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1162 int c, i, write_fd, memory_limit = 0;
1163 struct addrinfo hints, *res;
1164 struct sockaddr_in saddr;
1165 pthread_attr_t tattr;
1166 struct sigaction sigact;
1167 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1168 struct timeval timeout;
1170 sched_min = sched_get_priority_min(SCHED);
1171 sched_max = sched_get_priority_max(SCHED);
1173 memset(&saddr, 0 , sizeof(saddr));
1174 memset(&hints, 0 , sizeof(hints));
1175 hints.ai_flags = AI_PASSIVE;
1176 hints.ai_family = AF_INET;
1177 hints.ai_socktype = SOCK_DGRAM;
1179 /* Process command line options */
1182 while ((c = my_getopt(argc, argv, parms)) != -1) {
1192 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1193 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1194 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1195 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1196 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1197 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1198 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1199 if (parms[nflag].count) {
1200 switch (atoi(parms[nflag].arg)) {
1202 netflow = &NetFlow1;
1209 netflow = &NetFlow7;
1213 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1217 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1218 if (parms[lflag].count) {
1219 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1222 sprintf(errpbuf, "[%s]", log_suffix);
1223 strcat(ident, errpbuf);
1226 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1227 if (log_suffix) *--log_suffix = ':';
1229 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1231 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1234 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1235 if (parms[qflag].count) {
1236 pending_queue_length = atoi(parms[qflag].arg);
1237 if (pending_queue_length < 1) {
1238 fprintf(stderr, "Illegal %s\n", "pending queue length");
1242 if (parms[rflag].count) {
1243 schedp.sched_priority = atoi(parms[rflag].arg);
1244 if (schedp.sched_priority
1245 && (schedp.sched_priority < sched_min
1246 || schedp.sched_priority > sched_max)) {
1247 fprintf(stderr, "Illegal %s\n", "realtime priority");
1251 if (parms[Bflag].count) {
1252 sockbufsize = atoi(parms[Bflag].arg) << 10;
1254 if (parms[bflag].count) {
1255 bulk_quantity = atoi(parms[bflag].arg);
1256 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1257 fprintf(stderr, "Illegal %s\n", "bulk size");
1261 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1262 if (parms[Xflag].count) {
1263 for(i = 0; parms[Xflag].arg[i]; i++)
1264 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1265 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1267 rule = strtok(parms[Xflag].arg, ":");
1268 for (i = 0; rule; i++) {
1269 snmp_rules[i].len = strlen(rule);
1270 if (snmp_rules[i].len > IFNAMSIZ) {
1271 fprintf(stderr, "Illegal %s\n", "interface basename");
1274 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1275 if (!*(rule - 1)) *(rule - 1) = ',';
1276 rule = strtok(NULL, ",");
1278 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1281 snmp_rules[i].base = atoi(rule);
1283 rule = strtok(NULL, ":");
1287 if (parms[tflag].count)
1288 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1289 if (parms[aflag].count) {
1290 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1292 fprintf(stderr, "Illegal %s\n", "source address");
1295 saddr = *((struct sockaddr_in *) res->ai_addr);
1299 if (parms[uflag].count)
1300 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1301 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1306 /* Process collectors parameters. Brrrr... :-[ */
1308 npeers = argc - optind;
1310 /* Send to remote Netflow collector */
1311 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1312 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1314 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1316 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1317 fprintf(stderr, "socket(): %s\n", strerror(errno));
1320 peers[npeers].write_fd = write_fd;
1321 peers[npeers].type = PEER_MIRROR;
1322 peers[npeers].laddr = saddr;
1323 peers[npeers].seq = 0;
1324 if ((lhost = strchr(dport, '/'))) {
1326 if ((type = strchr(lhost, '/'))) {
1334 peers[npeers].type = PEER_ROTATE;
1343 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1344 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1348 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1349 sizeof(struct sockaddr_in))) {
1350 fprintf(stderr, "bind(): %s\n", strerror(errno));
1353 if (getaddrinfo(dhost, dport, &hints, &res)) {
1355 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1358 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1360 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1361 sizeof(struct sockaddr_in))) {
1362 fprintf(stderr, "connect(): %s\n", strerror(errno));
1366 /* Restore command line */
1367 if (type) *--type = '/';
1368 if (lhost) *--lhost = '/';
1372 else if (parms[fflag].count) {
1374 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1375 if (!(peers[0].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1376 strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
1378 peers[0].write_fd = -1;
1379 peers[0].type = PEER_FILE;
1387 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1388 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1390 fprintf(stderr, "libipulog initialization error: %s",
1391 ipulog_strerror(ipulog_errno));
1395 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1396 &sockbufsize, sizeof(sockbufsize)) < 0)
1397 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1399 /* Daemonize (if log destination stdout-free) */
1401 my_log_open(ident, verbosity, log_dest);
1402 if (!(log_dest & 2)) {
1405 fprintf(stderr, "fork(): %s", strerror(errno));
1410 freopen("/dev/null", "r", stdin);
1411 freopen("/dev/null", "w", stdout);
1412 freopen("/dev/null", "w", stderr);
1419 setvbuf(stdout, (char *)0, _IONBF, 0);
1420 setvbuf(stderr, (char *)0, _IONBF, 0);
1424 sprintf(errpbuf, "[%ld]", (long) pid);
1425 strcat(ident, errpbuf);
1427 /* Initialization */
1429 hash_init(); /* Actually for crc16 only */
1430 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1431 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1434 /* Hope 12 days is enough :-/ */
1435 start_time_offset = 1 << 20;
1437 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1439 gettime(&start_time);
1442 Build static pending queue as circular buffer.
1444 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1445 pending_tail = pending_head;
1446 for (i = pending_queue_length - 1; i--;) {
1447 if (!(pending_tail->next = mem_alloc())) {
1449 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1452 pending_tail = pending_tail->next;
1454 pending_tail->next = pending_head;
1455 pending_tail = pending_head;
1457 sigemptyset(&sig_mask);
1458 sigact.sa_handler = &sighandler;
1459 sigact.sa_mask = sig_mask;
1460 sigact.sa_flags = 0;
1461 sigaddset(&sig_mask, SIGTERM);
1462 sigaction(SIGTERM, &sigact, 0);
1463 #if ((DEBUG) & DEBUG_I)
1464 sigaddset(&sig_mask, SIGUSR1);
1465 sigaction(SIGUSR1, &sigact, 0);
1467 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1468 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1472 my_log(LOG_INFO, "Starting %s...", VERSION);
1474 if (parms[cflag].count) {
1475 if (chdir(parms[cflag].arg) || chroot(".")) {
1476 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1481 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1482 pthread_attr_init(&tattr);
1483 for (i = 0; i < THREADS - 1; i++) {
1484 if (schedp.sched_priority > 0) {
1485 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1486 (pthread_attr_setschedparam(&tattr, &schedp))) {
1487 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1491 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1492 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1495 pthread_detach(thid);
1496 schedp.sched_priority++;
1500 if (setgroups(0, NULL)) {
1501 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1504 if (setregid(pw->pw_gid, pw->pw_gid)) {
1505 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1508 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1509 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1514 if (!(pidfile = fopen(pidfilepath, "w")))
1515 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1517 fprintf(pidfile, "%ld\n", (long) pid);
1521 my_log(LOG_INFO, "pid: %d", pid);
1522 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1523 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1524 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1525 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1526 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1527 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1528 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1529 for (i = 0; i < nsnmp_rules; i++) {
1530 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1531 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1533 for (i = 0; i < npeers; i++) {
1534 switch (peers[i].type) {
1542 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1543 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1544 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1547 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1549 timeout.tv_usec = 0;
1551 || (total_elements - free_elements - pending_queue_length)
1553 || pending_tail->flags) {
1556 timeout.tv_sec = scan_interval;
1557 select(0, 0, 0, 0, &timeout);
1560 if (sigs & SIGTERM_MASK && !killed) {
1561 sigs &= ~SIGTERM_MASK;
1562 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1565 active_lifetime = -1;
1566 inactive_lifetime = -1;
1568 unpending_timeout = 1;
1570 pthread_cond_signal(&scan_cond);
1571 pthread_cond_signal(&unpending_cond);
1574 #if ((DEBUG) & DEBUG_I)
1575 if (sigs & SIGUSR1_MASK) {
1576 sigs &= ~SIGUSR1_MASK;
1581 remove(pidfilepath);
1582 #if ((DEBUG) & DEBUG_I)
1585 my_log(LOG_INFO, "Done.");