2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
11 Added data collection (-f) functionality, xid support in the header and log file
17 /* stdout, stderr, freopen() */
23 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
32 #include <libipulog/libipulog.h>
33 struct ipulog_handle {
36 struct sockaddr_nl local;
37 struct sockaddr_nl peer;
38 struct nlmsghdr* last_nlhdr;
41 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
42 #include <sys/types.h>
43 #include <netinet/in_systm.h>
44 #include <sys/socket.h>
45 #include <netinet/in.h>
46 #include <arpa/inet.h>
47 #include <netinet/ip.h>
48 #include <netinet/tcp.h>
49 #include <netinet/udp.h>
50 #include <netinet/ip_icmp.h>
53 #include <sys/param.h>
78 #include <sys/select.h>
84 #include <fprobe-ulog.h>
86 #include <my_getopt.h>
117 static struct getopt_parms parms[] = {
118 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
119 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
120 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
121 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
122 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
123 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
124 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 extern int optind, opterr, optopt;
148 extern struct NetFlow NetFlow1;
149 extern struct NetFlow NetFlow5;
150 extern struct NetFlow NetFlow7;
152 #define mark_is_tos parms[Mflag].count
153 static unsigned scan_interval = 5;
154 static int frag_lifetime = 30;
155 static int inactive_lifetime = 60;
156 static int active_lifetime = 300;
157 static int sockbufsize;
158 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
159 #if (MEM_BITS == 0) || (MEM_BITS == 16)
160 #define BULK_QUANTITY 10000
162 #define BULK_QUANTITY 200
165 static unsigned epoch_length=60, log_epochs=1;
166 static unsigned cur_epoch=0,prev_uptime=0;
168 static unsigned bulk_quantity = BULK_QUANTITY;
169 static unsigned pending_queue_length = 100;
170 static struct NetFlow *netflow = &NetFlow5;
171 static unsigned verbosity = 6;
172 static unsigned log_dest = MY_LOG_SYSLOG;
173 static struct Time start_time;
174 static long start_time_offset;
177 extern unsigned total_elements;
178 extern unsigned free_elements;
179 extern unsigned total_memory;
180 #if ((DEBUG) & DEBUG_I)
181 static unsigned emit_pkts, emit_queue;
182 static uint64_t size_total;
183 static unsigned pkts_total, pkts_total_fragmented;
184 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
185 static unsigned pkts_pending, pkts_pending_done;
186 static unsigned pending_queue_trace, pending_queue_trace_candidate;
187 static unsigned flows_total, flows_fragmented;
189 static unsigned emit_count;
190 static uint32_t emit_sequence;
191 static unsigned emit_rate_bytes, emit_rate_delay;
192 static struct Time emit_time;
193 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
194 static pthread_t thid;
195 static sigset_t sig_mask;
196 static struct sched_param schedp;
197 static int sched_min, sched_max;
198 static int npeers, npeers_rot;
199 static struct peer *peers;
202 static struct Flow *flows[1 << HASH_BITS];
203 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
205 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
206 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
208 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
209 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
210 static struct Flow *pending_head, *pending_tail;
211 static struct Flow *scan_frag_dreg;
213 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
215 static struct Flow *flows_emit;
217 static char ident[256] = "fprobe-ulog";
218 static FILE *pidfile;
219 static char *pidfilepath;
222 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
223 static struct ipulog_handle *ulog_handle;
224 static uint32_t ulog_gmask = 1;
225 static char *cap_buf;
226 static int nsnmp_rules;
227 static struct snmp_rule *snmp_rules;
228 static struct passwd *pw = 0;
233 "fprobe-ulog: a NetFlow probe. Version %s\n"
234 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
236 "-h\t\tDisplay this help\n"
237 "-U <mask>\tULOG group bitwise mask [1]\n"
238 "-s <seconds>\tHow often scan for expired flows [5]\n"
239 "-g <seconds>\tFragmented flow lifetime [30]\n"
240 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
241 "-f <filename>\tLog flow data in a file\n"
242 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
243 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
244 "-a <address>\tUse <address> as source for NetFlow flow\n"
245 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
246 "-M\t\tUse netfilter mark value as ToS flag\n"
247 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
248 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
249 "-q <flows>\tPending queue length [100]\n"
250 "-B <kilobytes>\tKernel capture buffer size [0]\n"
251 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
252 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
253 "-c <directory>\tDirectory to chroot to\n"
254 "-u <user>\tUser to run as\n"
255 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
256 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
257 "-y <remote:port>\tAddress of the NetFlow collector\n"
258 "-f <writable file>\tFile to write data into\n"
259 "-T <n>\tRotate log file every n epochs\n"
260 "-E <[1..60]>\tSize of an epoch in minutes\n",
261 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
265 #if ((DEBUG) & DEBUG_I)
268 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
269 pkts_total, pkts_total_fragmented, size_total,
270 pkts_pending - pkts_pending_done, pending_queue_trace);
271 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
272 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
273 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
274 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
275 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
276 total_elements, free_elements, total_memory);
280 void sighandler(int sig)
284 sigs |= SIGTERM_MASK;
286 #if ((DEBUG) & DEBUG_I)
288 sigs |= SIGUSR1_MASK;
294 void gettime(struct Time *now)
300 now->usec = t.tv_usec;
304 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
306 return (t1->sec - t2->sec)/60;
309 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
311 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
314 /* Uptime in milliseconds */
315 uint32_t getuptime(struct Time *t)
317 /* Maximum uptime is about 49/2 days */
318 return cmpmtime(t, &start_time);
321 /* Uptime in minutes */
322 uint32_t getuptime_minutes(struct Time *t)
324 /* Maximum uptime is about 49/2 days */
325 return cmpMtime(t, &start_time);
328 hash_t hash_flow(struct Flow *flow)
330 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
331 else return hash(flow, sizeof(struct Flow_TL));
334 uint16_t snmp_index(char *name) {
337 if (!*name) return 0;
339 for (i = 0; (int) i < nsnmp_rules; i++) {
340 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
341 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
344 if ((i = if_nametoindex(name))) return i;
349 inline void copy_flow(struct Flow *src, struct Flow *dst)
356 dst->proto = src->proto;
357 dst->tcp_flags = src->tcp_flags;
361 dst->pkts = src->pkts;
362 dst->size = src->size;
363 dst->sizeF = src->sizeF;
364 dst->sizeP = src->sizeP;
365 dst->ctime = src->ctime;
366 dst->mtime = src->mtime;
367 dst->flags = src->flags;
370 unsigned get_log_fd(char *fname, unsigned cur_fd) {
375 cur_uptime = getuptime_minutes(&now);
377 /* Epoch length in minutes */
378 if ((cur_uptime - prev_uptime) > epoch_length || cur_fd==-1) {
379 char nextname[MAX_PATH_LEN];
381 prev_uptime = cur_uptime;
382 cur_epoch = (cur_epoch + 1) % log_epochs;
384 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
385 if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
386 fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
396 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
398 struct Flow **flowpp;
404 if (prev) flowpp = *prev;
407 if (where->sip.s_addr == what->sip.s_addr
408 && where->dip.s_addr == what->dip.s_addr
409 && where->proto == what->proto) {
410 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
412 /* Both unfragmented */
413 if ((what->sp == where->sp)
414 && (what->dp == where->dp)) goto done;
417 /* Both fragmented */
418 if (where->id == what->id) goto done;
422 flowpp = &where->next;
426 if (prev) *prev = flowpp;
430 int put_into(struct Flow *flow, int flag
431 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
438 struct Flow *flown, **flowpp;
439 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
444 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
445 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
448 pthread_mutex_lock(&flows_mutex[h]);
450 if (!(flown = find(flows[h], flow, &flowpp))) {
451 /* No suitable flow found - add */
452 if (flag == COPY_INTO) {
453 if ((flown = mem_alloc())) {
454 copy_flow(flow, flown);
457 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
458 my_log(LOG_ERR, "%s %s. %s",
459 "mem_alloc():", strerror(errno), "packet lost");
464 flow->next = flows[h];
466 #if ((DEBUG) & DEBUG_I)
468 if (flow->flags & FLOW_FRAG) flows_fragmented++;
470 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
472 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
477 /* Found suitable flow - update */
478 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
479 sprintf(buf, " +> %x", (unsigned) flown);
482 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
483 flown->mtime = flow->mtime;
484 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
485 flown->ctime = flow->ctime;
486 flown->tcp_flags |= flow->tcp_flags;
487 flown->size += flow->size;
488 flown->pkts += flow->pkts;
489 if (flow->flags & FLOW_FRAG) {
490 /* Fragmented flow requires some additional work */
491 if (flow->flags & FLOW_TL) {
494 Several packets with FLOW_TL (attack)
496 flown->sp = flow->sp;
497 flown->dp = flow->dp;
499 if (flow->flags & FLOW_LASTFRAG) {
502 Several packets with FLOW_LASTFRAG (attack)
504 flown->sizeP = flow->sizeP;
506 flown->flags |= flow->flags;
507 flown->sizeF += flow->sizeF;
508 if ((flown->flags & FLOW_LASTFRAG)
509 && (flown->sizeF >= flown->sizeP)) {
510 /* All fragments received - flow reassembled */
511 *flowpp = flown->next;
512 pthread_mutex_unlock(&flows_mutex[h]);
513 #if ((DEBUG) & DEBUG_I)
518 flown->flags &= ~FLOW_FRAG;
519 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
522 ret = put_into(flown, MOVE_INTO
523 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
529 if (flag == MOVE_INTO) mem_free(flow);
531 pthread_mutex_unlock(&flows_mutex[h]);
535 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
539 for (i = 0; i < fields; i++) {
540 #if ((DEBUG) & DEBUG_F)
541 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
544 case NETFLOW_IPV4_SRC_ADDR:
545 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
546 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
549 case NETFLOW_IPV4_DST_ADDR:
550 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
551 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
552 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
554 p += NETFLOW_IPV4_DST_ADDR_SIZE;
557 case NETFLOW_INPUT_SNMP:
558 *((uint16_t *) p) = htons(flow->iif);
559 p += NETFLOW_INPUT_SNMP_SIZE;
562 case NETFLOW_OUTPUT_SNMP:
563 *((uint16_t *) p) = htons(flow->oif);
564 p += NETFLOW_OUTPUT_SNMP_SIZE;
567 case NETFLOW_PKTS_32:
568 *((uint32_t *) p) = htonl(flow->pkts);
569 p += NETFLOW_PKTS_32_SIZE;
572 case NETFLOW_BYTES_32:
573 *((uint32_t *) p) = htonl(flow->size);
574 p += NETFLOW_BYTES_32_SIZE;
577 case NETFLOW_FIRST_SWITCHED:
578 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
579 p += NETFLOW_FIRST_SWITCHED_SIZE;
582 case NETFLOW_LAST_SWITCHED:
583 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
584 p += NETFLOW_LAST_SWITCHED_SIZE;
587 case NETFLOW_L4_SRC_PORT:
588 *((uint16_t *) p) = flow->sp;
589 p += NETFLOW_L4_SRC_PORT_SIZE;
592 case NETFLOW_L4_DST_PORT:
593 *((uint16_t *) p) = flow->dp;
594 p += NETFLOW_L4_DST_PORT_SIZE;
598 *((uint8_t *) p) = flow->proto;
599 p += NETFLOW_PROT_SIZE;
602 case NETFLOW_SRC_TOS:
603 *((uint8_t *) p) = flow->tos;
604 p += NETFLOW_SRC_TOS_SIZE;
607 case NETFLOW_TCP_FLAGS:
608 *((uint8_t *) p) = flow->tcp_flags;
609 p += NETFLOW_TCP_FLAGS_SIZE;
612 case NETFLOW_VERSION:
613 *((uint16_t *) p) = htons(netflow->Version);
614 p += NETFLOW_VERSION_SIZE;
618 *((uint16_t *) p) = htons(emit_count);
619 p += NETFLOW_COUNT_SIZE;
623 *((uint32_t *) p) = htonl(getuptime(&emit_time));
624 p += NETFLOW_UPTIME_SIZE;
627 case NETFLOW_UNIX_SECS:
628 *((uint32_t *) p) = htonl(emit_time.sec);
629 p += NETFLOW_UNIX_SECS_SIZE;
632 case NETFLOW_UNIX_NSECS:
633 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
634 p += NETFLOW_UNIX_NSECS_SIZE;
637 case NETFLOW_FLOW_SEQUENCE:
638 //*((uint32_t *) p) = htonl(emit_sequence);
639 *((uint32_t *) p) = 0;
640 p += NETFLOW_FLOW_SEQUENCE_SIZE;
644 /* Unsupported (uint8_t) */
645 case NETFLOW_ENGINE_TYPE:
646 case NETFLOW_ENGINE_ID:
647 case NETFLOW_FLAGS7_1:
648 case NETFLOW_SRC_MASK:
649 case NETFLOW_DST_MASK:
650 *((uint8_t *) p) = 0;
651 p += NETFLOW_PAD8_SIZE;
654 *((uint16_t *) p) = flow->tos;
655 p += NETFLOW_XID_SIZE;
658 /* Unsupported (uint16_t) */
661 case NETFLOW_FLAGS7_2:
662 *((uint16_t *) p) = 0;
663 p += NETFLOW_PAD16_SIZE;
667 /* Unsupported (uint32_t) */
668 case NETFLOW_IPV4_NEXT_HOP:
669 case NETFLOW_ROUTER_SC:
670 *((uint32_t *) p) = 0;
671 p += NETFLOW_PAD32_SIZE;
675 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
676 format, i, format[i]);
680 #if ((DEBUG) & DEBUG_F)
681 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
688 Workaround for clone()-based threads
689 Try to change EUID independently of main thread
693 setregid(pw->pw_gid, pw->pw_gid);
694 setreuid(pw->pw_uid, pw->pw_uid);
703 struct timespec timeout;
704 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
706 p = (void *) &emit_packet + netflow->HeaderSize;
712 pthread_mutex_lock(&emit_mutex);
713 while (!flows_emit) {
714 gettimeofday(&now, 0);
715 timeout.tv_sec = now.tv_sec + emit_timeout;
716 /* Do not wait until emit_packet is filled - it may take too long */
717 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
718 pthread_mutex_unlock(&emit_mutex);
723 flows_emit = flows_emit->next;
724 #if ((DEBUG) & DEBUG_I)
727 pthread_mutex_unlock(&emit_mutex);
731 gettime(&start_time);
732 start_time.sec -= start_time_offset;
735 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
739 printf("Emit count = %d\n", emit_count);
742 if (emit_count == netflow->MaxFlows) {
745 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
746 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
747 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
748 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
750 for (i = 0; i < npeers; i++) {
751 if (peers[i].type == PEER_FILE) {
752 if (netflow->SeqOffset)
753 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
754 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
755 ret = write(peers[0].write_fd, emit_packet, size);
757 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
758 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
759 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
763 #if ((DEBUG) & DEBUG_E)
765 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
766 emit_count, i + 1, peers[i].seq);
769 peers[0].seq += emit_count;
772 if (emit_rate_bytes) {
774 delay = sent / emit_rate_bytes;
776 sent %= emit_rate_bytes;
778 timeout.tv_nsec = emit_rate_delay * delay;
779 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
784 if (peers[i].type == PEER_MIRROR) goto sendreal;
786 if (peers[i].type == PEER_ROTATE)
787 if (peer_rot_cur++ == peer_rot_work) {
789 if (netflow->SeqOffset)
790 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
791 ret = send(peers[i].write_fd, emit_packet, size, 0);
793 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
794 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
795 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
798 #if ((DEBUG) & DEBUG_E)
800 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
801 emit_count, i + 1, peers[i].seq);
804 peers[i].seq += emit_count;
807 if (emit_rate_bytes) {
809 delay = sent / emit_rate_bytes;
811 sent %= emit_rate_bytes;
813 timeout.tv_nsec = emit_rate_delay * delay;
814 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
819 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
820 emit_sequence += emit_count;
822 #if ((DEBUG) & DEBUG_I)
829 void *unpending_thread()
832 struct timespec timeout;
833 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
840 pthread_mutex_lock(&unpending_mutex);
843 while (!(pending_tail->flags & FLOW_PENDING)) {
844 gettimeofday(&now, 0);
845 timeout.tv_sec = now.tv_sec + unpending_timeout;
846 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
849 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
852 if (put_into(pending_tail, COPY_INTO
853 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
857 #if ((DEBUG) & DEBUG_I)
858 pkts_lost_unpending++;
862 #if ((DEBUG) & DEBUG_U)
863 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
866 pending_tail->flags = 0;
867 pending_tail = pending_tail->next;
868 #if ((DEBUG) & DEBUG_I)
876 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
880 struct Flow *flow, **flowpp;
882 struct timespec timeout;
887 pthread_mutex_lock(&scan_mutex);
891 timeout.tv_sec = now.sec + scan_interval;
892 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
895 #if ((DEBUG) & DEBUG_S)
896 my_log(LOG_DEBUG, "S: %d", now.sec);
898 for (i = 0; i < 1 << HASH_BITS ; i++) {
899 pthread_mutex_lock(&flows_mutex[i]);
903 if (flow->flags & FLOW_FRAG) {
904 /* Process fragmented flow */
905 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
906 /* Fragmented flow expired - put it into special chain */
907 #if ((DEBUG) & DEBUG_I)
911 *flowpp = flow->next;
913 flow->flags &= ~FLOW_FRAG;
914 flow->next = scan_frag_dreg;
915 scan_frag_dreg = flow;
920 /* Flow is not fragmented */
921 if ((now.sec - flow->mtime.sec) > inactive_lifetime
922 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
924 #if ((DEBUG) & DEBUG_S)
925 my_log(LOG_DEBUG, "S: E %x", flow);
927 #if ((DEBUG) & DEBUG_I)
930 *flowpp = flow->next;
931 pthread_mutex_lock(&emit_mutex);
932 flow->next = flows_emit;
934 #if ((DEBUG) & DEBUG_I)
937 pthread_mutex_unlock(&emit_mutex);
942 flowpp = &flow->next;
945 pthread_mutex_unlock(&flows_mutex[i]);
947 if (flows_emit) pthread_cond_signal(&emit_cond);
949 while (scan_frag_dreg) {
950 flow = scan_frag_dreg;
951 scan_frag_dreg = flow->next;
952 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
955 put_into(flow, MOVE_INTO
956 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
960 #if ((DEBUG) & DEBUG_S)
961 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
969 struct ulog_packet_msg *ulog_msg;
973 int len, off_frag, psize;
974 #if ((DEBUG) & DEBUG_C)
982 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
984 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
987 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
989 #if ((DEBUG) & DEBUG_C)
990 sprintf(logbuf, "C: %d", ulog_msg->data_len);
993 nl = (void *) &ulog_msg->payload;
994 psize = ulog_msg->data_len;
997 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
998 #if ((DEBUG) & DEBUG_C)
999 strcat(logbuf, " U");
1000 my_log(LOG_DEBUG, "%s", logbuf);
1002 #if ((DEBUG) & DEBUG_I)
1008 if (pending_head->flags) {
1009 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1011 # if ((DEBUG) & DEBUG_C)
1016 "pending queue full:", "packet lost");
1018 #if ((DEBUG) & DEBUG_I)
1019 pkts_lost_capture++;
1024 #if ((DEBUG) & DEBUG_I)
1028 flow = pending_head;
1030 /* ?FIXME? Add sanity check for ip_len? */
1031 flow->size = ntohs(nl->ip_len);
1032 #if ((DEBUG) & DEBUG_I)
1033 size_total += flow->size;
1036 flow->sip = nl->ip_src;
1037 flow->dip = nl->ip_dst;
1038 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1039 my_log(LOG_INFO, "Received test flow to corewars.org");
1041 flow->iif = snmp_index(ulog_msg->indev_name);
1042 flow->oif = snmp_index(ulog_msg->outdev_name);
1043 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1044 flow->proto = nl->ip_p;
1046 flow->tcp_flags = 0;
1050 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1051 if (ulog_msg->timestamp_sec) {
1052 flow->ctime.sec = ulog_msg->timestamp_sec;
1053 flow->ctime.usec = ulog_msg->timestamp_usec;
1054 } else gettime(&flow->ctime);
1055 flow->mtime = flow->ctime;
1057 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1060 Offset (from network layer) to transport layer header/IP data
1061 IOW IP header size ;-)
1064 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1066 off_tl = nl->ip_hl << 2;
1067 tl = (void *) nl + off_tl;
1069 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1070 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1072 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1073 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1075 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1076 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1077 #if ((DEBUG) & DEBUG_C)
1078 strcat(logbuf, " F");
1080 #if ((DEBUG) & DEBUG_I)
1081 pkts_total_fragmented++;
1083 flow->flags |= FLOW_FRAG;
1084 flow->id = nl->ip_id;
1086 if (!(ntohs(nl->ip_off) & IP_MF)) {
1087 /* Packet with IP_MF cleared (last fragment) reveals the whole datagram size */
1088 flow->flags |= FLOW_LASTFRAG;
1089 /* size = frag_offset*8 + data_size */
1090 flow->sizeP = off_frag + flow->sizeF;
1094 #if ((DEBUG) & DEBUG_C)
1095 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1096 strcat(logbuf, buf);
1097 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1098 strcat(logbuf, buf);
1102 Fortunately most interesting transport layer information fit
1103 into first 8 bytes of IP data field (minimal nonzero size).
1104 Thus we don't need actual packet reassembling to build whole
1105 transport layer data. We only check the fragment offset for
1106 zero value to find packet with this information.
1108 if (!off_frag && psize >= 8) {
1109 switch (flow->proto) {
1112 flow->sp = ((struct udphdr *)tl)->uh_sport;
1113 flow->dp = ((struct udphdr *)tl)->uh_dport;
1118 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1119 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1122 #ifdef ICMP_TRICK_CISCO
1124 flow->dp = *((int32_t *) tl);
1129 /* Unknown transport layer */
1130 #if ((DEBUG) & DEBUG_C)
1131 strcat(logbuf, " U");
1138 #if ((DEBUG) & DEBUG_C)
1139 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1140 strcat(logbuf, buf);
1142 flow->flags |= FLOW_TL;
1146 /* Check for tcp flags presence (including CWR and ECE). */
1147 if (flow->proto == IPPROTO_TCP
1149 && psize >= 16 - off_frag) {
1150 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1151 #if ((DEBUG) & DEBUG_C)
1152 sprintf(buf, " TCP:%x", flow->tcp_flags);
1153 strcat(logbuf, buf);
1157 #if ((DEBUG) & DEBUG_C)
1158 sprintf(buf, " => %x", (unsigned) flow);
1159 strcat(logbuf, buf);
1160 my_log(LOG_DEBUG, "%s", logbuf);
1163 #if ((DEBUG) & DEBUG_I)
1165 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1166 if (pending_queue_trace < pending_queue_trace_candidate)
1167 pending_queue_trace = pending_queue_trace_candidate;
1170 /* Flow complete - inform unpending_thread() about it */
1171 pending_head->flags |= FLOW_PENDING;
1172 pending_head = pending_head->next;
1174 pthread_cond_signal(&unpending_cond);
1180 int main(int argc, char **argv)
1183 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1184 int c, i, write_fd, memory_limit = 0;
1185 struct addrinfo hints, *res;
1186 struct sockaddr_in saddr;
1187 pthread_attr_t tattr;
1188 struct sigaction sigact;
1189 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1190 struct timeval timeout;
1192 sched_min = sched_get_priority_min(SCHED);
1193 sched_max = sched_get_priority_max(SCHED);
1195 memset(&saddr, 0 , sizeof(saddr));
1196 memset(&hints, 0 , sizeof(hints));
1197 hints.ai_flags = AI_PASSIVE;
1198 hints.ai_family = AF_INET;
1199 hints.ai_socktype = SOCK_DGRAM;
1201 /* Process command line options */
1204 while ((c = my_getopt(argc, argv, parms)) != -1) {
1214 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1215 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1216 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1217 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1218 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1219 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1220 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1221 if (parms[nflag].count) {
1222 switch (atoi(parms[nflag].arg)) {
1224 netflow = &NetFlow1;
1231 netflow = &NetFlow7;
1235 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1239 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1240 if (parms[lflag].count) {
1241 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1244 sprintf(errpbuf, "[%s]", log_suffix);
1245 strcat(ident, errpbuf);
1248 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1249 if (log_suffix) *--log_suffix = ':';
1251 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1253 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1256 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1257 if (parms[qflag].count) {
1258 pending_queue_length = atoi(parms[qflag].arg);
1259 if (pending_queue_length < 1) {
1260 fprintf(stderr, "Illegal %s\n", "pending queue length");
1264 if (parms[rflag].count) {
1265 schedp.sched_priority = atoi(parms[rflag].arg);
1266 if (schedp.sched_priority
1267 && (schedp.sched_priority < sched_min
1268 || schedp.sched_priority > sched_max)) {
1269 fprintf(stderr, "Illegal %s\n", "realtime priority");
1273 if (parms[Bflag].count) {
1274 sockbufsize = atoi(parms[Bflag].arg) << 10;
1276 if (parms[bflag].count) {
1277 bulk_quantity = atoi(parms[bflag].arg);
1278 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1279 fprintf(stderr, "Illegal %s\n", "bulk size");
1283 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1284 if (parms[Xflag].count) {
1285 for(i = 0; parms[Xflag].arg[i]; i++)
1286 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1287 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1289 rule = strtok(parms[Xflag].arg, ":");
1290 for (i = 0; rule; i++) {
1291 snmp_rules[i].len = strlen(rule);
1292 if (snmp_rules[i].len > IFNAMSIZ) {
1293 fprintf(stderr, "Illegal %s\n", "interface basename");
1296 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1297 if (!*(rule - 1)) *(rule - 1) = ',';
1298 rule = strtok(NULL, ",");
1300 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1303 snmp_rules[i].base = atoi(rule);
1305 rule = strtok(NULL, ":");
1309 if (parms[tflag].count)
1310 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1311 if (parms[aflag].count) {
1312 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1314 fprintf(stderr, "Illegal %s\n", "source address");
1317 saddr = *((struct sockaddr_in *) res->ai_addr);
1321 if (parms[uflag].count)
1322 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1323 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1328 /* Process collectors parameters. Brrrr... :-[ */
1330 npeers = argc - optind;
1332 /* Send to remote Netflow collector */
1333 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1334 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1336 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1338 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1339 fprintf(stderr, "socket(): %s\n", strerror(errno));
1342 peers[npeers].write_fd = write_fd;
1343 peers[npeers].type = PEER_MIRROR;
1344 peers[npeers].laddr = saddr;
1345 peers[npeers].seq = 0;
1346 if ((lhost = strchr(dport, '/'))) {
1348 if ((type = strchr(lhost, '/'))) {
1356 peers[npeers].type = PEER_ROTATE;
1365 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1366 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1370 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1371 sizeof(struct sockaddr_in))) {
1372 fprintf(stderr, "bind(): %s\n", strerror(errno));
1375 if (getaddrinfo(dhost, dport, &hints, &res)) {
1377 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1380 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1382 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1383 sizeof(struct sockaddr_in))) {
1384 fprintf(stderr, "connect(): %s\n", strerror(errno));
1388 /* Restore command line */
1389 if (type) *--type = '/';
1390 if (lhost) *--lhost = '/';
1394 else if (parms[fflag].count) {
1396 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1397 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1398 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1400 peers[npeers].write_fd = -1;
1401 peers[npeers].type = PEER_FILE;
1402 peers[npeers].seq = 0;
1409 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1410 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1412 fprintf(stderr, "libipulog initialization error: %s",
1413 ipulog_strerror(ipulog_errno));
1417 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1418 &sockbufsize, sizeof(sockbufsize)) < 0)
1419 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1421 /* Daemonize (if log destination stdout-free) */
1423 my_log_open(ident, verbosity, log_dest);
1424 if (!(log_dest & 2)) {
1427 fprintf(stderr, "fork(): %s", strerror(errno));
1432 freopen("/dev/null", "r", stdin);
1433 freopen("/dev/null", "w", stdout);
1434 freopen("/dev/null", "w", stderr);
1441 setvbuf(stdout, (char *)0, _IONBF, 0);
1442 setvbuf(stderr, (char *)0, _IONBF, 0);
1446 sprintf(errpbuf, "[%ld]", (long) pid);
1447 strcat(ident, errpbuf);
1449 /* Initialization */
1451 hash_init(); /* Actually for crc16 only */
1452 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1453 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1456 /* Hope 12 days is enough :-/ */
1457 start_time_offset = 1 << 20;
1459 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1461 gettime(&start_time);
1464 Build static pending queue as circular buffer.
1466 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1467 pending_tail = pending_head;
1468 for (i = pending_queue_length - 1; i--;) {
1469 if (!(pending_tail->next = mem_alloc())) {
1471 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1474 pending_tail = pending_tail->next;
1476 pending_tail->next = pending_head;
1477 pending_tail = pending_head;
1479 sigemptyset(&sig_mask);
1480 sigact.sa_handler = &sighandler;
1481 sigact.sa_mask = sig_mask;
1482 sigact.sa_flags = 0;
1483 sigaddset(&sig_mask, SIGTERM);
1484 sigaction(SIGTERM, &sigact, 0);
1485 #if ((DEBUG) & DEBUG_I)
1486 sigaddset(&sig_mask, SIGUSR1);
1487 sigaction(SIGUSR1, &sigact, 0);
1489 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1490 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1494 my_log(LOG_INFO, "Starting %s...", VERSION);
1496 if (parms[cflag].count) {
1497 if (chdir(parms[cflag].arg) || chroot(".")) {
1498 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1503 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1504 pthread_attr_init(&tattr);
1505 for (i = 0; i < THREADS - 1; i++) {
1506 if (schedp.sched_priority > 0) {
1507 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1508 (pthread_attr_setschedparam(&tattr, &schedp))) {
1509 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1513 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1514 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1517 pthread_detach(thid);
1518 schedp.sched_priority++;
1522 if (setgroups(0, NULL)) {
1523 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1526 if (setregid(pw->pw_gid, pw->pw_gid)) {
1527 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1530 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1531 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1536 if (!(pidfile = fopen(pidfilepath, "w")))
1537 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1539 fprintf(pidfile, "%ld\n", (long) pid);
1543 my_log(LOG_INFO, "pid: %d", pid);
1544 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1545 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1546 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1547 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1548 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1549 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1550 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1551 for (i = 0; i < nsnmp_rules; i++) {
1552 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1553 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1555 for (i = 0; i < npeers; i++) {
1556 switch (peers[i].type) {
1564 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1565 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1566 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1569 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1571 timeout.tv_usec = 0;
1573 || (total_elements - free_elements - pending_queue_length)
1575 || pending_tail->flags) {
1578 timeout.tv_sec = scan_interval;
1579 select(0, 0, 0, 0, &timeout);
1582 if (sigs & SIGTERM_MASK && !killed) {
1583 sigs &= ~SIGTERM_MASK;
1584 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1587 active_lifetime = -1;
1588 inactive_lifetime = -1;
1590 unpending_timeout = 1;
1592 pthread_cond_signal(&scan_cond);
1593 pthread_cond_signal(&unpending_cond);
1596 #if ((DEBUG) & DEBUG_I)
1597 if (sigs & SIGUSR1_MASK) {
1598 sigs &= ~SIGUSR1_MASK;
1603 remove(pidfilepath);
1604 #if ((DEBUG) & DEBUG_I)
1607 my_log(LOG_INFO, "Done.");