2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
11 Added data collection (-f) functionality, xid support in the header and log file
17 /* stdout, stderr, freopen() */
23 /* getopt(), alarm(), getpid(), sedsid(), chdir() */
34 #include <sys/statfs.h>
36 #include <libipulog/libipulog.h>
37 struct ipulog_handle {
40 struct sockaddr_nl local;
41 struct sockaddr_nl peer;
42 struct nlmsghdr* last_nlhdr;
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
57 #include <sys/param.h>
82 #include <sys/select.h>
88 #include <fprobe-ulog.h>
90 #include <my_getopt.h>
/* Option table consumed by my_getopt(); one entry per recognized option
 * letter, all of which take a required argument.  Parsed results are read
 * back in main() via parms[Xflag].count / parms[Xflag].arg.
 * NOTE(review): the terminating entry / closing brace of this initializer
 * falls outside the visible excerpt. */
123 static struct getopt_parms parms[] = {
124 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
133 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
136 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
147 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
/* ---- File-scope state shared by the capture/scan/emit/unpending threads. */
152 extern int optind, opterr, optopt;
/* NetFlow v1/v5/v7 format descriptors, defined elsewhere in the project. */
155 extern struct NetFlow NetFlow1;
156 extern struct NetFlow NetFlow5;
157 extern struct NetFlow NetFlow7;
/* Sentinel for a not-yet-opened log fd (see get_log_fd / PEER_FILE init). */
159 #define START_VALUE -5
160 #define mark_is_tos parms[Mflag].count
/* Expiry/scan tunables, overridden from command-line options in main(). */
161 static unsigned scan_interval = 5;
162 static int min_free = 0;
163 static int frag_lifetime = 30;
164 static int inactive_lifetime = 60;
165 static int active_lifetime = 300;
166 static int sockbufsize;
/* Largest representable bulk size for the flow allocator's index type. */
167 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
168 #if (MEM_BITS == 0) || (MEM_BITS == 16)
169 #define BULK_QUANTITY 10000
171 #define BULK_QUANTITY 200
/* Epoch-based log rotation state (minutes per epoch, epochs kept, cursor). */
174 static unsigned epoch_length=60, log_epochs=1;
175 static unsigned cur_epoch=0,prev_uptime=0;
177 static unsigned bulk_quantity = BULK_QUANTITY;
178 static unsigned pending_queue_length = 100;
179 static struct NetFlow *netflow = &NetFlow5;
180 static unsigned verbosity = 6;
181 static unsigned log_dest = MY_LOG_SYSLOG;
182 static struct Time start_time;
183 static long start_time_offset;
/* Allocator statistics exported by the mem_* module. */
186 extern unsigned total_elements;
187 extern unsigned free_elements;
188 extern unsigned total_memory;
189 #if ((DEBUG) & DEBUG_I)
/* Instrumentation counters, only compiled in with DEBUG_I. */
190 static unsigned emit_pkts, emit_queue;
191 static uint64_t size_total;
192 static unsigned pkts_total, pkts_total_fragmented;
193 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
194 static unsigned pkts_pending, pkts_pending_done;
195 static unsigned pending_queue_trace, pending_queue_trace_candidate;
196 static unsigned flows_total, flows_fragmented;
/* Emitter state: flows in the current PDU, sequence number, rate limiting. */
198 static unsigned emit_count;
199 static uint32_t emit_sequence;
200 static unsigned emit_rate_bytes, emit_rate_delay;
201 static struct Time emit_time;
202 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
203 static pthread_t thid;
204 static sigset_t sig_mask;
205 static struct sched_param schedp;
206 static int sched_min, sched_max;
207 static int npeers, npeers_rot;
208 static struct peer *peers;
/* Flow hash table with one mutex per bucket. */
211 static struct Flow *flows[1 << HASH_BITS];
212 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
/* Synchronization between cap_thread (producer) and unpending_thread. */
214 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
215 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
217 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
218 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
/* Circular pending queue endpoints and expired-fragment holding chain. */
219 static struct Flow *pending_head, *pending_tail;
220 static struct Flow *scan_frag_dreg;
/* Queue of expired flows handed from scan_thread to emit_thread. */
222 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
223 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
224 static struct Flow *flows_emit;
226 static char ident[256] = "fprobe-ulog";
227 static FILE *pidfile;
228 static char *pidfilepath;
231 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
/* ULOG capture handle and group mask (-U option). */
232 static struct ipulog_handle *ulog_handle;
233 static uint32_t ulog_gmask = 1;
234 static char *cap_buf;
/* Interface-name -> SNMP-index conversion rules parsed from -X. */
235 static int nsnmp_rules;
236 static struct snmp_rule *snmp_rules;
/* Target user for privilege dropping (-u option); 0 = don't drop. */
237 static struct passwd *pw = 0;
/* Usage/help text printed by the (elided) usage routine; the trailing
 * printf-style arguments on the last line fill in the %s/%u/%d holes.
 * NOTE(review): "idetifier" below is a typo for "identifier", but it is a
 * runtime string — fix it in a code change, not a comment-only pass. */
242 "fprobe-ulog: a NetFlow probe. Version %s\n"
243 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
245 "-h\t\tDisplay this help\n"
246 "-U <mask>\tULOG group bitwise mask [1]\n"
247 "-s <seconds>\tHow often scan for expired flows [5]\n"
248 "-g <seconds>\tFragmented flow lifetime [30]\n"
249 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
250 "-f <filename>\tLog flow data in a file\n"
251 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
252 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
253 "-a <address>\tUse <address> as source for NetFlow flow\n"
254 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
255 "-M\t\tUse netfilter mark value as ToS flag\n"
256 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
257 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
258 "-q <flows>\tPending queue length [100]\n"
259 "-B <kilobytes>\tKernel capture buffer size [0]\n"
260 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
261 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
262 "-c <directory>\tDirectory to chroot to\n"
263 "-u <user>\tUser to run as\n"
264 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
265 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
266 "-y <remote:port>\tAddress of the NetFlow collector\n"
267 "-f <writable file>\tFile to write data into\n"
268 "-T <n>\tRotate log file every n epochs\n"
269 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
270 "-E <[1..60]>\tSize of an epoch in minutes\n"
271 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
273 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* DEBUG_I-only statistics dump: logs the instrumentation counters declared
 * above (packet totals, losses, cache/emit state, allocator usage).
 * Presumably triggered from SIGUSR1 handling — the enclosing function's
 * signature is outside this excerpt. */
277 #if ((DEBUG) & DEBUG_I)
280 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
281 pkts_total, pkts_total_fragmented, size_total,
282 pkts_pending - pkts_pending_done, pending_queue_trace);
283 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
284 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
285 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
286 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
287 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
288 total_elements, free_elements, total_memory);
/* Async-signal handler: records which signal arrived by setting a bit in
 * the global `sigs` mask (SIGTERM always; SIGUSR1 only with DEBUG_I).
 * The dispatch logic between the two assignments is elided here. */
292 void sighandler(int sig)
296 sigs |= SIGTERM_MASK;
298 #if ((DEBUG) & DEBUG_I)
300 sigs |= SIGUSR1_MASK;
/* Fills *now with the current wall-clock time (gettimeofday-based:
 * the .sec assignment is elided; .usec comes from the timeval). */
306 void gettime(struct Time *now)
312 now->usec = t.tv_usec;
/* Difference t1 - t2 in whole minutes (seconds-only; usec ignored). */
316 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
318 return (t1->sec - t2->sec)/60;
/* Difference t1 - t2 in milliseconds (combines sec and usec fields). */
321 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
323 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
326 /* Uptime in miliseconds */
/* Milliseconds elapsed since start_time; wraps in uint32_t after ~49.7
 * days, halved here per the author's note below. */
327 uint32_t getuptime(struct Time *t)
329 /* Maximum uptime is about 49/2 days */
330 return cmpmtime(t, &start_time);
333 /* Uptime in minutes */
/* Minutes elapsed since start_time; used for epoch-length bookkeeping
 * in get_log_fd(). */
334 uint32_t getuptime_minutes(struct Time *t)
336 /* Maximum uptime is about 49/2 days */
337 return cmpMtime(t, &start_time);
/* Bucket hash for a flow: fragmented flows hash only the Flow_F prefix
 * (no ports yet), complete flows hash the larger Flow_TL prefix. */
340 hash_t hash_flow(struct Flow *flow)
342 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
343 else return hash(flow, sizeof(struct Flow_TL));
/* Maps an interface name to an SNMP index: first tries the -X rules
 * (basename prefix match + numeric suffix + base offset), then falls
 * back to the kernel's if_nametoindex(); 0 for an empty name. */
346 uint16_t snmp_index(char *name) {
349 if (!*name) return 0;
351 for (i = 0; (int) i < nsnmp_rules; i++) {
352 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
353 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
356 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow record (addresses/ports elided in this
 * excerpt); used when inserting a pending-queue entry into the hash. */
361 inline void copy_flow(struct Flow *src, struct Flow *dst)
368 dst->proto = src->proto;
369 dst->tcp_flags = src->tcp_flags;
373 dst->pkts = src->pkts;
374 dst->size = src->size;
375 dst->sizeF = src->sizeF;
376 dst->sizeP = src->sizeP;
377 dst->ctime = src->ctime;
378 dst->mtime = src->mtime;
379 dst->flags = src->flags;
/* Restores cur_epoch from /tmp/fprobe_last_epoch so a restart resumes
 * numbering.  NOTE(review): sscanf("%d") into `cur_epoch` — declared
 * `unsigned` above — mismatches the format; %u would be correct. */
382 void get_cur_epoch() {
384 fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
388 len = read(fd, snum, sizeof(snum)-1);
391 sscanf(snum,"%d",&cur_epoch);
/* Persists the current epoch number to /tmp/fprobe_last_epoch.
 * NOTE(review): open() with O_CREAT but no mode argument — POSIX requires
 * a third (mode) argument when O_CREAT is set; the file is created with
 * garbage permissions.  Also the write() return value is ignored. */
399 void update_cur_epoch_file(int n) {
402 len=snprintf(snum,6,"%d",n);
403 fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
405 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
408 write(fd, snum, len);
/* Returns the fd to write flow data to, rotating to a new epoch file
 * ("<fname>.<epoch>") when the epoch length elapses, and recycling old
 * epochs when free disk blocks drop below min_free (-D).
 * NOTE(review): cur_fd is `unsigned`, so the `cur_fd<0` test below can
 * never be true (START_VALUE is -5); likewise cur_epoch is unsigned, so
 * `cur_epoch==-1` relies on wraparound.  And open() with O_CREAT again
 * lacks the required mode argument. */
412 unsigned get_log_fd(char *fname, unsigned cur_fd) {
415 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
416 * doesn't solve the problem */
418 struct statfs statfs;
421 cur_uptime = getuptime_minutes(&now);
423 if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
424 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
427 if (min_free && statfs.f_bfree < min_free)
429 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
430 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
433 my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
438 /* Epoch length in minutes */
439 if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
440 char nextname[MAX_PATH_LEN];
442 prev_uptime = cur_uptime;
443 cur_epoch = (cur_epoch + 1) % log_epochs;
445 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* NOTE(review): O_CREAT without mode argument — undefined permissions. */
446 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
447 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
450 update_cur_epoch_file(cur_epoch);
/* Walks the bucket chain `where` looking for a flow matching `what`:
 * same src/dst IP and protocol, then either matching ports (both
 * unfragmented) or matching IP id (both fragmented).  On return *prev,
 * if requested, points at the link slot of the match (or chain end),
 * so the caller can unlink in O(1). */
458 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
460 struct Flow **flowpp;
466 if (prev) flowpp = *prev;
469 if (where->sip.s_addr == what->sip.s_addr
470 && where->dip.s_addr == what->dip.s_addr
471 && where->proto == what->proto) {
472 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
474 /* Both unfragmented */
475 if ((what->sp == where->sp)
476 && (what->dp == where->dp)) goto done;
479 /* Both fragmented */
480 if (where->id == what->id) goto done;
484 flowpp = &where->next;
488 if (prev) *prev = flowpp;
/* Inserts `flow` into the hash table under its bucket mutex.
 * flag == COPY_INTO: allocate a fresh element and copy (caller keeps
 * ownership of `flow`); flag == MOVE_INTO: link `flow` itself (freed at
 * the end if it was merged).  When a match already exists, accumulates
 * counters/timestamps/TCP flags into it; for fragments, also merges
 * transport-layer info and total-size knowledge, and once all fragments
 * have arrived re-inserts the reassembled flow recursively with
 * MOVE_INTO after clearing FLOW_FRAG. */
492 int put_into(struct Flow *flow, int flag
493 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
500 struct Flow *flown, **flowpp;
501 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
506 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
507 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
510 pthread_mutex_lock(&flows_mutex[h]);
512 if (!(flown = find(flows[h], flow, &flowpp))) {
513 /* No suitable flow found - add */
514 if (flag == COPY_INTO) {
515 if ((flown = mem_alloc())) {
516 copy_flow(flow, flown);
519 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
/* Allocation failure: packet is dropped, not queued. */
520 my_log(LOG_ERR, "%s %s. %s",
521 "mem_alloc():", strerror(errno), "packet lost");
526 flow->next = flows[h];
528 #if ((DEBUG) & DEBUG_I)
530 if (flow->flags & FLOW_FRAG) flows_fragmented++;
532 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
534 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
539 /* Found suitable flow - update */
540 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
541 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the widest [ctime, mtime] interval across merged packets. */
544 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
545 flown->mtime = flow->mtime;
546 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
547 flown->ctime = flow->ctime;
548 flown->tcp_flags |= flow->tcp_flags;
549 flown->size += flow->size;
550 flown->pkts += flow->pkts;
551 if (flow->flags & FLOW_FRAG) {
552 /* Fragmented flow require some additional work */
553 if (flow->flags & FLOW_TL) {
556 Several packets with FLOW_TL (attack)
558 flown->sp = flow->sp;
559 flown->dp = flow->dp;
561 if (flow->flags & FLOW_LASTFRAG) {
564 Several packets with FLOW_LASTFRAG (attack)
566 flown->sizeP = flow->sizeP;
568 flown->flags |= flow->flags;
569 flown->sizeF += flow->sizeF;
570 if ((flown->flags & FLOW_LASTFRAG)
571 && (flown->sizeF >= flown->sizeP)) {
572 /* All fragments received - flow reassembled */
573 *flowpp = flown->next;
574 pthread_mutex_unlock(&flows_mutex[h]);
575 #if ((DEBUG) & DEBUG_I)
580 flown->flags &= ~FLOW_FRAG;
581 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert the now-complete flow under its full (Flow_TL) hash. */
584 ret = put_into(flown, MOVE_INTO
585 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
591 if (flag == MOVE_INTO) mem_free(flow);
593 pthread_mutex_unlock(&flows_mutex[h]);
/* Serializes `fields` NetFlow fields, in the order given by `format`,
 * from `flow` (or from the emitter globals for header fields) into the
 * buffer at `p`; returns the advanced write pointer.  Multi-byte values
 * are converted to network byte order; ports are already stored in
 * network order in the flow.  NOTE(review): the hard-coded test address
 * 64.34.177.39 below looks like leftover debug instrumentation. */
597 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
601 for (i = 0; i < fields; i++) {
602 #if ((DEBUG) & DEBUG_F)
603 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
606 case NETFLOW_IPV4_SRC_ADDR:
607 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
608 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
611 case NETFLOW_IPV4_DST_ADDR:
612 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* NOTE(review): debug hook for one fixed destination IP. */
613 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
614 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
616 p += NETFLOW_IPV4_DST_ADDR_SIZE;
619 case NETFLOW_INPUT_SNMP:
620 *((uint16_t *) p) = htons(flow->iif);
621 p += NETFLOW_INPUT_SNMP_SIZE;
624 case NETFLOW_OUTPUT_SNMP:
625 *((uint16_t *) p) = htons(flow->oif);
626 p += NETFLOW_OUTPUT_SNMP_SIZE;
629 case NETFLOW_PKTS_32:
630 *((uint32_t *) p) = htonl(flow->pkts);
631 p += NETFLOW_PKTS_32_SIZE;
634 case NETFLOW_BYTES_32:
635 *((uint32_t *) p) = htonl(flow->size);
636 p += NETFLOW_BYTES_32_SIZE;
639 case NETFLOW_FIRST_SWITCHED:
640 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
641 p += NETFLOW_FIRST_SWITCHED_SIZE;
644 case NETFLOW_LAST_SWITCHED:
645 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
646 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are captured from the packet and kept in network order. */
649 case NETFLOW_L4_SRC_PORT:
650 *((uint16_t *) p) = flow->sp;
651 p += NETFLOW_L4_SRC_PORT_SIZE;
654 case NETFLOW_L4_DST_PORT:
655 *((uint16_t *) p) = flow->dp;
656 p += NETFLOW_L4_DST_PORT_SIZE;
660 *((uint8_t *) p) = flow->proto;
661 p += NETFLOW_PROT_SIZE;
664 case NETFLOW_SRC_TOS:
665 *((uint8_t *) p) = flow->tos;
666 p += NETFLOW_SRC_TOS_SIZE;
669 case NETFLOW_TCP_FLAGS:
670 *((uint8_t *) p) = flow->tcp_flags;
671 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header-only fields below read emitter globals, not `flow`. */
674 case NETFLOW_VERSION:
675 *((uint16_t *) p) = htons(netflow->Version);
676 p += NETFLOW_VERSION_SIZE;
680 *((uint16_t *) p) = htons(emit_count);
681 p += NETFLOW_COUNT_SIZE;
685 *((uint32_t *) p) = htonl(getuptime(&emit_time));
686 p += NETFLOW_UPTIME_SIZE;
689 case NETFLOW_UNIX_SECS:
690 *((uint32_t *) p) = htonl(emit_time.sec);
691 p += NETFLOW_UNIX_SECS_SIZE;
694 case NETFLOW_UNIX_NSECS:
695 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
696 p += NETFLOW_UNIX_NSECS_SIZE;
699 case NETFLOW_FLOW_SEQUENCE:
/* NOTE(review): sequence deliberately zeroed; htonl(emit_sequence)
 * left commented out by the author. */
700 //*((uint32_t *) p) = htonl(emit_sequence);
701 *((uint32_t *) p) = 0;
702 p += NETFLOW_FLOW_SEQUENCE_SIZE;
706 /* Unsupported (uint8_t) */
707 case NETFLOW_ENGINE_TYPE:
708 case NETFLOW_ENGINE_ID:
709 case NETFLOW_FLAGS7_1:
710 case NETFLOW_SRC_MASK:
711 case NETFLOW_DST_MASK:
712 *((uint8_t *) p) = 0;
713 p += NETFLOW_PAD8_SIZE;
/* XID field carries the ToS value (see file header: xid support). */
716 *((uint16_t *) p) = flow->tos;
717 p += NETFLOW_XID_SIZE;
720 /* Unsupported (uint16_t) */
723 case NETFLOW_FLAGS7_2:
724 *((uint16_t *) p) = 0;
725 p += NETFLOW_PAD16_SIZE;
729 /* Unsupported (uint32_t) */
730 case NETFLOW_IPV4_NEXT_HOP:
731 case NETFLOW_ROUTER_SC:
732 *((uint32_t *) p) = 0;
733 p += NETFLOW_PAD32_SIZE;
737 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
738 format, i, format[i]);
742 #if ((DEBUG) & DEBUG_F)
743 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
/* Emitter thread: drains the flows_emit list, packs flows into
 * emit_packet with fill(), and when the PDU is full (or the emit timeout
 * fires with a partial PDU) writes it to every configured peer — file
 * peers via write()+get_log_fd(), mirror peers via send(), rotate peers
 * round-robin.  Optional -t rate limiting nanosleeps after each burst. */
750 Workaround for clone()-based threads
751 Try to change EUID independently of main thread
/* Drop privileges in this thread too (per-thread creds under LinuxThreads). */
755 setregid(pw->pw_gid, pw->pw_gid);
756 setreuid(pw->pw_uid, pw->pw_uid);
765 struct timespec timeout;
766 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
768 p = (void *) &emit_packet + netflow->HeaderSize;
774 pthread_mutex_lock(&emit_mutex);
775 while (!flows_emit) {
776 gettimeofday(&now, 0);
777 timeout.tv_sec = now.tv_sec + emit_timeout;
778 /* Do not wait until emit_packet will filled - it may be too long */
779 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
780 pthread_mutex_unlock(&emit_mutex);
/* Pop one flow off the emit list while still holding emit_mutex. */
785 flows_emit = flows_emit->next;
786 #if ((DEBUG) & DEBUG_I)
789 pthread_mutex_unlock(&emit_mutex);
793 gettime(&start_time);
794 start_time.sec -= start_time_offset;
797 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
801 printf("Emit count = %d\n", emit_count);
804 if (emit_count == netflow->MaxFlows) {
/* PDU complete: prepend header and compute the on-wire size. */
807 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
808 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
809 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
810 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
812 for (i = 0; i < npeers; i++) {
813 if (peers[i].type == PEER_FILE) {
814 if (netflow->SeqOffset)
/* NOTE(review): uses peers[0].seq for every file peer — peers[i].seq
 * looks intended; confirm against the full source. */
815 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
816 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
817 ret = write(peers[i].write_fd, emit_packet, size);
820 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
821 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
822 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
826 #if ((DEBUG) & DEBUG_E)
828 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
829 emit_count, i + 1, peers[i].seq);
832 peers[i].seq += emit_count;
/* -t B:N rate limiting: delay N ns per B bytes actually sent. */
835 if (emit_rate_bytes) {
837 delay = sent / emit_rate_bytes;
839 sent %= emit_rate_bytes;
841 timeout.tv_nsec = emit_rate_delay * delay;
842 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
847 if (peers[i].type == PEER_MIRROR) goto sendreal;
/* ROTATE peers: only the peer whose turn it is gets this PDU. */
849 if (peers[i].type == PEER_ROTATE)
850 if (peer_rot_cur++ == peer_rot_work) {
852 if (netflow->SeqOffset)
853 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
854 ret = send(peers[i].write_fd, emit_packet, size, 0);
856 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
857 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
858 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
861 #if ((DEBUG) & DEBUG_E)
863 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
864 emit_count, i + 1, peers[i].seq);
867 peers[i].seq += emit_count;
870 if (emit_rate_bytes) {
872 delay = sent / emit_rate_bytes;
874 sent %= emit_rate_bytes;
876 timeout.tv_nsec = emit_rate_delay * delay;
877 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
/* Advance the rotate cursor and the global (unused on-wire) sequence. */
882 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
883 emit_sequence += emit_count;
885 #if ((DEBUG) & DEBUG_I)
/* Consumer thread for the circular pending queue: waits (with timeout)
 * until pending_tail is marked FLOW_PENDING by cap_thread, copies the
 * entry into the hash table via put_into(COPY_INTO), clears the flag to
 * recycle the slot, and advances the tail pointer. */
892 void *unpending_thread()
895 struct timespec timeout;
896 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
903 pthread_mutex_lock(&unpending_mutex);
906 while (!(pending_tail->flags & FLOW_PENDING)) {
907 gettimeofday(&now, 0);
908 timeout.tv_sec = now.tv_sec + unpending_timeout;
909 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
912 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
915 if (put_into(pending_tail, COPY_INTO
916 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
/* put_into failed (mem_alloc) — the packet is counted as lost. */
920 #if ((DEBUG) & DEBUG_I)
921 pkts_lost_unpending++;
925 #if ((DEBUG) & DEBUG_U)
926 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the slot back to cap_thread and move on. */
929 pending_tail->flags = 0;
930 pending_tail = pending_tail->next;
931 #if ((DEBUG) & DEBUG_I)
/* Expiry scanner (function signature elided above this excerpt): every
 * scan_interval seconds walks all hash buckets; expired fragmented flows
 * (idle > frag_lifetime) are unlinked onto scan_frag_dreg and later
 * force-completed via put_into(MOVE_INTO); complete flows idle longer
 * than inactive_lifetime or active longer than active_lifetime are moved
 * to the flows_emit list and emit_thread is signalled. */
939 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
943 struct Flow *flow, **flowpp;
945 struct timespec timeout;
950 pthread_mutex_lock(&scan_mutex);
954 timeout.tv_sec = now.sec + scan_interval;
955 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
958 #if ((DEBUG) & DEBUG_S)
959 my_log(LOG_DEBUG, "S: %d", now.sec);
961 for (i = 0; i < 1 << HASH_BITS ; i++) {
962 pthread_mutex_lock(&flows_mutex[i]);
966 if (flow->flags & FLOW_FRAG) {
967 /* Process fragmented flow */
968 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
969 /* Fragmented flow expired - put it into special chain */
970 #if ((DEBUG) & DEBUG_I)
974 *flowpp = flow->next;
976 flow->flags &= ~FLOW_FRAG;
977 flow->next = scan_frag_dreg;
978 scan_frag_dreg = flow;
983 /* Flow is not frgamented */
984 if ((now.sec - flow->mtime.sec) > inactive_lifetime
985 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
987 #if ((DEBUG) & DEBUG_S)
988 my_log(LOG_DEBUG, "S: E %x", flow);
990 #if ((DEBUG) & DEBUG_I)
/* Unlink from the bucket and push onto the emit list. */
993 *flowpp = flow->next;
994 pthread_mutex_lock(&emit_mutex);
995 flow->next = flows_emit;
997 #if ((DEBUG) & DEBUG_I)
1000 pthread_mutex_unlock(&emit_mutex);
1005 flowpp = &flow->next;
1008 pthread_mutex_unlock(&flows_mutex[i]);
1010 if (flows_emit) pthread_cond_signal(&emit_cond);
/* Drain expired fragments collected above, re-inserting them as
 * complete flows so they age out through the normal path. */
1012 while (scan_frag_dreg) {
1013 flow = scan_frag_dreg;
1014 scan_frag_dreg = flow->next;
1015 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1018 put_into(flow, MOVE_INTO
1019 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1023 #if ((DEBUG) & DEBUG_S)
1024 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture thread (signature elided): reads packets from the netfilter
 * ULOG socket, parses the IPv4/transport headers into the next free slot
 * of the circular pending queue, marks it FLOW_PENDING, and wakes
 * unpending_thread.  Non-IPv4 and truncated packets are ignored; a full
 * pending queue drops the packet.
 * NOTE(review): hard-coded test address 64.34.177.39 below is leftover
 * debug instrumentation. */
1032 struct ulog_packet_msg *ulog_msg;
1036 int len, off_frag, psize;
1037 #if ((DEBUG) & DEBUG_C)
1045 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1047 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1050 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1052 #if ((DEBUG) & DEBUG_C)
1053 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1056 nl = (void *) &ulog_msg->payload;
1057 psize = ulog_msg->data_len;
/* Sanity: must hold at least an IP header and be IP version 4. */
1060 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1061 #if ((DEBUG) & DEBUG_C)
1062 strcat(logbuf, " U");
1063 my_log(LOG_DEBUG, "%s", logbuf);
1065 #if ((DEBUG) & DEBUG_I)
/* Slot still flagged => unpending_thread hasn't consumed it: queue full. */
1071 if (pending_head->flags) {
1072 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1074 # if ((DEBUG) & DEBUG_C)
1079 "pending queue full:", "packet lost");
1081 #if ((DEBUG) & DEBUG_I)
1082 pkts_lost_capture++;
1087 #if ((DEBUG) & DEBUG_I)
1091 flow = pending_head;
1093 /* ?FIXME? Add sanity check for ip_len? */
1094 flow->size = ntohs(nl->ip_len);
1095 #if ((DEBUG) & DEBUG_I)
1096 size_total += flow->size;
1099 flow->sip = nl->ip_src;
1100 flow->dip = nl->ip_dst;
/* NOTE(review): debug hook for one fixed destination IP. */
1101 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1102 my_log(LOG_INFO, "Received test flow to corewars.org");
1104 flow->iif = snmp_index(ulog_msg->indev_name);
1105 flow->oif = snmp_index(ulog_msg->outdev_name);
/* -M: nfmark stands in for ToS (mark_is_tos macro over parms[Mflag]). */
1106 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1107 flow->proto = nl->ip_p;
1109 flow->tcp_flags = 0;
1113 /* Packets captured from OUTPUT table didn't contains valid timestamp */
1114 if (ulog_msg->timestamp_sec) {
1115 flow->ctime.sec = ulog_msg->timestamp_sec;
1116 flow->ctime.usec = ulog_msg->timestamp_usec;
1117 } else gettime(&flow->ctime);
1118 flow->mtime = flow->ctime;
/* Fragment offset in bytes (ip_off holds it in 8-byte units). */
1120 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1123 Offset (from network layer) to transport layer header/IP data
1124 IOW IP header size ;-)
1127 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1129 off_tl = nl->ip_hl << 2;
1130 tl = (void *) nl + off_tl;
1132 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1133 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1135 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1136 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1138 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1139 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1140 #if ((DEBUG) & DEBUG_C)
1141 strcat(logbuf, " F");
1143 #if ((DEBUG) & DEBUG_I)
1144 pkts_total_fragmented++;
1146 flow->flags |= FLOW_FRAG;
1147 flow->id = nl->ip_id;
/* Last fragment (MF clear, offset nonzero) reveals the datagram size. */
1149 if (!(ntohs(nl->ip_off) & IP_MF)) {
1150 /* Packet whith IP_MF contains information about whole datagram size */
1151 flow->flags |= FLOW_LASTFRAG;
1152 /* size = frag_offset*8 + data_size */
1153 flow->sizeP = off_frag + flow->sizeF;
1157 #if ((DEBUG) & DEBUG_C)
1158 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1159 strcat(logbuf, buf);
1160 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1161 strcat(logbuf, buf);
1165 Fortunately most interesting transport layer information fit
1166 into first 8 bytes of IP data field (minimal nonzero size).
1167 Thus we don't need actual packet reassembling to build whole
1168 transport layer data. We only check the fragment offset for
1169 zero value to find packet with this information.
1171 if (!off_frag && psize >= 8) {
1172 switch (flow->proto) {
/* UDP/TCP share the port layout; read them via the udphdr view. */
1175 flow->sp = ((struct udphdr *)tl)->uh_sport;
1176 flow->dp = ((struct udphdr *)tl)->uh_dport;
/* ICMP: encode type/code into the port fields (standard NetFlow trick). */
1181 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1182 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1185 #ifdef ICMP_TRICK_CISCO
1187 flow->dp = *((int32_t *) tl);
1192 /* Unknown transport layer */
1193 #if ((DEBUG) & DEBUG_C)
1194 strcat(logbuf, " U");
1201 #if ((DEBUG) & DEBUG_C)
1202 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1203 strcat(logbuf, buf);
1205 flow->flags |= FLOW_TL;
1209 /* Check for tcp flags presence (including CWR and ECE). */
1210 if (flow->proto == IPPROTO_TCP
1212 && psize >= 16 - off_frag) {
1213 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1214 #if ((DEBUG) & DEBUG_C)
1215 sprintf(buf, " TCP:%x", flow->tcp_flags);
1216 strcat(logbuf, buf);
1220 #if ((DEBUG) & DEBUG_C)
1221 sprintf(buf, " => %x", (unsigned) flow);
1222 strcat(logbuf, buf);
1223 my_log(LOG_DEBUG, "%s", logbuf);
1226 #if ((DEBUG) & DEBUG_I)
1228 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1229 if (pending_queue_trace < pending_queue_trace_candidate)
1230 pending_queue_trace = pending_queue_trace_candidate;
1233 /* Flow complete - inform unpending_thread() about it */
1234 pending_head->flags |= FLOW_PENDING;
1235 pending_head = pending_head->next;
1237 pthread_cond_signal(&unpending_cond);
/* Entry point (tail of the function continues past this excerpt):
 * parses options, builds the peer list (remote collectors, rotate peers,
 * or a -f file peer), initializes the ULOG capture handle, daemonizes,
 * sets up the allocator / hash mutexes / circular pending queue, and
 * installs signal handlers before (below this excerpt) spawning the
 * worker threads. */
1243 int main(int argc, char **argv)
1246 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1247 int c, i, write_fd, memory_limit = 0;
1248 struct addrinfo hints, *res;
1249 struct sockaddr_in saddr;
1250 pthread_attr_t tattr;
1251 struct sigaction sigact;
1252 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1253 struct timeval timeout;
1255 sched_min = sched_get_priority_min(SCHED);
1256 sched_max = sched_get_priority_max(SCHED);
1258 memset(&saddr, 0 , sizeof(saddr));
1259 memset(&hints, 0 , sizeof(hints));
1260 hints.ai_flags = AI_PASSIVE;
1261 hints.ai_family = AF_INET;
1262 hints.ai_socktype = SOCK_DGRAM;
1264 /* Process command line options */
1267 while ((c = my_getopt(argc, argv, parms)) != -1) {
/* Numeric options are read back with atoi(); no range validation here
 * beyond the explicit checks below. */
1277 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1278 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1279 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1280 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1281 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1282 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1283 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1284 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1285 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1286 if (parms[nflag].count) {
1287 switch (atoi(parms[nflag].arg)) {
1289 netflow = &NetFlow1;
1296 netflow = &NetFlow7;
1300 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1304 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1305 if (parms[lflag].count) {
1306 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1309 sprintf(errpbuf, "[%s]", log_suffix);
1310 strcat(ident, errpbuf);
1313 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
/* Restore the ':' clobbered while splitting the -l argument. */
1314 if (log_suffix) *--log_suffix = ':';
1316 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1318 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1321 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1322 if (parms[qflag].count) {
1323 pending_queue_length = atoi(parms[qflag].arg);
1324 if (pending_queue_length < 1) {
1325 fprintf(stderr, "Illegal %s\n", "pending queue length");
1329 if (parms[rflag].count) {
1330 schedp.sched_priority = atoi(parms[rflag].arg);
1331 if (schedp.sched_priority
1332 && (schedp.sched_priority < sched_min
1333 || schedp.sched_priority > sched_max)) {
1334 fprintf(stderr, "Illegal %s\n", "realtime priority");
1338 if (parms[Bflag].count) {
1339 sockbufsize = atoi(parms[Bflag].arg) << 10;
1341 if (parms[bflag].count) {
1342 bulk_quantity = atoi(parms[bflag].arg);
1343 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1344 fprintf(stderr, "Illegal %s\n", "bulk size");
1348 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
/* -X: count ':' separators, then strtok the rule list in place. */
1349 if (parms[Xflag].count) {
1350 for(i = 0; parms[Xflag].arg[i]; i++)
1351 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1352 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1354 rule = strtok(parms[Xflag].arg, ":");
1355 for (i = 0; rule; i++) {
1356 snmp_rules[i].len = strlen(rule);
1357 if (snmp_rules[i].len > IFNAMSIZ) {
1358 fprintf(stderr, "Illegal %s\n", "interface basename");
/* NOTE(review): strncpy with len == strlen(rule) leaves basename
 * unterminated when it exactly fills the buffer — verify against the
 * struct snmp_rule declaration. */
1361 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1362 if (!*(rule - 1)) *(rule - 1) = ',';
1363 rule = strtok(NULL, ",");
1365 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1368 snmp_rules[i].base = atoi(rule);
1370 rule = strtok(NULL, ":");
1374 if (parms[tflag].count)
1375 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1376 if (parms[aflag].count) {
1377 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1379 fprintf(stderr, "Illegal %s\n", "source address");
1382 saddr = *((struct sockaddr_in *) res->ai_addr);
1386 if (parms[uflag].count)
1387 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1388 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1393 /* Process collectors parameters. Brrrr... :-[ */
1395 npeers = argc - optind;
1397 /* Send to remote Netflow collector */
1398 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
/* Each positional arg: remote:port[/[local][/type]] -> one UDP peer. */
1399 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1401 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1403 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1404 fprintf(stderr, "socket(): %s\n", strerror(errno));
1407 peers[npeers].write_fd = write_fd;
1408 peers[npeers].type = PEER_MIRROR;
1409 peers[npeers].laddr = saddr;
1410 peers[npeers].seq = 0;
1411 if ((lhost = strchr(dport, '/'))) {
1413 if ((type = strchr(lhost, '/'))) {
1421 peers[npeers].type = PEER_ROTATE;
1430 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1431 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1435 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1436 sizeof(struct sockaddr_in))) {
1437 fprintf(stderr, "bind(): %s\n", strerror(errno));
1440 if (getaddrinfo(dhost, dport, &hints, &res)) {
1442 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1445 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1447 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1448 sizeof(struct sockaddr_in))) {
1449 fprintf(stderr, "connect(): %s\n", strerror(errno));
1453 /* Restore command line */
1454 if (type) *--type = '/';
1455 if (lhost) *--lhost = '/';
/* -f with no collectors: a single PEER_FILE peer writing to disk.
 * NOTE(review): malloc(strnlen(...)) omits room for the NUL terminator
 * and strncpy(..., MAX_PATH_LEN) can overrun that allocation — confirm
 * against the full source. */
1459 else if (parms[fflag].count) {
1461 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1462 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1463 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1465 peers[npeers].write_fd = START_VALUE;
1466 peers[npeers].type = PEER_FILE;
1467 peers[npeers].seq = 0;
1476 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1477 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1479 fprintf(stderr, "libipulog initialization error: %s",
1480 ipulog_strerror(ipulog_errno));
1484 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1485 &sockbufsize, sizeof(sockbufsize)) < 0)
1486 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1488 /* Daemonize (if log destination stdout-free) */
1490 my_log_open(ident, verbosity, log_dest);
1491 if (!(log_dest & 2)) {
1492 /* Crash-proofing - Sapan*/
1496 fprintf(stderr, "fork(): %s", strerror(errno));
1501 freopen("/dev/null", "r", stdin);
1502 freopen("/dev/null", "w", stdout);
1503 freopen("/dev/null", "w", stderr);
/* Parent acts as a watchdog: respawn loop around wait3(). */
1507 while (wait3(NULL,0,NULL) < 1);
1511 setvbuf(stdout, (char *)0, _IONBF, 0);
1512 setvbuf(stderr, (char *)0, _IONBF, 0);
1516 sprintf(errpbuf, "[%ld]", (long) pid);
1517 strcat(ident, errpbuf);
1519 /* Initialization */
1521 hash_init(); /* Actually for crc16 only */
1522 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1523 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1526 /* Hope 12 days is enough :-/ */
1527 start_time_offset = 1 << 20;
1529 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1531 gettime(&start_time);
1534 Build static pending queue as circular buffer.
1536 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1537 pending_tail = pending_head;
1538 for (i = pending_queue_length - 1; i--;) {
1539 if (!(pending_tail->next = mem_alloc())) {
1541 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1544 pending_tail = pending_tail->next;
1546 pending_tail->next = pending_head;
1547 pending_tail = pending_head;
1549 sigemptyset(&sig_mask);
1550 sigact.sa_handler = &sighandler;
1551 sigact.sa_mask = sig_mask;
1552 sigact.sa_flags = 0;
1553 sigaddset(&sig_mask, SIGTERM);
1554 sigaction(SIGTERM, &sigact, 0);
1555 #if ((DEBUG) & DEBUG_I)
1556 sigaddset(&sig_mask, SIGUSR1);
1557 sigaction(SIGUSR1, &sigact, 0);
1559 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1560 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1564 my_log(LOG_INFO, "Starting %s...", VERSION);
1566 if (parms[cflag].count) {
1567 if (chdir(parms[cflag].arg) || chroot(".")) {
1568 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1573 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1574 pthread_attr_init(&tattr);
1575 for (i = 0; i < THREADS - 1; i++) {
1576 if (schedp.sched_priority > 0) {
1577 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1578 (pthread_attr_setschedparam(&tattr, &schedp))) {
1579 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1583 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1584 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1587 pthread_detach(thid);
1588 schedp.sched_priority++;
1592 if (setgroups(0, NULL)) {
1593 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1596 if (setregid(pw->pw_gid, pw->pw_gid)) {
1597 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1600 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1601 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1606 if (!(pidfile = fopen(pidfilepath, "w")))
1607 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1609 fprintf(pidfile, "%ld\n", (long) pid);
1613 my_log(LOG_INFO, "pid: %d", pid);
1614 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1615 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1616 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1617 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1618 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1619 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1620 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1621 for (i = 0; i < nsnmp_rules; i++) {
1622 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1623 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1625 for (i = 0; i < npeers; i++) {
1626 switch (peers[i].type) {
1634 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1635 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1636 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1639 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1641 timeout.tv_usec = 0;
1643 || (total_elements - free_elements - pending_queue_length)
1645 || pending_tail->flags) {
1648 timeout.tv_sec = scan_interval;
1649 select(0, 0, 0, 0, &timeout);
1652 if (sigs & SIGTERM_MASK && !killed) {
1653 sigs &= ~SIGTERM_MASK;
1654 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1657 active_lifetime = -1;
1658 inactive_lifetime = -1;
1660 unpending_timeout = 1;
1662 pthread_cond_signal(&scan_cond);
1663 pthread_cond_signal(&unpending_cond);
1666 #if ((DEBUG) & DEBUG_I)
1667 if (sigs & SIGUSR1_MASK) {
1668 sigs &= ~SIGUSR1_MASK;
1673 remove(pidfilepath);
1674 #if ((DEBUG) & DEBUG_I)
1677 my_log(LOG_INFO, "Done.");