2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
11 Added data collection (-f) functionality, xid support in the header and log file
17 /* stdout, stderr, freopen() */
23 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
34 #include <sys/statfs.h>
36 #include <libipulog/libipulog.h>
37 struct ipulog_handle {
40 struct sockaddr_nl local;
41 struct sockaddr_nl peer;
42 struct nlmsghdr* last_nlhdr;
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
57 #include <sys/param.h>
82 #include <sys/select.h>
88 #include <fprobe-ulog.h>
90 #include <my_getopt.h>
123 static struct getopt_parms parms[] = {
124 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
147 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
152 extern int optind, opterr, optopt;
155 extern struct NetFlow NetFlow1;
156 extern struct NetFlow NetFlow5;
157 extern struct NetFlow NetFlow7;
159 #define START_VALUE -5
160 #define mark_is_tos parms[Mflag].count
161 static unsigned scan_interval = 5;
162 static int min_free = 0;
163 static int frag_lifetime = 30;
164 static int inactive_lifetime = 60;
165 static int active_lifetime = 300;
166 static int sockbufsize;
167 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
168 #if (MEM_BITS == 0) || (MEM_BITS == 16)
169 #define BULK_QUANTITY 10000
171 #define BULK_QUANTITY 200
174 static unsigned epoch_length=60, log_epochs=1;
175 static unsigned cur_epoch=0,prev_uptime=0;
177 static unsigned bulk_quantity = BULK_QUANTITY;
178 static unsigned pending_queue_length = 100;
179 static struct NetFlow *netflow = &NetFlow5;
180 static unsigned verbosity = 6;
181 static unsigned log_dest = MY_LOG_SYSLOG;
182 static struct Time start_time;
183 static long start_time_offset;
186 extern unsigned total_elements;
187 extern unsigned free_elements;
188 extern unsigned total_memory;
189 #if ((DEBUG) & DEBUG_I)
190 static unsigned emit_pkts, emit_queue;
191 static uint64_t size_total;
192 static unsigned pkts_total, pkts_total_fragmented;
193 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
194 static unsigned pkts_pending, pkts_pending_done;
195 static unsigned pending_queue_trace, pending_queue_trace_candidate;
196 static unsigned flows_total, flows_fragmented;
198 static unsigned emit_count;
199 static uint32_t emit_sequence;
200 static unsigned emit_rate_bytes, emit_rate_delay;
201 static struct Time emit_time;
202 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
203 static pthread_t thid;
204 static sigset_t sig_mask;
205 static struct sched_param schedp;
206 static int sched_min, sched_max;
207 static int npeers, npeers_rot;
208 static struct peer *peers;
211 static struct Flow *flows[1 << HASH_BITS];
212 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
214 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
215 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
217 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
218 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
219 static struct Flow *pending_head, *pending_tail;
220 static struct Flow *scan_frag_dreg;
222 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
224 static struct Flow *flows_emit;
226 static char ident[256] = "fprobe-ulog";
227 static FILE *pidfile;
228 static char *pidfilepath;
231 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
232 static struct ipulog_handle *ulog_handle;
233 static uint32_t ulog_gmask = 1;
234 static char *cap_buf;
235 static int nsnmp_rules;
236 static struct snmp_rule *snmp_rules;
237 static struct passwd *pw = 0;
242 "fprobe-ulog: a NetFlow probe. Version %s\n"
243 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
245 "-h\t\tDisplay this help\n"
246 "-U <mask>\tULOG group bitwise mask [1]\n"
247 "-s <seconds>\tHow often scan for expired flows [5]\n"
248 "-g <seconds>\tFragmented flow lifetime [30]\n"
249 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
250 "-f <filename>\tLog flow data in a file\n"
251 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
252 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
253 "-a <address>\tUse <address> as source for NetFlow flow\n"
254 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
255 "-M\t\tUse netfilter mark value as ToS flag\n"
256 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
257 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
258 "-q <flows>\tPending queue length [100]\n"
259 "-B <kilobytes>\tKernel capture buffer size [0]\n"
260 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
261 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
262 "-c <directory>\tDirectory to chroot to\n"
263 "-u <user>\tUser to run as\n"
264 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
265 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
266 "-y <remote:port>\tAddress of the NetFlow collector\n"
267 "-f <writable file>\tFile to write data into\n"
268 "-T <n>\tRotate log file every n epochs\n"
269 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
270 "-E <[1..60]>\tSize of an epoch in minutes\n"
271 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
273 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
277 #if ((DEBUG) & DEBUG_I)
280 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
281 pkts_total, pkts_total_fragmented, size_total,
282 pkts_pending - pkts_pending_done, pending_queue_trace);
283 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
284 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
285 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
286 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
287 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
288 total_elements, free_elements, total_memory);
/* Async signal handler: records which signal arrived by OR-ing a bit into
 * the global `sigs` word for the main loop to act on later.
 * NOTE(review): this is a sampled listing; the switch/if on `sig` that
 * selects between the two masks sits on lines not visible here. */
292 void sighandler(int sig)
296 sigs |= SIGTERM_MASK;
298 #if ((DEBUG) & DEBUG_I)
/* SIGUSR1 is only wired up when instrumentation (DEBUG_I) is compiled in. */
300 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time.
 * NOTE(review): sampled listing — the gettimeofday() call and the line
 * setting now->sec are on lines not visible here; only the usec copy shows. */
306 void gettime(struct Time *now)
312 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes (seconds field only; usec ignored).
 * Capital-M variant: minute granularity, used for epoch rotation. */
316 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
318 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds (sec*1000 + usec/1000).
 * NOTE(review): result width is time_t; for 32-bit time_t the millisecond
 * value wraps after ~24.8 days of difference — presumably the "49/2 days"
 * uptime cap mentioned at getuptime() accounts for this; confirm. */
321 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
323 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
/* Milliseconds elapsed since start_time (probe "uptime" for NetFlow
 * SysUptime / First/Last-switched fields). Truncates cmpmtime()'s time_t
 * result into uint32_t, hence the 2^32 ms (~49.7 day) comment below. */
326 /* Uptime in miliseconds */
327 uint32_t getuptime(struct Time *t)
329 /* Maximum uptime is about 49/2 days */
330 return cmpmtime(t, &start_time);
/* Minutes elapsed since start_time; used by get_log_fd() to decide when
 * an epoch (log-rotation period) has elapsed. */
333 /* Uptime in minutes */
334 uint32_t getuptime_minutes(struct Time *t)
336 /* Maximum uptime is about 49/2 days */
337 return cmpMtime(t, &start_time);
/* Hash a flow record for the flows[] table. Fragmented flows hash only the
 * Flow_F prefix (no ports — they are unknown until the first fragment);
 * complete flows hash the larger Flow_TL prefix including transport info.
 * Both prefixes must therefore be leading sub-structs of struct Flow. */
340 hash_t hash_flow(struct Flow *flow)
342 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
343 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP ifIndex for the NetFlow record.
 * First tries the user-supplied -X rules (basename prefix match + numeric
 * suffix + base offset), then falls back to the kernel's if_nametoindex().
 * Returns 0 for an empty name (and, per the fallback, presumably when
 * nothing matches — the final return is on a line not visible here). */
346 uint16_t snmp_index(char *name) {
349 if (!*name) return 0;
351 for (i = 0; (int) i < nsnmp_rules; i++) {
352 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
/* NOTE(review): atoi() on the suffix silently yields 0 for non-numeric
 * remainders; strtol with endptr checking would reject bad names. */
353 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
356 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record (used by put_into(COPY_INTO) when
 * inserting a new flow from the static pending queue into the hash table).
 * NOTE(review): sampled listing — the copies of sip/dip/sp/dp/iif/oif/tos
 * (original lines 362-367 and 370-372) are not visible here. */
361 inline void copy_flow(struct Flow *src, struct Flow *dst)
368 dst->proto = src->proto;
369 dst->tcp_flags = src->tcp_flags;
373 dst->pkts = src->pkts;
374 dst->size = src->size;
375 dst->sizeF = src->sizeF;
376 dst->sizeP = src->sizeP;
377 dst->ctime = src->ctime;
378 dst->mtime = src->mtime;
379 dst->flags = src->flags;
/* Restore the last log-rotation epoch from /tmp/fprobe_last_epoch so a
 * restarted probe does not overwrite the most recent log file.
 * NOTE(review): sampled listing — the fd<0 check and NUL-termination of
 * snum (using `len`) are presumably on the missing lines; confirm. */
382 void get_cur_epoch() {
384 fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
388 len = read(fd, snum, sizeof(snum)-1);
/* NOTE(review): cur_epoch is declared `static unsigned`; scanning with
 * "%d" is a signed/unsigned format mismatch — "%u" would match the type. */
391 sscanf(snum,"%d",&cur_epoch);
392 cur_epoch++; /* Let's not stone the last epoch */
/* Persist the current epoch number to /tmp/fprobe_last_epoch (best-effort:
 * on open failure we only log and continue; the next restart then resumes
 * from epoch 0, as the error message says). */
400 void update_cur_epoch_file(int n) {
403 len=snprintf(snum,6,"%d",n);
/* BUG(review): O_CREAT requires a third (mode) argument to open(); without
 * it the new file's permission bits are whatever garbage is on the stack.
 * Should be open(path, O_WRONLY|O_CREAT|O_TRUNC, 0644) or similar. */
404 fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
406 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
/* NOTE(review): write() return value is ignored; a short/failed write
 * leaves a stale or truncated epoch file. fd close is on a missing line. */
409 write(fd, snum, len);
/* Return a file descriptor for the current log epoch, rotating to a new
 * "<fname>.<epoch>" file when the epoch length has elapsed (or on first
 * call, cur_fd == START_VALUE), and reusing/dropping epochs when free disk
 * blocks fall below the -D threshold. Returns cur_fd unchanged otherwise
 * (final return is on a line not visible in this sampled listing). */
413 unsigned get_log_fd(char *fname, int cur_fd) {
416 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
417 * doesn't solve the problem */
419 struct statfs statfs;
422 cur_uptime = getuptime_minutes(&now);
/* NOTE(review): error is logged only when fstatfs fails AND cur_fd is a
 * real fd (first call passes the START_VALUE sentinel, not a valid fd). */
425 if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
426 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
/* NOTE(review): statfs.f_bfree is unsigned (fsblkcnt_t) and min_free is a
 * signed int — the comparison promotes min_free to unsigned; harmless for
 * the expected non-negative values but worth confirming. The switch() this
 * `case 0:` belongs to is on a missing line (presumably switch(cur_epoch)). */
429 if (min_free && (statfs.f_bfree < min_free))
431 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
432 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
435 my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
440 /* Epoch length in minutes */
/* NOTE(review): cur_epoch is `unsigned`, so (cur_epoch==-1) compares
 * against UINT_MAX — only true if cur_epoch wrapped; confirm intent. */
441 if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
442 char nextname[MAX_PATH_LEN];
444 prev_uptime = cur_uptime;
445 cur_epoch = (cur_epoch + 1) % log_epochs;
448 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* BUG(review): same as update_cur_epoch_file() — O_CREAT without the mode
 * argument yields indeterminate file permissions; pass e.g. 0644. */
449 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
450 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
453 update_cur_epoch_file(cur_epoch);
/* Search the singly linked chain starting at `where` for a flow matching
 * `what` (same src/dst IP and protocol, then — depending on fragmentation
 * state — same ports for unfragmented pairs or same IP id for fragmented
 * pairs). If `prev` is non-NULL it is updated to point at the `next`
 * pointer that links the matched node, enabling O(1) unlink by the caller
 * (put_into / scan thread). The walk loop header, the other FLOW_FRAGMASK
 * switch cases, and the `done:` label/return are on lines not visible in
 * this sampled listing. */
462 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
464 struct Flow **flowpp;
470 if (prev) flowpp = *prev;
473 if (where->sip.s_addr == what->sip.s_addr
474 && where->dip.s_addr == what->dip.s_addr
475 && where->proto == what->proto) {
/* Adding the flags works because at most one of the pair can carry
 * FLOW_FRAG-mask bits per matching state — presumably distinguishing
 * "both plain" / "both fragmented" / "mixed"; confirm against FLOW_* defs. */
476 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
478 /* Both unfragmented */
479 if ((what->sp == where->sp)
480 && (what->dp == where->dp)) goto done;
483 /* Both fragmented */
484 if (where->id == what->id) goto done;
488 flowpp = &where->next;
492 if (prev) *prev = flowpp;
/* Insert `flow` into the hash table bucket flows[h], merging it into an
 * existing matching entry if find() locates one.
 *   flag == COPY_INTO: `flow` lives in the static pending queue; allocate
 *     a new node (mem_alloc) and copy into it.
 *   flag == MOVE_INTO: `flow` is already a heap node; link it in directly
 *     and free it after a merge.
 * Fragment bookkeeping: sizeF accumulates received fragment bytes, sizeP
 * is the full datagram size learned from the last fragment; once
 * sizeF >= sizeP the flow is reassembled, unlinked, FLOW_FRAG cleared, and
 * re-inserted recursively with MOVE_INTO so it lands in the non-fragment
 * hash slot. Bucket mutex flows_mutex[h] is held across the update.
 * NOTE(review): sampled listing — the hash computation `h`, several
 * closing braces and the return paths are on lines not visible here. */
496 int put_into(struct Flow *flow, int flag
497 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
504 struct Flow *flown, **flowpp;
505 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
510 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* NOTE(review): casting a pointer to (unsigned) for %x truncates on 64-bit;
 * %p would be portable (debug-only path). */
511 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
514 pthread_mutex_lock(&flows_mutex[h]);
516 if (!(flown = find(flows[h], flow, &flowpp))) {
517 /* No suitable flow found - add */
518 if (flag == COPY_INTO) {
519 if ((flown = mem_alloc())) {
520 copy_flow(flow, flown);
523 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
524 my_log(LOG_ERR, "%s %s. %s",
525 "mem_alloc():", strerror(errno), "packet lost");
/* MOVE_INTO insertion path: link the caller's node at the bucket head. */
530 flow->next = flows[h];
532 #if ((DEBUG) & DEBUG_I)
534 if (flow->flags & FLOW_FRAG) flows_fragmented++;
536 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
538 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
543 /* Found suitable flow - update */
544 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
545 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the earliest create time and the latest modify time of the pair. */
548 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
549 flown->mtime = flow->mtime;
550 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
551 flown->ctime = flow->ctime;
552 flown->tcp_flags |= flow->tcp_flags;
553 flown->size += flow->size;
554 flown->pkts += flow->pkts;
555 if (flow->flags & FLOW_FRAG) {
556 /* Fragmented flow require some additional work */
557 if (flow->flags & FLOW_TL) {
560 Several packets with FLOW_TL (attack)
/* First (offset-0) fragment carries the transport-layer ports. */
562 flown->sp = flow->sp;
563 flown->dp = flow->dp;
565 if (flow->flags & FLOW_LASTFRAG) {
568 Several packets with FLOW_LASTFRAG (attack)
570 flown->sizeP = flow->sizeP;
572 flown->flags |= flow->flags;
573 flown->sizeF += flow->sizeF;
574 if ((flown->flags & FLOW_LASTFRAG)
575 && (flown->sizeF >= flown->sizeP)) {
576 /* All fragments received - flow reassembled */
577 *flowpp = flown->next;
578 pthread_mutex_unlock(&flows_mutex[h]);
579 #if ((DEBUG) & DEBUG_I)
584 flown->flags &= ~FLOW_FRAG;
585 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Recursive re-insert of the reassembled flow into its final bucket. */
588 ret = put_into(flown, MOVE_INTO
589 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
595 if (flag == MOVE_INTO) mem_free(flow);
597 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize `fields` NetFlow fields, in the order given by the `format`
 * table, into the packet buffer at `p`; returns the advanced write pointer.
 * Used for both header fields (flow == 0, emit_* globals supply values)
 * and per-flow records. The switch() header and per-case `break`s are on
 * lines not visible in this sampled listing.
 * NOTE(review): arithmetic on `void *p` is a GCC extension; the casted
 * uint16/uint32 stores assume unaligned access is tolerated (true on x86,
 * not everywhere) — confirm target platforms. */
601 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
605 for (i = 0; i < fields; i++) {
606 #if ((DEBUG) & DEBUG_F)
607 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
610 case NETFLOW_IPV4_SRC_ADDR:
/* IP addresses are kept in network byte order in struct Flow, so they are
 * copied without htonl(). */
611 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
612 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
615 case NETFLOW_IPV4_DST_ADDR:
616 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* NOTE(review): hard-coded debug hook for a test destination address;
 * calls inet_addr() on every record — candidate for removal/caching. */
617 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
618 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
620 p += NETFLOW_IPV4_DST_ADDR_SIZE;
623 case NETFLOW_INPUT_SNMP:
624 *((uint16_t *) p) = htons(flow->iif);
625 p += NETFLOW_INPUT_SNMP_SIZE;
628 case NETFLOW_OUTPUT_SNMP:
629 *((uint16_t *) p) = htons(flow->oif);
630 p += NETFLOW_OUTPUT_SNMP_SIZE;
633 case NETFLOW_PKTS_32:
634 *((uint32_t *) p) = htonl(flow->pkts);
635 p += NETFLOW_PKTS_32_SIZE;
638 case NETFLOW_BYTES_32:
639 *((uint32_t *) p) = htonl(flow->size);
640 p += NETFLOW_BYTES_32_SIZE;
643 case NETFLOW_FIRST_SWITCHED:
644 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
645 p += NETFLOW_FIRST_SWITCHED_SIZE;
648 case NETFLOW_LAST_SWITCHED:
649 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
650 p += NETFLOW_LAST_SWITCHED_SIZE;
653 case NETFLOW_L4_SRC_PORT:
/* Ports were captured in network byte order, hence no htons() here. */
654 *((uint16_t *) p) = flow->sp;
655 p += NETFLOW_L4_SRC_PORT_SIZE;
658 case NETFLOW_L4_DST_PORT:
659 *((uint16_t *) p) = flow->dp;
660 p += NETFLOW_L4_DST_PORT_SIZE;
664 *((uint8_t *) p) = flow->proto;
665 p += NETFLOW_PROT_SIZE;
668 case NETFLOW_SRC_TOS:
669 *((uint8_t *) p) = flow->tos;
670 p += NETFLOW_SRC_TOS_SIZE;
673 case NETFLOW_TCP_FLAGS:
674 *((uint8_t *) p) = flow->tcp_flags;
675 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header fields below use the emit_* globals (flow is NULL here). */
678 case NETFLOW_VERSION:
679 *((uint16_t *) p) = htons(netflow->Version);
680 p += NETFLOW_VERSION_SIZE;
684 *((uint16_t *) p) = htons(emit_count);
685 p += NETFLOW_COUNT_SIZE;
689 *((uint32_t *) p) = htonl(getuptime(&emit_time));
690 p += NETFLOW_UPTIME_SIZE;
693 case NETFLOW_UNIX_SECS:
694 *((uint32_t *) p) = htonl(emit_time.sec);
695 p += NETFLOW_UNIX_SECS_SIZE;
698 case NETFLOW_UNIX_NSECS:
699 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
700 p += NETFLOW_UNIX_NSECS_SIZE;
703 case NETFLOW_FLOW_SEQUENCE:
/* Sequence deliberately zeroed (original htonl(emit_sequence) disabled). */
704 //*((uint32_t *) p) = htonl(emit_sequence);
705 *((uint32_t *) p) = 0;
706 p += NETFLOW_FLOW_SEQUENCE_SIZE;
710 /* Unsupported (uint8_t) */
711 case NETFLOW_ENGINE_TYPE:
712 case NETFLOW_ENGINE_ID:
713 case NETFLOW_FLAGS7_1:
714 case NETFLOW_SRC_MASK:
715 case NETFLOW_DST_MASK:
716 *((uint8_t *) p) = 0;
717 p += NETFLOW_PAD8_SIZE;
/* XID extension (Sapan): stores the mark-derived tos field as a 16-bit id.
 * NOTE(review): written without htons(), unlike every other uint16 field
 * here — byte order on the wire is host order; confirm this is intended. */
720 *((uint16_t *) p) = flow->tos;
721 p += NETFLOW_XID_SIZE;
724 /* Unsupported (uint16_t) */
727 case NETFLOW_FLAGS7_2:
728 *((uint16_t *) p) = 0;
729 p += NETFLOW_PAD16_SIZE;
733 /* Unsupported (uint32_t) */
734 case NETFLOW_IPV4_NEXT_HOP:
735 case NETFLOW_ROUTER_SC:
736 *((uint32_t *) p) = 0;
737 p += NETFLOW_PAD32_SIZE;
/* NOTE(review): `format` printed with %x without a cast (pointer vs
 * unsigned mismatch on 64-bit); %p would be correct (debug/fatal path). */
741 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
742 format, i, format[i]);
746 #if ((DEBUG) & DEBUG_F)
747 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
754 Workaround for clone()-based threads
755 Try to change EUID independently of main thread
759 setregid(pw->pw_gid, pw->pw_gid);
760 setreuid(pw->pw_uid, pw->pw_uid);
769 struct timespec timeout;
770 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
772 p = (void *) &emit_packet + netflow->HeaderSize;
778 pthread_mutex_lock(&emit_mutex);
779 while (!flows_emit) {
780 gettimeofday(&now, 0);
781 timeout.tv_sec = now.tv_sec + emit_timeout;
782 /* Do not wait until emit_packet will filled - it may be too long */
783 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
784 pthread_mutex_unlock(&emit_mutex);
789 flows_emit = flows_emit->next;
790 #if ((DEBUG) & DEBUG_I)
793 pthread_mutex_unlock(&emit_mutex);
797 gettime(&start_time);
798 start_time.sec -= start_time_offset;
801 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
805 printf("Emit count = %d\n", emit_count);
808 if (emit_count == netflow->MaxFlows) {
811 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
812 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
813 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
814 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
816 for (i = 0; i < npeers; i++) {
817 if (peers[i].type == PEER_FILE) {
818 if (netflow->SeqOffset)
819 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
820 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
821 ret = write(peers[i].write_fd, emit_packet, size);
824 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
825 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
826 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
830 #if ((DEBUG) & DEBUG_E)
832 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
833 emit_count, i + 1, peers[i].seq);
836 peers[i].seq += emit_count;
839 if (emit_rate_bytes) {
841 delay = sent / emit_rate_bytes;
843 sent %= emit_rate_bytes;
845 timeout.tv_nsec = emit_rate_delay * delay;
846 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
851 if (peers[i].type == PEER_MIRROR) goto sendreal;
853 if (peers[i].type == PEER_ROTATE)
854 if (peer_rot_cur++ == peer_rot_work) {
856 if (netflow->SeqOffset)
857 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
858 ret = send(peers[i].write_fd, emit_packet, size, 0);
860 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
861 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
862 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
865 #if ((DEBUG) & DEBUG_E)
867 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
868 emit_count, i + 1, peers[i].seq);
871 peers[i].seq += emit_count;
874 if (emit_rate_bytes) {
876 delay = sent / emit_rate_bytes;
878 sent %= emit_rate_bytes;
880 timeout.tv_nsec = emit_rate_delay * delay;
881 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
886 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
887 emit_sequence += emit_count;
889 #if ((DEBUG) & DEBUG_I)
/* Consumer thread: drains completed entries from the static circular
 * pending queue (filled by the capture thread) into the flow hash via
 * put_into(COPY_INTO). Sleeps on unpending_cond with a periodic timeout
 * when the tail entry is not yet FLOW_PENDING. After a successful (or
 * failed-and-counted) insert, the slot is recycled: flags cleared and
 * pending_tail advanced around the ring.
 * NOTE(review): sampled listing — the outer for(;;), the put_into() call's
 * trailing arguments, and this thread's locking epilogue are on lines not
 * visible here. timeout.tv_nsec is also not visibly initialized; if left
 * uninitialized, pthread_cond_timedwait may return EINVAL — confirm. */
896 void *unpending_thread()
899 struct timespec timeout;
900 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
907 pthread_mutex_lock(&unpending_mutex);
910 while (!(pending_tail->flags & FLOW_PENDING)) {
911 gettimeofday(&now, 0);
912 timeout.tv_sec = now.tv_sec + unpending_timeout;
913 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
916 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
919 if (put_into(pending_tail, COPY_INTO
920 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* put_into() failure (mem_alloc exhausted) => packet dropped, counted. */
924 #if ((DEBUG) & DEBUG_I)
925 pkts_lost_unpending++;
929 #if ((DEBUG) & DEBUG_U)
930 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Recycle the ring slot for the capture thread. */
933 pending_tail->flags = 0;
934 pending_tail = pending_tail->next;
935 #if ((DEBUG) & DEBUG_I)
943 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
947 struct Flow *flow, **flowpp;
949 struct timespec timeout;
954 pthread_mutex_lock(&scan_mutex);
958 timeout.tv_sec = now.sec + scan_interval;
959 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
962 #if ((DEBUG) & DEBUG_S)
963 my_log(LOG_DEBUG, "S: %d", now.sec);
965 for (i = 0; i < 1 << HASH_BITS ; i++) {
966 pthread_mutex_lock(&flows_mutex[i]);
970 if (flow->flags & FLOW_FRAG) {
971 /* Process fragmented flow */
972 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
973 /* Fragmented flow expired - put it into special chain */
974 #if ((DEBUG) & DEBUG_I)
978 *flowpp = flow->next;
980 flow->flags &= ~FLOW_FRAG;
981 flow->next = scan_frag_dreg;
982 scan_frag_dreg = flow;
987 /* Flow is not frgamented */
988 if ((now.sec - flow->mtime.sec) > inactive_lifetime
989 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
991 #if ((DEBUG) & DEBUG_S)
992 my_log(LOG_DEBUG, "S: E %x", flow);
994 #if ((DEBUG) & DEBUG_I)
997 *flowpp = flow->next;
998 pthread_mutex_lock(&emit_mutex);
999 flow->next = flows_emit;
1001 #if ((DEBUG) & DEBUG_I)
1004 pthread_mutex_unlock(&emit_mutex);
1009 flowpp = &flow->next;
1012 pthread_mutex_unlock(&flows_mutex[i]);
1014 if (flows_emit) pthread_cond_signal(&emit_cond);
1016 while (scan_frag_dreg) {
1017 flow = scan_frag_dreg;
1018 scan_frag_dreg = flow->next;
1019 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1022 put_into(flow, MOVE_INTO
1023 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1027 #if ((DEBUG) & DEBUG_S)
1028 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1036 struct ulog_packet_msg *ulog_msg;
1040 int len, off_frag, psize;
1041 #if ((DEBUG) & DEBUG_C)
1049 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1051 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1054 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1056 #if ((DEBUG) & DEBUG_C)
1057 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1060 nl = (void *) &ulog_msg->payload;
1061 psize = ulog_msg->data_len;
1064 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1065 #if ((DEBUG) & DEBUG_C)
1066 strcat(logbuf, " U");
1067 my_log(LOG_DEBUG, "%s", logbuf);
1069 #if ((DEBUG) & DEBUG_I)
1075 if (pending_head->flags) {
1076 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1078 # if ((DEBUG) & DEBUG_C)
1083 "pending queue full:", "packet lost");
1085 #if ((DEBUG) & DEBUG_I)
1086 pkts_lost_capture++;
1091 #if ((DEBUG) & DEBUG_I)
1095 flow = pending_head;
1097 /* ?FIXME? Add sanity check for ip_len? */
1098 flow->size = ntohs(nl->ip_len);
1099 #if ((DEBUG) & DEBUG_I)
1100 size_total += flow->size;
1103 flow->sip = nl->ip_src;
1104 flow->dip = nl->ip_dst;
1105 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1106 my_log(LOG_INFO, "Received test flow to corewars.org");
1108 flow->iif = snmp_index(ulog_msg->indev_name);
1109 flow->oif = snmp_index(ulog_msg->outdev_name);
1110 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1111 flow->proto = nl->ip_p;
1113 flow->tcp_flags = 0;
1117 /* Packets captured from OUTPUT table didn't contains valid timestamp */
1118 if (ulog_msg->timestamp_sec) {
1119 flow->ctime.sec = ulog_msg->timestamp_sec;
1120 flow->ctime.usec = ulog_msg->timestamp_usec;
1121 } else gettime(&flow->ctime);
1122 flow->mtime = flow->ctime;
1124 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1127 Offset (from network layer) to transport layer header/IP data
1128 IOW IP header size ;-)
1131 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1133 off_tl = nl->ip_hl << 2;
1134 tl = (void *) nl + off_tl;
1136 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1137 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1139 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1140 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1142 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1143 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1144 #if ((DEBUG) & DEBUG_C)
1145 strcat(logbuf, " F");
1147 #if ((DEBUG) & DEBUG_I)
1148 pkts_total_fragmented++;
1150 flow->flags |= FLOW_FRAG;
1151 flow->id = nl->ip_id;
1153 if (!(ntohs(nl->ip_off) & IP_MF)) {
1154 /* Packet whith IP_MF contains information about whole datagram size */
1155 flow->flags |= FLOW_LASTFRAG;
1156 /* size = frag_offset*8 + data_size */
1157 flow->sizeP = off_frag + flow->sizeF;
1161 #if ((DEBUG) & DEBUG_C)
1162 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1163 strcat(logbuf, buf);
1164 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1165 strcat(logbuf, buf);
1169 Fortunately most interesting transport layer information fit
1170 into first 8 bytes of IP data field (minimal nonzero size).
1171 Thus we don't need actual packet reassembling to build whole
1172 transport layer data. We only check the fragment offset for
1173 zero value to find packet with this information.
1175 if (!off_frag && psize >= 8) {
1176 switch (flow->proto) {
1179 flow->sp = ((struct udphdr *)tl)->uh_sport;
1180 flow->dp = ((struct udphdr *)tl)->uh_dport;
1185 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1186 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1189 #ifdef ICMP_TRICK_CISCO
1191 flow->dp = *((int32_t *) tl);
1196 /* Unknown transport layer */
1197 #if ((DEBUG) & DEBUG_C)
1198 strcat(logbuf, " U");
1205 #if ((DEBUG) & DEBUG_C)
1206 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1207 strcat(logbuf, buf);
1209 flow->flags |= FLOW_TL;
1213 /* Check for tcp flags presence (including CWR and ECE). */
1214 if (flow->proto == IPPROTO_TCP
1216 && psize >= 16 - off_frag) {
1217 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1218 #if ((DEBUG) & DEBUG_C)
1219 sprintf(buf, " TCP:%x", flow->tcp_flags);
1220 strcat(logbuf, buf);
1224 #if ((DEBUG) & DEBUG_C)
1225 sprintf(buf, " => %x", (unsigned) flow);
1226 strcat(logbuf, buf);
1227 my_log(LOG_DEBUG, "%s", logbuf);
1230 #if ((DEBUG) & DEBUG_I)
1232 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1233 if (pending_queue_trace < pending_queue_trace_candidate)
1234 pending_queue_trace = pending_queue_trace_candidate;
1237 /* Flow complete - inform unpending_thread() about it */
1238 pending_head->flags |= FLOW_PENDING;
1239 pending_head = pending_head->next;
1241 pthread_cond_signal(&unpending_cond);
1247 int main(int argc, char **argv)
1250 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1251 int c, i, write_fd, memory_limit = 0;
1252 struct addrinfo hints, *res;
1253 struct sockaddr_in saddr;
1254 pthread_attr_t tattr;
1255 struct sigaction sigact;
1256 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1257 struct timeval timeout;
1259 sched_min = sched_get_priority_min(SCHED);
1260 sched_max = sched_get_priority_max(SCHED);
1262 memset(&saddr, 0 , sizeof(saddr));
1263 memset(&hints, 0 , sizeof(hints));
1264 hints.ai_flags = AI_PASSIVE;
1265 hints.ai_family = AF_INET;
1266 hints.ai_socktype = SOCK_DGRAM;
1268 /* Process command line options */
1271 while ((c = my_getopt(argc, argv, parms)) != -1) {
1281 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1282 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1283 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1284 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1285 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1286 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1287 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1288 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1289 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1290 if (parms[nflag].count) {
1291 switch (atoi(parms[nflag].arg)) {
1293 netflow = &NetFlow1;
1300 netflow = &NetFlow7;
1304 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1308 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1309 if (parms[lflag].count) {
1310 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1313 sprintf(errpbuf, "[%s]", log_suffix);
1314 strcat(ident, errpbuf);
1317 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1318 if (log_suffix) *--log_suffix = ':';
1320 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1322 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1325 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1326 if (parms[qflag].count) {
1327 pending_queue_length = atoi(parms[qflag].arg);
1328 if (pending_queue_length < 1) {
1329 fprintf(stderr, "Illegal %s\n", "pending queue length");
1333 if (parms[rflag].count) {
1334 schedp.sched_priority = atoi(parms[rflag].arg);
1335 if (schedp.sched_priority
1336 && (schedp.sched_priority < sched_min
1337 || schedp.sched_priority > sched_max)) {
1338 fprintf(stderr, "Illegal %s\n", "realtime priority");
1342 if (parms[Bflag].count) {
1343 sockbufsize = atoi(parms[Bflag].arg) << 10;
1345 if (parms[bflag].count) {
1346 bulk_quantity = atoi(parms[bflag].arg);
1347 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1348 fprintf(stderr, "Illegal %s\n", "bulk size");
1352 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1353 if (parms[Xflag].count) {
1354 for(i = 0; parms[Xflag].arg[i]; i++)
1355 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1356 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1358 rule = strtok(parms[Xflag].arg, ":");
1359 for (i = 0; rule; i++) {
1360 snmp_rules[i].len = strlen(rule);
1361 if (snmp_rules[i].len > IFNAMSIZ) {
1362 fprintf(stderr, "Illegal %s\n", "interface basename");
1365 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1366 if (!*(rule - 1)) *(rule - 1) = ',';
1367 rule = strtok(NULL, ",");
1369 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1372 snmp_rules[i].base = atoi(rule);
1374 rule = strtok(NULL, ":");
1378 if (parms[tflag].count)
1379 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1380 if (parms[aflag].count) {
1381 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1383 fprintf(stderr, "Illegal %s\n", "source address");
1386 saddr = *((struct sockaddr_in *) res->ai_addr);
1390 if (parms[uflag].count)
1391 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1392 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1397 /* Process collectors parameters. Brrrr... :-[ */
1399 npeers = argc - optind;
1401 /* Send to remote Netflow collector */
1402 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1403 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1405 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1407 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1408 fprintf(stderr, "socket(): %s\n", strerror(errno));
1411 peers[npeers].write_fd = write_fd;
1412 peers[npeers].type = PEER_MIRROR;
1413 peers[npeers].laddr = saddr;
1414 peers[npeers].seq = 0;
1415 if ((lhost = strchr(dport, '/'))) {
1417 if ((type = strchr(lhost, '/'))) {
1425 peers[npeers].type = PEER_ROTATE;
1434 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1435 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1439 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1440 sizeof(struct sockaddr_in))) {
1441 fprintf(stderr, "bind(): %s\n", strerror(errno));
1444 if (getaddrinfo(dhost, dport, &hints, &res)) {
1446 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1449 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1451 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1452 sizeof(struct sockaddr_in))) {
1453 fprintf(stderr, "connect(): %s\n", strerror(errno));
1457 /* Restore command line */
1458 if (type) *--type = '/';
1459 if (lhost) *--lhost = '/';
1463 else if (parms[fflag].count) {
1465 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1466 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1467 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
/* Finish initializing the single PEER_FILE entry. */
1469 peers[npeers].write_fd = START_VALUE;
1470 peers[npeers].type = PEER_FILE;
1471 peers[npeers].seq = 0;
/* Buffer for packets pulled from the ULOG netlink socket. */
1480 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
/* Subscribe to the ULOG group mask; on failure ipulog_errno explains why. */
1481 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1483 fprintf(stderr, "libipulog initialization error: %s",
1484 ipulog_strerror(ipulog_errno));
/* Enlarge the netlink receive buffer (-B); non-fatal if the kernel refuses. */
1488 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1489 &sockbufsize, sizeof(sockbufsize)) < 0)
1490 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1492 /* Daemonize (if log destination stdout-free) */
1494 my_log_open(ident, verbosity, log_dest);
/* log_dest bit 1 (value 2) means "log to stdout": stay in the foreground. */
1495 if (!(log_dest & 2)) {
1496 /* Crash-proofing - Sapan*/
1500 fprintf(stderr, "fork(): %s", strerror(errno));
/* Detach the daemon from the terminal. */
1505 freopen("/dev/null", "r", stdin);
1506 freopen("/dev/null", "w", stdout);
1507 freopen("/dev/null", "w", stderr);
/* Parent loops as a watchdog, reaping (and, per the elided lines, presumably
 * respawning) the worker child — confirm against the full source. */
1511 while (wait3(NULL,0,NULL) < 1);
/* Foreground mode: unbuffered stdio so log lines appear immediately. */
1515 setvbuf(stdout, (char *)0, _IONBF, 0);
1516 setvbuf(stderr, (char *)0, _IONBF, 0);
/* Append "[pid]" to the syslog ident.
 * NOTE(review): sprintf/strcat here are unbounded — safe only if errpbuf and
 * ident are sized for the worst case; verify their declarations. */
1520 sprintf(errpbuf, "[%ld]", (long) pid);
1521 strcat(ident, errpbuf);
1523 /* Initialization */
1525 hash_init(); /* Actually for crc16 only */
1526 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
/* One mutex per hash bucket. */
1527 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1530 /* Hope 12 days is enough :-/ */
1531 start_time_offset = 1 << 20;
1533 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1535 gettime(&start_time);
/* (Part of an elided block comment describing the pending queue.) */
1538 Build static pending queue as circular buffer.
/* Pre-allocate pending_queue_length nodes and link them into a ring;
 * head == tail means the queue starts empty. */
1540 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1541 pending_tail = pending_head;
1542 for (i = pending_queue_length - 1; i--;) {
1543 if (!(pending_tail->next = mem_alloc())) {
1545 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1548 pending_tail = pending_tail->next;
/* Close the ring and reset the tail back onto the head. */
1550 pending_tail->next = pending_head;
1551 pending_tail = pending_head;
/* Install sighandler() for SIGTERM (and SIGUSR1 in debug builds), then block
 * those signals: worker threads inherit the mask, so only the main loop —
 * which unblocks them later — observes them. */
1553 sigemptyset(&sig_mask);
1554 sigact.sa_handler = &sighandler;
1555 sigact.sa_mask = sig_mask;
1556 sigact.sa_flags = 0;
1557 sigaddset(&sig_mask, SIGTERM);
1558 sigaction(SIGTERM, &sigact, 0);
1559 #if ((DEBUG) & DEBUG_I)
1560 sigaddset(&sig_mask, SIGUSR1);
1561 sigaction(SIGUSR1, &sigact, 0);
1563 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1564 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1568 my_log(LOG_INFO, "Starting %s...", VERSION);
/* -c <dir>: chdir first so chroot(".") confines us to that directory. */
1570 if (parms[cflag].count) {
1571 if (chdir(parms[cflag].arg) || chroot(".")) {
1572 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
/* Spawn the worker threads, giving each successive thread a higher
 * scheduling priority when the base priority allows it. */
1577 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1578 pthread_attr_init(&tattr);
1579 for (i = 0; i < THREADS - 1; i++) {
1580 if (schedp.sched_priority > 0) {
1581 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1582 (pthread_attr_setschedparam(&tattr, &schedp))) {
1583 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1587 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1588 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
/* Detached: nobody joins the workers. */
1591 pthread_detach(thid);
1592 schedp.sched_priority++;
/* Drop privileges (-u): supplementary groups, then gid, then uid — this
 * order matters, since dropping uid first would forfeit the right to setgid. */
1596 if (setgroups(0, NULL)) {
1597 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1600 if (setregid(pw->pw_gid, pw->pw_gid)) {
1601 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1604 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1605 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
/* Write the pid file; failure is only LOG_ERR, the daemon keeps running.
 * NOTE(review): the elided line between fopen() and fprintf() presumably
 * guards against pidfile == NULL — confirm, else this dereferences NULL. */
1610 if (!(pidfile = fopen(pidfilepath, "w")))
1611 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1613 fprintf(pidfile, "%ld\n", (long) pid);
/* Dump the effective configuration for post-mortem debugging. */
1617 my_log(LOG_INFO, "pid: %d", pid);
1618 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1619 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1620 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1621 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1622 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1623 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1624 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1625 for (i = 0; i < nsnmp_rules; i++) {
1626 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1627 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1629 for (i = 0; i < npeers; i++) {
1630 switch (peers[i].type) {
/* inet_ntoa() returns a static buffer, so the local address is copied into
 * errpbuf before inet_ntoa() is called again for the remote address. */
1638 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1639 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1640 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
/* Setup done: let the main thread receive the signals blocked earlier. */
1643 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1645 timeout.tv_usec = 0;
/* Main loop (condition partially elided): keep going while flows are still
 * allocated or the pending ring has unemitted entries. */
1647 || (total_elements - free_elements - pending_queue_length)
1649 || pending_tail->flags) {
/* fd-less select() doubles as an interruptible sleep of scan_interval sec. */
1652 timeout.tv_sec = scan_interval;
1653 select(0, 0, 0, 0, &timeout);
1656 if (sigs & SIGTERM_MASK && !killed) {
1657 sigs &= ~SIGTERM_MASK;
1658 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
/* Force every cached flow to expire immediately and wake the scan and
 * unpending threads so they flush the cache before exit. */
1661 active_lifetime = -1;
1662 inactive_lifetime = -1;
1664 unpending_timeout = 1;
1666 pthread_cond_signal(&scan_cond);
1667 pthread_cond_signal(&unpending_cond);
1670 #if ((DEBUG) & DEBUG_I)
/* Debug builds: SIGUSR1 dumps internal state (handler body elided). */
1671 if (sigs & SIGUSR1_MASK) {
1672 sigs &= ~SIGUSR1_MASK;
/* Shutdown: remove the pid file and log completion. */
1677 remove(pidfilepath);
1678 #if ((DEBUG) & DEBUG_I)
1681 my_log(LOG_INFO, "Done.");