2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
11 Added data collection (-f) functionality, xid support in the header and log file
17 /* stdout, stderr, freopen() */
/* getopt(), alarm(), getpid(), setsid(), chdir() */
34 #include <sys/statfs.h>
36 #include <libipulog/libipulog.h>
37 struct ipulog_handle {
40 struct sockaddr_nl local;
41 struct sockaddr_nl peer;
42 struct nlmsghdr* last_nlhdr;
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
57 #include <sys/param.h>
82 #include <sys/select.h>
88 #include <fprobe-ulog.h>
90 #include <my_getopt.h>
/* Command-line option table consumed by my_getopt(); one entry per option
 * letter that takes a required argument.  (Flag-only options and the table
 * terminator are declared on lines not visible in this fragment.) */
static struct getopt_parms parms[] = {
	{'a', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* source address for emitted NetFlow */
	{'B', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* kernel capture buffer size (KiB) */
	{'b', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* memory bulk size (flows) */
	{'c', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* chroot directory */
	{'d', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* inactive flow lifetime */
	{'e', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* active flow lifetime */
	{'E', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* epoch size in minutes */
	{'f', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* flow-data output file */
	{'g', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* fragmented flow lifetime */
	{'l', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* log destination / identifier */
	{'m', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* memory limit (KiB) */
	{'n', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* NetFlow version (1, 5 or 7) */
	{'q', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* pending queue length */
	{'r', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* realtime priority */
	{'s', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* scan interval */
	{'t', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* emit rate limit B:N */
	{'T', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* rotate log every n epochs */
	{'U', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* ULOG group bitmask */
	{'u', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* user to run as */
	{'v', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* verbosity level */
	{'X', MY_GETOPT_ARG_REQUIRED, 0, 0},	/* interface -> SNMP index rules */
extern int optind, opterr, optopt;

/* Flow-record templates for the three supported export formats (defined elsewhere). */
extern struct NetFlow NetFlow1;
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;

/* Sentinel for "log file descriptor not opened yet" (see get_log_fd()). */
#define START_VALUE -5
#define mark_is_tos parms[Mflag].count
static unsigned scan_interval = 5;		/* seconds between expiry scans (-s) */
static int min_free = 0;			/* free-block threshold before dropping data (-D) */
static int frag_lifetime = 30;			/* seconds a fragmented flow may stay unassembled (-g) */
static int inactive_lifetime = 60;		/* inactive timeout (-d) */
static int active_lifetime = 300;		/* active timeout (-e) */
static int sockbufsize;				/* SO_RCVBUF size for the ULOG socket (-B) */
#define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
#if (MEM_BITS == 0) || (MEM_BITS == 16)
#define BULK_QUANTITY 10000
#define BULK_QUANTITY 200
static unsigned epoch_length=60, log_epochs=1;	/* epoch size (minutes) and rotation count */
static unsigned cur_epoch=0,prev_uptime=0;
static unsigned bulk_quantity = BULK_QUANTITY;
static unsigned pending_queue_length = 100;
static struct NetFlow *netflow = &NetFlow5;	/* selected export format; default v5 */
static unsigned verbosity = 6;
static unsigned log_dest = MY_LOG_SYSLOG;
static struct Time start_time;			/* base for NetFlow "uptime" timestamps */
static long start_time_offset;

/* Memory-pool statistics maintained by mem_alloc()/mem_free() (defined elsewhere). */
extern unsigned total_elements;
extern unsigned free_elements;
extern unsigned total_memory;
#if ((DEBUG) & DEBUG_I)
/* Instrumentation counters, compiled in only with DEBUG_I. */
static unsigned emit_pkts, emit_queue;
static uint64_t size_total;
static unsigned pkts_total, pkts_total_fragmented;
static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
static unsigned pkts_pending, pkts_pending_done;
static unsigned pending_queue_trace, pending_queue_trace_candidate;
static unsigned flows_total, flows_fragmented;

/* Emitter state: flows per packet, sequence number, optional rate limiting. */
static unsigned emit_count;
static uint32_t emit_sequence;
static unsigned emit_rate_bytes, emit_rate_delay;
static struct Time emit_time;
static uint8_t emit_packet[NETFLOW_MAX_PACKET];
static pthread_t thid;
static sigset_t sig_mask;
static struct sched_param schedp;
static int sched_min, sched_max;
static int npeers, npeers_rot;			/* collector count; count of PEER_ROTATE peers */
static struct peer *peers;

/* Hash table of active flows, one mutex per bucket. */
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];

/* Handshake between cap_thread (producer) and unpending_thread (consumer). */
static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;

static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *pending_head, *pending_tail;	/* circular pending queue */
static struct Flow *scan_frag_dreg;			/* expired-fragment chain */

/* Queue of expired flows handed from scan_thread to emit_thread. */
static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
static struct Flow *flows_emit;

static char ident[256] = "fprobe-ulog";		/* syslog/pidfile identity; suffixed later */
static FILE *pidfile;
static char *pidfilepath;

static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
static struct ipulog_handle *ulog_handle;	/* netlink ULOG capture handle */
static uint32_t ulog_gmask = 1;			/* ULOG group bitmask (-U) */
static char *cap_buf;				/* raw capture buffer */
static int nsnmp_rules;
static struct snmp_rule *snmp_rules;		/* -X interface-name -> SNMP-index rules */
static struct passwd *pw = 0;			/* target user when dropping privileges (-u) */
/* Help text (fragment of usage(); the enclosing fprintf call is not visible here).
 * NOTE(review): "idetifier" in the -l line is a typo for "identifier" in
 * user-visible output -- left untouched here since this edit changes comments only. */
"fprobe-ulog: a NetFlow probe. Version %s\n"
"Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
"-h\t\tDisplay this help\n"
"-U <mask>\tULOG group bitwise mask [1]\n"
"-s <seconds>\tHow often scan for expired flows [5]\n"
"-g <seconds>\tFragmented flow lifetime [30]\n"
"-e <seconds>\tActive flow lifetime (active timer) [300]\n"
"-f <filename>\tLog flow data in a file\n"
"-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
"-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
"-a <address>\tUse <address> as source for NetFlow flow\n"
"-X <rules>\tInterface name to SNMP-index conversion rules\n"
"-M\t\tUse netfilter mark value as ToS flag\n"
"-b <flows>\tMemory bulk size (1..%u) [%u]\n"
"-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
"-q <flows>\tPending queue length [100]\n"
"-B <kilobytes>\tKernel capture buffer size [0]\n"
"-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
"-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
"-c <directory>\tDirectory to chroot to\n"
"-u <user>\tUser to run as\n"
"-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
"-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
"-y <remote:port>\tAddress of the NetFlow collector\n"
"-f <writable file>\tFile to write data into\n"
"-T <n>\tRotate log file every n epochs\n"
"-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
"-E <[1..60]>\tSize of an epoch in minutes\n"
"-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
/* Dump of the DEBUG_I instrumentation counters (enclosing function header is
 * not visible in this fragment; presumably triggered by SIGUSR1). */
#if ((DEBUG) & DEBUG_I)
	my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
		pkts_total, pkts_total_fragmented, size_total,
		pkts_pending - pkts_pending_done, pending_queue_trace);
	my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
		pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
	my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
		flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
	my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
		total_elements, free_elements, total_memory);
/* Async-signal handler: records the received signal in the `sigs` bitmask
 * for the main loop to act on later (no unsafe work done in handler context). */
void sighandler(int sig)
		sigs |= SIGTERM_MASK;
#if ((DEBUG) & DEBUG_I)
		sigs |= SIGUSR1_MASK;	/* SIGUSR1 triggers the stats dump when DEBUG_I is on */
/* Fill *now with the current wall-clock time (sec/usec pair);
 * presumably backed by gettimeofday() into a local `t` on lines not visible here. */
void gettime(struct Time *now)
	now->usec = t.tv_usec;
315 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
317 return (t1->sec - t2->sec)/60;
320 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
322 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
325 /* Uptime in miliseconds */
326 uint32_t getuptime(struct Time *t)
328 /* Maximum uptime is about 49/2 days */
329 return cmpmtime(t, &start_time);
332 /* Uptime in minutes */
333 uint32_t getuptime_minutes(struct Time *t)
335 /* Maximum uptime is about 49/2 days */
336 return cmpMtime(t, &start_time);
339 hash_t hash_flow(struct Flow *flow)
341 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
342 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP index: first try the user-supplied -X
 * rules (basename prefix + numeric suffix + base), then fall back to the
 * kernel's if_nametoindex().  Returns 0 for an empty name. */
uint16_t snmp_index(char *name) {
	if (!*name) return 0;

	for (i = 0; (int) i < nsnmp_rules; i++) {
		if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
		/* Rule matched: index = numeric suffix after the basename, plus the rule's base. */
		return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;

	if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a flow record from src to dst (the address/port
 * fields are copied on lines not visible in this fragment). */
inline void copy_flow(struct Flow *src, struct Flow *dst)
	dst->proto = src->proto;
	dst->tcp_flags = src->tcp_flags;
	dst->pkts = src->pkts;
	dst->size = src->size;
	dst->sizeF = src->sizeF;	/* bytes of fragments received so far */
	dst->sizeP = src->sizeP;	/* expected total datagram payload size */
	dst->ctime = src->ctime;	/* creation time */
	dst->mtime = src->mtime;	/* last-modified time */
	dst->flags = src->flags;
/* Restore the epoch counter persisted by update_cur_epoch_file() so log
 * rotation resumes where it left off after a restart. */
void get_cur_epoch() {
	fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
	len = read(fd, snum, sizeof(snum)-1);	/* NOTE(review): fd/len error checks not visible in this fragment -- confirm */
	/* NOTE(review): cur_epoch is declared unsigned but scanned with %d;
	 * the matching specifier would be %u (mismatch is undefined behavior). */
	sscanf(snum,"%d",&cur_epoch);
/* Persist the current epoch number to /tmp so a restarted fprobe can resume
 * rotating logs from the same epoch (see get_cur_epoch()). */
void update_cur_epoch_file(int n) {
	len=snprintf(snum,6,"%d",n);
	/* NOTE(review): open() with O_CREAT requires a third `mode` argument
	 * (e.g. 0644); omitting it gives the new file unspecified permissions. */
	fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
		my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
	write(fd, snum, len);	/* NOTE(review): write() result ignored; short writes silently lost */
/* Return the fd to write flow data to, rotating to the next epoch's file when
 * the current epoch has elapsed or no file is open yet.  Also checks free disk
 * space and starts reusing/dropping epochs when it runs low. */
unsigned get_log_fd(char *fname, unsigned cur_fd) {
	/* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
	 * doesn't solve the problem */
	struct statfs statfs;
	cur_uptime = getuptime_minutes(&now);

	/* NOTE(review): fstatfs() is evaluated before the cur_fd!=START_VALUE guard,
	 * so it is also invoked on the START_VALUE (-5) sentinel; harmless but untidy. */
	if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
		my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");

	if (min_free && statfs.f_bfree < min_free)
		case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
			my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
			my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);

	/* Epoch length in minutes */
	/* NOTE(review): cur_fd is an unsigned parameter, so `cur_fd<0` is always
	 * false; and cur_epoch is unsigned, so `cur_epoch==-1` only works via the
	 * implicit conversion of -1 to UINT_MAX.  Both want a signed type or an
	 * explicit sentinel constant. */
	if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
		char nextname[MAX_PATH_LEN];
		prev_uptime = cur_uptime;
		cur_epoch = (cur_epoch + 1) % log_epochs;	/* wrap around: reuse old epoch files */
		snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
		/* NOTE(review): open() with O_CREAT is missing its mode argument (e.g. 0644). */
		if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
			my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
		update_cur_epoch_file(cur_epoch);	/* persist so a restart resumes here */
/* Search the singly linked chain `where` for a flow matching `what`.
 * Match key is (sip, dip, proto) plus either the port pair (both flows
 * unfragmented) or the IP id (both fragmented).  If `prev` is non-NULL it
 * is used to return the address of the pointer that links to the match,
 * so callers can unlink the node without rescanning the chain. */
struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
{
	struct Flow **flowpp;

	if (prev) flowpp = *prev;

		if (where->sip.s_addr == what->sip.s_addr
			&& where->dip.s_addr == what->dip.s_addr
			&& where->proto == what->proto) {
			switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
					/* Both unfragmented */
					if ((what->sp == where->sp)
						&& (what->dp == where->dp)) goto done;
					/* Both fragmented */
					if (where->id == what->id) goto done;
		flowpp = &where->next;	/* advance the back-pointer alongside the walk */
	if (prev) *prev = flowpp;
/* Insert `flow` into the hash table bucket, or merge it into an existing
 * matching entry.  `flag` selects ownership: COPY_INTO allocates a fresh node
 * and copies; MOVE_INTO donates the caller's node (freed here when merged).
 * When the last fragment of a datagram arrives, the reassembled flow is
 * re-inserted (recursively) with its FLOW_FRAG bit cleared. */
int put_into(struct Flow *flow, int flag
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	struct Flow *flown, **flowpp;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	sprintf(buf, " %x H:%04x", (unsigned) flow, h);
	pthread_mutex_lock(&flows_mutex[h]);	/* per-bucket lock */
	if (!(flown = find(flows[h], flow, &flowpp))) {
		/* No suitable flow found - add */
		if (flag == COPY_INTO) {
			if ((flown = mem_alloc())) {
				copy_flow(flow, flown);
#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
				my_log(LOG_ERR, "%s %s. %s",
					"mem_alloc():", strerror(errno), "packet lost");
		flow->next = flows[h];	/* push at bucket head */
#if ((DEBUG) & DEBUG_I)
		if (flow->flags & FLOW_FRAG) flows_fragmented++;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
		/* Found suitable flow - update */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		sprintf(buf, " +> %x", (unsigned) flown);
		/* Widen the flow's time window to cover both records. */
		if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
			flown->mtime = flow->mtime;
		if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
			flown->ctime = flow->ctime;
		flown->tcp_flags |= flow->tcp_flags;	/* OR-accumulate TCP flags */
		flown->size += flow->size;
		flown->pkts += flow->pkts;
		if (flow->flags & FLOW_FRAG) {
			/* Fragmented flow require some additional work */
			if (flow->flags & FLOW_TL) {
				Several packets with FLOW_TL (attack)
				flown->sp = flow->sp;
				flown->dp = flow->dp;
			if (flow->flags & FLOW_LASTFRAG) {
				Several packets with FLOW_LASTFRAG (attack)
				flown->sizeP = flow->sizeP;	/* total datagram size from last fragment */
			flown->flags |= flow->flags;
			flown->sizeF += flow->sizeF;	/* accumulate received fragment bytes */
			if ((flown->flags & FLOW_LASTFRAG)
				&& (flown->sizeF >= flown->sizeP)) {
				/* All fragments received - flow reassembled */
				*flowpp = flown->next;	/* unlink before re-inserting unfragmented */
				pthread_mutex_unlock(&flows_mutex[h]);
#if ((DEBUG) & DEBUG_I)
				flown->flags &= ~FLOW_FRAG;
#if ((DEBUG) & (DEBUG_U | DEBUG_S))
				ret = put_into(flown, MOVE_INTO	/* recursive re-insert under the new hash */
#if ((DEBUG) & (DEBUG_U | DEBUG_S))
		if (flag == MOVE_INTO) mem_free(flow);	/* merged: donated node no longer needed */
	pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize one flow (or, with flow==NULL, the packet header) into the output
 * buffer at p, following the field list `format` of length `fields` from the
 * selected NetFlow template.  Returns the advanced write pointer.
 * Multibyte values are converted to network byte order; ports (sp/dp) are
 * already stored in network order and copied as-is. */
void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
	for (i = 0; i < fields; i++) {
#if ((DEBUG) & DEBUG_F)
		my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
			case NETFLOW_IPV4_SRC_ADDR:
				((struct in_addr *) p)->s_addr = flow->sip.s_addr;
				p += NETFLOW_IPV4_SRC_ADDR_SIZE;

			case NETFLOW_IPV4_DST_ADDR:
				((struct in_addr *) p)->s_addr = flow->dip.s_addr;
				/* NOTE(review): hard-coded test/debug address left in the hot path
				 * (pairs with the check in the capture path) -- candidate for removal. */
				if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
					my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
				p += NETFLOW_IPV4_DST_ADDR_SIZE;

			case NETFLOW_INPUT_SNMP:
				*((uint16_t *) p) = htons(flow->iif);
				p += NETFLOW_INPUT_SNMP_SIZE;

			case NETFLOW_OUTPUT_SNMP:
				*((uint16_t *) p) = htons(flow->oif);
				p += NETFLOW_OUTPUT_SNMP_SIZE;

			case NETFLOW_PKTS_32:
				*((uint32_t *) p) = htonl(flow->pkts);
				p += NETFLOW_PKTS_32_SIZE;

			case NETFLOW_BYTES_32:
				*((uint32_t *) p) = htonl(flow->size);
				p += NETFLOW_BYTES_32_SIZE;

			case NETFLOW_FIRST_SWITCHED:
				*((uint32_t *) p) = htonl(getuptime(&flow->ctime));
				p += NETFLOW_FIRST_SWITCHED_SIZE;

			case NETFLOW_LAST_SWITCHED:
				*((uint32_t *) p) = htonl(getuptime(&flow->mtime));
				p += NETFLOW_LAST_SWITCHED_SIZE;

			case NETFLOW_L4_SRC_PORT:
				*((uint16_t *) p) = flow->sp;	/* already network order */
				p += NETFLOW_L4_SRC_PORT_SIZE;

			case NETFLOW_L4_DST_PORT:
				*((uint16_t *) p) = flow->dp;	/* already network order */
				p += NETFLOW_L4_DST_PORT_SIZE;

				*((uint8_t *) p) = flow->proto;
				p += NETFLOW_PROT_SIZE;

			case NETFLOW_SRC_TOS:
				*((uint8_t *) p) = flow->tos;
				p += NETFLOW_SRC_TOS_SIZE;

			case NETFLOW_TCP_FLAGS:
				*((uint8_t *) p) = flow->tcp_flags;
				p += NETFLOW_TCP_FLAGS_SIZE;

			/* Header fields below (flow == NULL when these are requested). */
			case NETFLOW_VERSION:
				*((uint16_t *) p) = htons(netflow->Version);
				p += NETFLOW_VERSION_SIZE;

				*((uint16_t *) p) = htons(emit_count);
				p += NETFLOW_COUNT_SIZE;

				*((uint32_t *) p) = htonl(getuptime(&emit_time));
				p += NETFLOW_UPTIME_SIZE;

			case NETFLOW_UNIX_SECS:
				*((uint32_t *) p) = htonl(emit_time.sec);
				p += NETFLOW_UNIX_SECS_SIZE;

			case NETFLOW_UNIX_NSECS:
				*((uint32_t *) p) = htonl(emit_time.usec * 1000);
				p += NETFLOW_UNIX_NSECS_SIZE;

			case NETFLOW_FLOW_SEQUENCE:
				/* Sequence deliberately zeroed (original code kept for reference). */
				//*((uint32_t *) p) = htonl(emit_sequence);
				*((uint32_t *) p) = 0;
				p += NETFLOW_FLOW_SEQUENCE_SIZE;

			/* Unsupported (uint8_t) */
			case NETFLOW_ENGINE_TYPE:
			case NETFLOW_ENGINE_ID:
			case NETFLOW_FLAGS7_1:
			case NETFLOW_SRC_MASK:
			case NETFLOW_DST_MASK:
				*((uint8_t *) p) = 0;
				p += NETFLOW_PAD8_SIZE;

				/* NOTE(review): XID field written from flow->tos without htons();
				 * byte order and field width (tos is 8-bit elsewhere) need confirming. */
				*((uint16_t *) p) = flow->tos;
				p += NETFLOW_XID_SIZE;

			/* Unsupported (uint16_t) */
			case NETFLOW_FLAGS7_2:
				*((uint16_t *) p) = 0;
				p += NETFLOW_PAD16_SIZE;

			/* Unsupported (uint32_t) */
			case NETFLOW_IPV4_NEXT_HOP:
			case NETFLOW_ROUTER_SC:
				*((uint32_t *) p) = 0;
				p += NETFLOW_PAD32_SIZE;

				my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
					format, i, format[i]);
#if ((DEBUG) & DEBUG_F)
	my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
	Workaround for clone()-based threads
	Try to change EUID independently of main thread
	/* Drop privileges in this thread too (clone()-based threads don't share creds). */
	setregid(pw->pw_gid, pw->pw_gid);
	setreuid(pw->pw_uid, pw->pw_uid);

	struct timespec timeout;
	int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;

	p = (void *) &emit_packet + netflow->HeaderSize;	/* flow records start after the header */

		pthread_mutex_lock(&emit_mutex);
		while (!flows_emit) {
			gettimeofday(&now, 0);
			timeout.tv_sec = now.tv_sec + emit_timeout;
			/* Do not wait until emit_packet is filled - that may take too long */
			if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
				pthread_mutex_unlock(&emit_mutex);
		flows_emit = flows_emit->next;	/* pop one expired flow from the emit queue */
#if ((DEBUG) & DEBUG_I)
		pthread_mutex_unlock(&emit_mutex);

		/* NOTE(review): the global start_time (base of all uptime timestamps)
		 * is re-derived here on every emit -- confirm this re-basing is intended. */
		gettime(&start_time);
		start_time.sec -= start_time_offset;

		p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
		printf("Emit count = %d\n", emit_count);	/* NOTE(review): debug printf left in production path */

		if (emit_count == netflow->MaxFlows) {
			/* Packet full: write the header and flush to every peer. */
			p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
			size = netflow->HeaderSize + emit_count * netflow->FlowSize;
			/* Netflow PDUs need to be padded to 1464 bytes - Sapan */
			if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;

			for (i = 0; i < npeers; i++) {
				if (peers[i].type == PEER_FILE) {
					if (netflow->SeqOffset)
						*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
					peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
					ret = write(peers[i].write_fd, emit_packet, size);
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
						my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
							i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#if ((DEBUG) & DEBUG_E)
					my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
						emit_count, i + 1, peers[i].seq);
					peers[i].seq += emit_count;

					/* Optional rate limiting: sleep proportionally to bytes sent. */
					if (emit_rate_bytes) {
						delay = sent / emit_rate_bytes;
						sent %= emit_rate_bytes;
						timeout.tv_nsec = emit_rate_delay * delay;
						while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);

				if (peers[i].type == PEER_MIRROR) goto sendreal;

				if (peers[i].type == PEER_ROTATE)
					/* Round-robin: only the currently selected rotate-peer sends. */
					if (peer_rot_cur++ == peer_rot_work) {
						if (netflow->SeqOffset)
							*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
						ret = send(peers[i].write_fd, emit_packet, size, 0);
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
							my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
								i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#if ((DEBUG) & DEBUG_E)
						my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
							emit_count, i + 1, peers[i].seq);
						peers[i].seq += emit_count;

						if (emit_rate_bytes) {
							delay = sent / emit_rate_bytes;
							sent %= emit_rate_bytes;
							timeout.tv_nsec = emit_rate_delay * delay;
							while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);

			if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
			emit_sequence += emit_count;
#if ((DEBUG) & DEBUG_I)
/* Consumer half of the pending queue: waits for cap_thread to mark entries
 * FLOW_PENDING, then folds each one into the flow hash table (COPY_INTO keeps
 * the queue node reusable) and advances the circular queue tail. */
void *unpending_thread()
{
	struct timespec timeout;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))

	pthread_mutex_lock(&unpending_mutex);

		while (!(pending_tail->flags & FLOW_PENDING)) {
			gettimeofday(&now, 0);
			timeout.tv_sec = now.tv_sec + unpending_timeout;
			pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);

#if ((DEBUG) & (DEBUG_S | DEBUG_U))
		if (put_into(pending_tail, COPY_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
#if ((DEBUG) & DEBUG_I)
			pkts_lost_unpending++;	/* put_into failed: packet dropped */

#if ((DEBUG) & DEBUG_U)
		my_log(LOG_DEBUG, "%s%s", "U:", logbuf);

		pending_tail->flags = 0;	/* recycle the slot for cap_thread */
		pending_tail = pending_tail->next;
#if ((DEBUG) & DEBUG_I)
/* Expiry scanner: every scan_interval seconds walks all hash buckets, moves
 * timed-out flows to the emit queue, and parks expired fragmented flows on
 * scan_frag_dreg for forced (incomplete) reassembly. */
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
	struct Flow *flow, **flowpp;
	struct timespec timeout;

	pthread_mutex_lock(&scan_mutex);

		timeout.tv_sec = now.sec + scan_interval;
		pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);

#if ((DEBUG) & DEBUG_S)
		my_log(LOG_DEBUG, "S: %d", now.sec);
		for (i = 0; i < 1 << HASH_BITS ; i++) {
			pthread_mutex_lock(&flows_mutex[i]);
				if (flow->flags & FLOW_FRAG) {
					/* Process fragmented flow */
					if ((now.sec - flow->mtime.sec) > frag_lifetime) {
						/* Fragmented flow expired - put it into special chain */
#if ((DEBUG) & DEBUG_I)
						*flowpp = flow->next;	/* unlink from bucket */
						flow->flags &= ~FLOW_FRAG;
						flow->next = scan_frag_dreg;
						scan_frag_dreg = flow;
					/* Flow is not fragmented */
					if ((now.sec - flow->mtime.sec) > inactive_lifetime
						|| (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
						/* Inactive or active timer expired: queue for emission. */
#if ((DEBUG) & DEBUG_S)
						my_log(LOG_DEBUG, "S: E %x", flow);
#if ((DEBUG) & DEBUG_I)
						*flowpp = flow->next;
						pthread_mutex_lock(&emit_mutex);
						flow->next = flows_emit;
#if ((DEBUG) & DEBUG_I)
						pthread_mutex_unlock(&emit_mutex);
				flowpp = &flow->next;
			pthread_mutex_unlock(&flows_mutex[i]);
		if (flows_emit) pthread_cond_signal(&emit_cond);	/* wake emit_thread */

		/* Re-insert expired fragments unfragmented so they still get exported. */
		while (scan_frag_dreg) {
			flow = scan_frag_dreg;
			scan_frag_dreg = flow->next;
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
			put_into(flow, MOVE_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
#if ((DEBUG) & DEBUG_S)
			my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
/* Capture loop (body fragment; the enclosing cap_thread() header is not
 * visible here): reads packets from the netfilter ULOG netlink socket,
 * parses the IPv4/transport headers, and fills the next slot of the
 * circular pending queue for unpending_thread() to consume. */
	struct ulog_packet_msg *ulog_msg;
	int len, off_frag, psize;
#if ((DEBUG) & DEBUG_C)

		len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
			my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));

		/* One read may contain several queued packets; iterate them all. */
		while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {

#if ((DEBUG) & DEBUG_C)
			sprintf(logbuf, "C: %d", ulog_msg->data_len);

			nl = (void *) &ulog_msg->payload;	/* network-layer (IP) header */
			psize = ulog_msg->data_len;

			/* Only IPv4 with at least a full IP header is processed. */
			if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " U");
				my_log(LOG_DEBUG, "%s", logbuf);
#if ((DEBUG) & DEBUG_I)

			/* Queue slot still in use -> queue is full, drop the packet. */
			if (pending_head->flags) {
#if ((DEBUG) & DEBUG_C) || defined MESSAGES
# if ((DEBUG) & DEBUG_C)
					"pending queue full:", "packet lost");
#if ((DEBUG) & DEBUG_I)
				pkts_lost_capture++;

#if ((DEBUG) & DEBUG_I)
			flow = pending_head;	/* claim the slot */

			/* ?FIXME? Add sanity check for ip_len? */
			flow->size = ntohs(nl->ip_len);
#if ((DEBUG) & DEBUG_I)
			size_total += flow->size;

			flow->sip = nl->ip_src;
			flow->dip = nl->ip_dst;
			/* NOTE(review): hard-coded debug address check left in the capture path. */
			if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
				my_log(LOG_INFO, "Received test flow to corewars.org");
			flow->iif = snmp_index(ulog_msg->indev_name);
			flow->oif = snmp_index(ulog_msg->outdev_name);
			flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;	/* -M: nfmark as ToS */
			flow->proto = nl->ip_p;
			flow->tcp_flags = 0;

			/* Packets captured from OUTPUT table didn't contains valid timestamp */
			if (ulog_msg->timestamp_sec) {
				flow->ctime.sec = ulog_msg->timestamp_sec;
				flow->ctime.usec = ulog_msg->timestamp_usec;
			} else gettime(&flow->ctime);
			flow->mtime = flow->ctime;

			off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;	/* fragment offset in bytes */

			Offset (from network layer) to transport layer header/IP data
			IOW IP header size ;-)
			Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
			off_tl = nl->ip_hl << 2;
			tl = (void *) nl + off_tl;	/* transport-layer header */

			/* THIS packet data size: data_size = total_size - ip_header_size*4 */
			flow->sizeF = ntohs(nl->ip_len) - off_tl;
			if ((signed) flow->sizeF < 0) flow->sizeF = 0;	/* guard against bogus ip_len */
			if (psize > (signed) flow->sizeF) psize = flow->sizeF;

			if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
				/* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
#if ((DEBUG) & DEBUG_C)
				strcat(logbuf, " F");
#if ((DEBUG) & DEBUG_I)
				pkts_total_fragmented++;
				flow->flags |= FLOW_FRAG;
				flow->id = nl->ip_id;	/* reassembly key */

				if (!(ntohs(nl->ip_off) & IP_MF)) {
					/* Packet without IP_MF (the last fragment) tells us the whole datagram size */
					flow->flags |= FLOW_LASTFRAG;
					/* size = frag_offset*8 + data_size */
					flow->sizeP = off_frag + flow->sizeF;

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
			strcat(logbuf, buf);
			sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
			strcat(logbuf, buf);

			Fortunately most interesting transport layer information fit
			into first 8 bytes of IP data field (minimal nonzero size).
			Thus we don't need actual packet reassembling to build whole
			transport layer data. We only check the fragment offset for
			zero value to find packet with this information.
			if (!off_frag && psize >= 8) {
				switch (flow->proto) {
						flow->sp = ((struct udphdr *)tl)->uh_sport;
						flow->dp = ((struct udphdr *)tl)->uh_dport;

						/* ICMP: encode type/code into the port fields (common convention). */
						flow->sp = htons(((struct icmp *)tl)->icmp_type);
						flow->dp = htons(((struct icmp *)tl)->icmp_code);

#ifdef ICMP_TRICK_CISCO
						flow->dp = *((int32_t *) tl);

						/* Unknown transport layer */
#if ((DEBUG) & DEBUG_C)
						strcat(logbuf, " U");

#if ((DEBUG) & DEBUG_C)
				sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
				strcat(logbuf, buf);
				flow->flags |= FLOW_TL;	/* transport-layer info captured */

			/* Check for tcp flags presence (including CWR and ECE). */
			if (flow->proto == IPPROTO_TCP
				&& psize >= 16 - off_frag) {
				flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));	/* TCP flags byte at offset 13 */
#if ((DEBUG) & DEBUG_C)
				sprintf(buf, " TCP:%x", flow->tcp_flags);
				strcat(logbuf, buf);

#if ((DEBUG) & DEBUG_C)
			sprintf(buf, " => %x", (unsigned) flow);
			strcat(logbuf, buf);
			my_log(LOG_DEBUG, "%s", logbuf);

#if ((DEBUG) & DEBUG_I)
			pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
			if (pending_queue_trace < pending_queue_trace_candidate)
				pending_queue_trace = pending_queue_trace_candidate;

			/* Flow complete - inform unpending_thread() about it */
			pending_head->flags |= FLOW_PENDING;
			pending_head = pending_head->next;
			pthread_cond_signal(&unpending_cond);
1242 int main(int argc, char **argv)
1245 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1246 int c, i, write_fd, memory_limit = 0;
1247 struct addrinfo hints, *res;
1248 struct sockaddr_in saddr;
1249 pthread_attr_t tattr;
1250 struct sigaction sigact;
1251 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1252 struct timeval timeout;
1254 sched_min = sched_get_priority_min(SCHED);
1255 sched_max = sched_get_priority_max(SCHED);
1257 memset(&saddr, 0 , sizeof(saddr));
1258 memset(&hints, 0 , sizeof(hints));
1259 hints.ai_flags = AI_PASSIVE;
1260 hints.ai_family = AF_INET;
1261 hints.ai_socktype = SOCK_DGRAM;
1263 /* Process command line options */
1266 while ((c = my_getopt(argc, argv, parms)) != -1) {
1276 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1277 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1278 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1279 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1280 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1281 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1282 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1283 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1284 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1285 if (parms[nflag].count) {
1286 switch (atoi(parms[nflag].arg)) {
1288 netflow = &NetFlow1;
1295 netflow = &NetFlow7;
1299 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1303 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1304 if (parms[lflag].count) {
1305 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1308 sprintf(errpbuf, "[%s]", log_suffix);
1309 strcat(ident, errpbuf);
1312 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1313 if (log_suffix) *--log_suffix = ':';
1315 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1317 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1320 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1321 if (parms[qflag].count) {
1322 pending_queue_length = atoi(parms[qflag].arg);
1323 if (pending_queue_length < 1) {
1324 fprintf(stderr, "Illegal %s\n", "pending queue length");
1328 if (parms[rflag].count) {
1329 schedp.sched_priority = atoi(parms[rflag].arg);
1330 if (schedp.sched_priority
1331 && (schedp.sched_priority < sched_min
1332 || schedp.sched_priority > sched_max)) {
1333 fprintf(stderr, "Illegal %s\n", "realtime priority");
1337 if (parms[Bflag].count) {
1338 sockbufsize = atoi(parms[Bflag].arg) << 10;
1340 if (parms[bflag].count) {
1341 bulk_quantity = atoi(parms[bflag].arg);
1342 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1343 fprintf(stderr, "Illegal %s\n", "bulk size");
1347 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1348 if (parms[Xflag].count) {
1349 for(i = 0; parms[Xflag].arg[i]; i++)
1350 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1351 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1353 rule = strtok(parms[Xflag].arg, ":");
1354 for (i = 0; rule; i++) {
1355 snmp_rules[i].len = strlen(rule);
1356 if (snmp_rules[i].len > IFNAMSIZ) {
1357 fprintf(stderr, "Illegal %s\n", "interface basename");
1360 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1361 if (!*(rule - 1)) *(rule - 1) = ',';
1362 rule = strtok(NULL, ",");
1364 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1367 snmp_rules[i].base = atoi(rule);
1369 rule = strtok(NULL, ":");
1373 if (parms[tflag].count)
1374 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1375 if (parms[aflag].count) {
1376 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1378 fprintf(stderr, "Illegal %s\n", "source address");
1381 saddr = *((struct sockaddr_in *) res->ai_addr);
1385 if (parms[uflag].count)
1386 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1387 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1392 /* Process collectors parameters. Brrrr... :-[ */
1394 npeers = argc - optind;
1396 /* Send to remote Netflow collector */
1397 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1398 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1400 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1402 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1403 fprintf(stderr, "socket(): %s\n", strerror(errno));
1406 peers[npeers].write_fd = write_fd;
1407 peers[npeers].type = PEER_MIRROR;
1408 peers[npeers].laddr = saddr;
1409 peers[npeers].seq = 0;
1410 if ((lhost = strchr(dport, '/'))) {
1412 if ((type = strchr(lhost, '/'))) {
1420 peers[npeers].type = PEER_ROTATE;
1429 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1430 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1434 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1435 sizeof(struct sockaddr_in))) {
1436 fprintf(stderr, "bind(): %s\n", strerror(errno));
1439 if (getaddrinfo(dhost, dport, &hints, &res)) {
1441 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1444 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1446 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1447 sizeof(struct sockaddr_in))) {
1448 fprintf(stderr, "connect(): %s\n", strerror(errno));
1452 /* Restore command line */
1453 if (type) *--type = '/';
1454 if (lhost) *--lhost = '/';
1458 else if (parms[fflag].count) {
1460 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1461 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1462 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1464 peers[npeers].write_fd = START_VALUE;
1465 peers[npeers].type = PEER_FILE;
1466 peers[npeers].seq = 0;
1475 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1476 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1478 fprintf(stderr, "libipulog initialization error: %s",
1479 ipulog_strerror(ipulog_errno));
1483 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1484 &sockbufsize, sizeof(sockbufsize)) < 0)
1485 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1487 /* Daemonize (only when the log destination does not include stdout) */
1489 my_log_open(ident, verbosity, log_dest);
1490 if (!(log_dest & 2)) {
1491 /* Crash-proofing: parent loops on wait3() and re-forks the worker if it dies */
1495 fprintf(stderr, "fork(): %s", strerror(errno));
1500 freopen("/dev/null", "r", stdin);
1501 freopen("/dev/null", "w", stdout);
1502 freopen("/dev/null", "w", stderr);
1506 while (wait3(NULL,0,NULL) < 1);
1510 setvbuf(stdout, (char *)0, _IONBF, 0);
1511 setvbuf(stderr, (char *)0, _IONBF, 0);
1515 sprintf(errpbuf, "[%ld]", (long) pid);
1516 strcat(ident, errpbuf);
1518 /* Initialization */
1520 hash_init(); /* Actually for crc16 only */
1521 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1522 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1525 /* Hope 12 days is enough :-/ */
1526 start_time_offset = 1 << 20;
1528 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1530 gettime(&start_time);
1533 Build the static pending queue as a circular buffer.
1535 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1536 pending_tail = pending_head;
1537 for (i = pending_queue_length - 1; i--;) {
1538 if (!(pending_tail->next = mem_alloc())) {
1540 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1543 pending_tail = pending_tail->next;
1545 pending_tail->next = pending_head;
1546 pending_tail = pending_head;
1548 sigemptyset(&sig_mask);
1549 sigact.sa_handler = &sighandler;
1550 sigact.sa_mask = sig_mask;
1551 sigact.sa_flags = 0;
1552 sigaddset(&sig_mask, SIGTERM);
1553 sigaction(SIGTERM, &sigact, 0);
1554 #if ((DEBUG) & DEBUG_I)
1555 sigaddset(&sig_mask, SIGUSR1);
1556 sigaction(SIGUSR1, &sigact, 0);
1558 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1559 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1563 my_log(LOG_INFO, "Starting %s...", VERSION);
1565 if (parms[cflag].count) {
1566 if (chdir(parms[cflag].arg) || chroot(".")) {
1567 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1572 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1573 pthread_attr_init(&tattr);
1574 for (i = 0; i < THREADS - 1; i++) {
1575 if (schedp.sched_priority > 0) {
1576 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1577 (pthread_attr_setschedparam(&tattr, &schedp))) {
1578 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1582 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1583 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1586 pthread_detach(thid);
1587 schedp.sched_priority++;
1591 if (setgroups(0, NULL)) {
1592 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1595 if (setregid(pw->pw_gid, pw->pw_gid)) {
1596 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1599 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1600 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1605 if (!(pidfile = fopen(pidfilepath, "w")))
1606 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1608 fprintf(pidfile, "%ld\n", (long) pid);
1612 my_log(LOG_INFO, "pid: %d", pid);
1613 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1614 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1615 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1616 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1617 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1618 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1619 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1620 for (i = 0; i < nsnmp_rules; i++) {
1621 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1622 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1624 for (i = 0; i < npeers; i++) {
1625 switch (peers[i].type) {
1633 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1634 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1635 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1638 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1640 timeout.tv_usec = 0;
1642 || (total_elements - free_elements - pending_queue_length)
1644 || pending_tail->flags) {
1647 timeout.tv_sec = scan_interval;
1648 select(0, 0, 0, 0, &timeout);
1651 if (sigs & SIGTERM_MASK && !killed) {
1652 sigs &= ~SIGTERM_MASK;
1653 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1656 active_lifetime = -1;
1657 inactive_lifetime = -1;
1659 unpending_timeout = 1;
1661 pthread_cond_signal(&scan_cond);
1662 pthread_cond_signal(&unpending_cond);
1665 #if ((DEBUG) & DEBUG_I)
1666 if (sigs & SIGUSR1_MASK) {
1667 sigs &= ~SIGUSR1_MASK;
1672 remove(pidfilepath);
1673 #if ((DEBUG) & DEBUG_I)
1676 my_log(LOG_INFO, "Done.");