2 Copyright (C) Slava Astashonok <sla@0n.ru>
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License.
7 $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
9 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
11 Added data collection (-f) functionality, xid support in the header and log file
17 /* stdout, stderr, freopen() */
23 /* getopt(), alarm(), getpid(), setsid(), chdir() */
34 #include <sys/statfs.h>
36 #include <libipulog/libipulog.h>
37 struct ipulog_handle {
40 struct sockaddr_nl local;
41 struct sockaddr_nl peer;
42 struct nlmsghdr* last_nlhdr;
45 /* inet_*() (Linux, FreeBSD, Solaris), getpid() */
46 #include <sys/types.h>
47 #include <netinet/in_systm.h>
48 #include <sys/socket.h>
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51 #include <netinet/ip.h>
52 #include <netinet/tcp.h>
53 #include <netinet/udp.h>
54 #include <netinet/ip_icmp.h>
57 #include <sys/param.h>
82 #include <sys/select.h>
88 #include <fprobe-ulog.h>
90 #include <my_getopt.h>
123 static struct getopt_parms parms[] = {
124 {'a', MY_GETOPT_ARG_REQUIRED, 0, 0},
125 {'B', MY_GETOPT_ARG_REQUIRED, 0, 0},
126 {'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
127 {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
128 {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
129 {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
130 {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
131 {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
132 {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
134 {'l', MY_GETOPT_ARG_REQUIRED, 0, 0},
135 {'m', MY_GETOPT_ARG_REQUIRED, 0, 0},
137 {'n', MY_GETOPT_ARG_REQUIRED, 0, 0},
138 {'q', MY_GETOPT_ARG_REQUIRED, 0, 0},
139 {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
140 {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
141 {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
142 {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
143 {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
144 {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
145 {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
146 {'X', MY_GETOPT_ARG_REQUIRED, 0, 0},
151 extern int optind, opterr, optopt;
154 extern struct NetFlow NetFlow1;
155 extern struct NetFlow NetFlow5;
156 extern struct NetFlow NetFlow7;
158 #define START_VALUE -5
159 #define mark_is_tos parms[Mflag].count
160 static unsigned scan_interval = 5;
161 static int min_free = 0;
162 static int frag_lifetime = 30;
163 static int inactive_lifetime = 60;
164 static int active_lifetime = 300;
165 static int sockbufsize;
166 #define BULK_QUANTITY_MAX (unsigned)(mem_index_t)(-1)
167 #if (MEM_BITS == 0) || (MEM_BITS == 16)
168 #define BULK_QUANTITY 10000
170 #define BULK_QUANTITY 200
173 static unsigned epoch_length=60, log_epochs=1;
174 static unsigned cur_epoch=0,prev_uptime=0;
176 static unsigned bulk_quantity = BULK_QUANTITY;
177 static unsigned pending_queue_length = 100;
178 static struct NetFlow *netflow = &NetFlow5;
179 static unsigned verbosity = 6;
180 static unsigned log_dest = MY_LOG_SYSLOG;
181 static struct Time start_time;
182 static long start_time_offset;
185 extern unsigned total_elements;
186 extern unsigned free_elements;
187 extern unsigned total_memory;
188 #if ((DEBUG) & DEBUG_I)
189 static unsigned emit_pkts, emit_queue;
190 static uint64_t size_total;
191 static unsigned pkts_total, pkts_total_fragmented;
192 static unsigned pkts_ignored, pkts_lost_capture, pkts_lost_unpending;
193 static unsigned pkts_pending, pkts_pending_done;
194 static unsigned pending_queue_trace, pending_queue_trace_candidate;
195 static unsigned flows_total, flows_fragmented;
197 static unsigned emit_count;
198 static uint32_t emit_sequence;
199 static unsigned emit_rate_bytes, emit_rate_delay;
200 static struct Time emit_time;
201 static uint8_t emit_packet[NETFLOW_MAX_PACKET];
202 static pthread_t thid;
203 static sigset_t sig_mask;
204 static struct sched_param schedp;
205 static int sched_min, sched_max;
206 static int npeers, npeers_rot;
207 static struct peer *peers;
210 static struct Flow *flows[1 << HASH_BITS];
211 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
213 static pthread_mutex_t unpending_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t unpending_cond = PTHREAD_COND_INITIALIZER;
216 static pthread_mutex_t scan_mutex = PTHREAD_MUTEX_INITIALIZER;
217 static pthread_cond_t scan_cond = PTHREAD_COND_INITIALIZER;
218 static struct Flow *pending_head, *pending_tail;
219 static struct Flow *scan_frag_dreg;
221 static pthread_mutex_t emit_mutex = PTHREAD_MUTEX_INITIALIZER;
222 static pthread_cond_t emit_cond = PTHREAD_COND_INITIALIZER;
223 static struct Flow *flows_emit;
225 static char ident[256] = "fprobe-ulog";
226 static FILE *pidfile;
227 static char *pidfilepath;
230 static int emit_timeout = EMIT_TIMEOUT, unpending_timeout = UNPENDING_TIMEOUT;
231 static struct ipulog_handle *ulog_handle;
232 static uint32_t ulog_gmask = 1;
233 static char *cap_buf;
234 static int nsnmp_rules;
235 static struct snmp_rule *snmp_rules;
236 static struct passwd *pw = 0;
241 "fprobe-ulog: a NetFlow probe. Version %s\n"
242 "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
244 "-h\t\tDisplay this help\n"
245 "-U <mask>\tULOG group bitwise mask [1]\n"
246 "-s <seconds>\tHow often scan for expired flows [5]\n"
247 "-g <seconds>\tFragmented flow lifetime [30]\n"
248 "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
249 "-f <filename>\tLog flow data in a file\n"
250 "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
251 "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
252 "-a <address>\tUse <address> as source for NetFlow flow\n"
253 "-X <rules>\tInterface name to SNMP-index conversion rules\n"
254 "-M\t\tUse netfilter mark value as ToS flag\n"
255 "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
256 "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
257 "-q <flows>\tPending queue length [100]\n"
258 "-B <kilobytes>\tKernel capture buffer size [0]\n"
259 "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
260 "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
261 "-c <directory>\tDirectory to chroot to\n"
262 "-u <user>\tUser to run as\n"
263 "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
264 "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
265 "-y <remote:port>\tAddress of the NetFlow collector\n"
266 "-f <writable file>\tFile to write data into\n"
267 "-T <n>\tRotate log file every n epochs\n"
268 "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
269 "-E <[1..60]>\tSize of an epoch in minutes\n"
270 "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
272 VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
276 #if ((DEBUG) & DEBUG_I)
279 my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
280 pkts_total, pkts_total_fragmented, size_total,
281 pkts_pending - pkts_pending_done, pending_queue_trace);
282 my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
283 pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
284 my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
285 flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
286 my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
287 total_elements, free_elements, total_memory);
/* Async signal handler: records each received signal as a bit in the
   global `sigs` mask; the actual work is deferred to the main loop
   (async-signal-safe). Presumably dispatched on `sig` via a switch —
   the dispatch lines are not visible in this extract. */
291 void sighandler(int sig)
295 sigs |= SIGTERM_MASK;
297 #if ((DEBUG) & DEBUG_I)
299 sigs |= SIGUSR1_MASK;
/* Fill *now with the current wall-clock time (sec + usec).
   NOTE(review): the gettimeofday() call and the now->sec assignment
   fall on lines not visible in this extract — confirm against the
   full source. */
305 void gettime(struct Time *now)
311 now->usec = t.tv_usec;
/* Timestamp difference at minute resolution: (t1 - t2) in whole
   minutes (sign indicates ordering; integer division truncates). */
315 inline time_t cmpMtime(struct Time *t1, struct Time *t2)
317 return (t1->sec - t2->sec)/60;
/* Timestamp difference at millisecond resolution: (t1 - t2) in ms,
   combining the seconds and microseconds fields. */
320 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
322 return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
325 /* Uptime in milliseconds */
/* Milliseconds elapsed since start_time, truncated to 32 bits —
   hence the ~49/2-day wrap noted below (halved because cmpmtime()
   returns a signed value). */
326 uint32_t getuptime(struct Time *t)
328 /* Maximum uptime is about 49/2 days */
329 return cmpmtime(t, &start_time);
332 /* Uptime in minutes */
/* Whole minutes elapsed since start_time; used by get_log_fd() to
   decide when the current logging epoch has expired. */
333 uint32_t getuptime_minutes(struct Time *t)
335 /* Maximum uptime is about 49/2 days */
336 return cmpMtime(t, &start_time);
/* Hash a flow for the flows[] bucket table. Fragmented flows hash
   only the fragment-key prefix (struct Flow_F — ports unknown yet);
   complete flows hash the full transport-layer tuple (Flow_TL). */
339 hash_t hash_flow(struct Flow *flow)
341 if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
342 else return hash(flow, sizeof(struct Flow_TL));
/* Map an interface name to an SNMP ifIndex.
   Resolution order: empty name -> 0; user-supplied -X rules (basename
   prefix match, numeric suffix added to the rule's base); finally the
   kernel's if_nametoindex(). The final fallback (not visible in this
   extract) presumably returns 0 — confirm against full source. */
345 uint16_t snmp_index(char *name) {
348 if (!*name) return 0;
350 for (i = 0; (int) i < nsnmp_rules; i++) {
351 if (strncmp(snmp_rules[i].basename, name, snmp_rules[i].len)) continue;
352 return atoi(&name[snmp_rules[i].len]) + snmp_rules[i].base;
355 if ((i = if_nametoindex(name))) return i;
/* Field-by-field copy of a Flow's accounting state (used when moving
   a packet record from the static pending queue into the hash table).
   NOTE(review): the sip/dip/sp/dp/iif/oif copies fall on lines not
   visible in this extract. */
360 inline void copy_flow(struct Flow *src, struct Flow *dst)
367 dst->proto = src->proto;
368 dst->tcp_flags = src->tcp_flags;
372 dst->pkts = src->pkts;
373 dst->size = src->size;
374 dst->sizeF = src->sizeF;
375 dst->sizeP = src->sizeP;
376 dst->ctime = src->ctime;
377 dst->mtime = src->mtime;
378 dst->flags = src->flags;
381 void update_cur_epoch_file(int n) {
384 len=snprintf(snum,6,"%d",n);
385 fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
387 my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
390 write(fd, snum, len);
/* Return the fd of the current epoch's log file, rotating to a new
   file "<fname>.<epoch>" when the epoch expires or free disk space
   drops below the -D threshold (in which case old epochs are reused
   round-robin, or we bail out if even that cannot help). */
394 unsigned get_log_fd(char *fname, unsigned cur_fd) {
397 /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
398  * doesn't solve the problem */
400 struct statfs statfs;
403 cur_uptime = getuptime_minutes(&now);
/* Only complain for real descriptors: START_VALUE (-5) marks a
   not-yet-opened peer file. */
405 if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
406 my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
409 if (min_free && statfs.f_bfree < min_free)
411 case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
412 my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
415 my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
420 /* Epoch length in minutes */
/* NOTE(review): cur_fd is declared `unsigned`, so `cur_fd<0` is
   always false, and cur_epoch is unsigned so `cur_epoch==-1` relies
   on wraparound — both comparisons look buggy; verify intent. */
421 if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
422 char nextname[MAX_PATH_LEN];
424 prev_uptime = cur_uptime;
425 cur_epoch = (cur_epoch + 1) % log_epochs;
427 snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
/* NOTE(review): open() uses O_CREAT without a mode argument —
   indeterminate file permissions; a mode (e.g. 0644) should be
   passed, same defect as in update_cur_epoch_file(). */
428 if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
429 my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
/* Remember the new epoch id for crash/restart recovery. */
432 update_cur_epoch_file(cur_epoch);
/* Search the collision chain starting at `where` for a flow matching
   `what` (same src/dst IP and protocol; then either same ports for
   unfragmented flows, or same IP id for fragmented ones).
   If `prev` is non-NULL it is both input (chain start: *prev) and
   output (address of the pointer to the found/last node), so callers
   can unlink the node without re-walking the chain. */
440 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
442 struct Flow **flowpp;
448 if (prev) flowpp = *prev;
451 if (where->sip.s_addr == what->sip.s_addr
452 && where->dip.s_addr == what->dip.s_addr
453 && where->proto == what->proto) {
/* Flags sum masked by FLOW_FRAGMASK distinguishes both-unfragmented
   from both-fragmented pairs; mixed pairs fall through to the next
   chain entry. */
454 switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
456 /* Both unfragmented */
457 if ((what->sp == where->sp)
458 && (what->dp == where->dp)) goto done;
461 /* Both fragmented */
462 if (where->id == what->id) goto done;
466 flowpp = &where->next;
470 if (prev) *prev = flowpp;
/* Insert-or-merge a flow into the hash table bucket flows[h] under
   that bucket's mutex.
   flag == COPY_INTO: `flow` lives in the static pending queue — a new
   element is allocated and the data copied.
   flag == MOVE_INTO: `flow` is a heap element that is linked in
   directly (and freed if merged into an existing entry).
   Handles IP fragment accounting: once all fragments of a datagram
   have been seen, the reassembled flow is unlinked and re-inserted
   (recursively, MOVE_INTO) under its now-complete transport tuple. */
474 int put_into(struct Flow *flow, int flag
475 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
482 struct Flow *flown, **flowpp;
483 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
488 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
489 sprintf(buf, " %x H:%04x", (unsigned) flow, h);
492 pthread_mutex_lock(&flows_mutex[h]);
494 if (!(flown = find(flows[h], flow, &flowpp))) {
495 /* No suitable flow found - add */
496 if (flag == COPY_INTO) {
497 if ((flown = mem_alloc())) {
498 copy_flow(flow, flown);
501 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
502 my_log(LOG_ERR, "%s %s. %s",
503 "mem_alloc():", strerror(errno), "packet lost");
/* MOVE_INTO path: link the caller's element at the bucket head. */
508 flow->next = flows[h];
510 #if ((DEBUG) & DEBUG_I)
512 if (flow->flags & FLOW_FRAG) flows_fragmented++;
514 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
516 sprintf(buf, " => %x, flags: %x", (unsigned) flown, flown->flags);
521 /* Found suitable flow - update */
522 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
523 sprintf(buf, " +> %x", (unsigned) flown);
/* Keep the widest [ctime, mtime] window across merged packets. */
526 if (cmpmtime(&flow->mtime, &flown->mtime) > 0)
527 flown->mtime = flow->mtime;
528 if (cmpmtime(&flow->ctime, &flown->ctime) < 0)
529 flown->ctime = flow->ctime;
530 flown->tcp_flags |= flow->tcp_flags;
531 flown->size += flow->size;
532 flown->pkts += flow->pkts;
533 if (flow->flags & FLOW_FRAG) {
534 /* Fragmented flow requires some additional work */
/* First fragment carries the transport-layer ports (FLOW_TL). */
535 if (flow->flags & FLOW_TL) {
538 Several packets with FLOW_TL (attack)
540 flown->sp = flow->sp;
541 flown->dp = flow->dp;
543 if (flow->flags & FLOW_LASTFRAG) {
546 Several packets with FLOW_LASTFRAG (attack)
/* Last fragment fixes the total datagram payload size. */
548 flown->sizeP = flow->sizeP;
550 flown->flags |= flow->flags;
551 flown->sizeF += flow->sizeF;
552 if ((flown->flags & FLOW_LASTFRAG)
553 && (flown->sizeF >= flown->sizeP)) {
554 /* All fragments received - flow reassembled */
555 *flowpp = flown->next;
556 pthread_mutex_unlock(&flows_mutex[h]);
557 #if ((DEBUG) & DEBUG_I)
562 flown->flags &= ~FLOW_FRAG;
563 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
/* Re-insert under the complete (unfragmented) tuple. */
566 ret = put_into(flown, MOVE_INTO
567 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
573 if (flag == MOVE_INTO) mem_free(flow);
575 pthread_mutex_unlock(&flows_mutex[h]);
/* Serialize either a NetFlow packet header (flow == NULL, using the
   global emit_* state) or one flow record into the buffer at `p`,
   driven by the template array `format[0..fields-1]`. Returns the
   advanced write pointer. Multi-byte fields are written big-endian
   via htons/htonl. */
579 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
583 for (i = 0; i < fields; i++) {
584 #if ((DEBUG) & DEBUG_F)
585 my_log(LOG_DEBUG, "F: field %04d at %x", format[i], (unsigned) p);
588 case NETFLOW_IPV4_SRC_ADDR:
589 ((struct in_addr *) p)->s_addr = flow->sip.s_addr;
590 p += NETFLOW_IPV4_SRC_ADDR_SIZE;
593 case NETFLOW_IPV4_DST_ADDR:
594 ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
/* NOTE(review): hard-coded test-flow IP logging — debug leftover;
   consider removing or gating behind DEBUG. */
595 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
596 my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
598 p += NETFLOW_IPV4_DST_ADDR_SIZE;
601 case NETFLOW_INPUT_SNMP:
602 *((uint16_t *) p) = htons(flow->iif);
603 p += NETFLOW_INPUT_SNMP_SIZE;
606 case NETFLOW_OUTPUT_SNMP:
607 *((uint16_t *) p) = htons(flow->oif);
608 p += NETFLOW_OUTPUT_SNMP_SIZE;
611 case NETFLOW_PKTS_32:
612 *((uint32_t *) p) = htonl(flow->pkts);
613 p += NETFLOW_PKTS_32_SIZE;
616 case NETFLOW_BYTES_32:
617 *((uint32_t *) p) = htonl(flow->size);
618 p += NETFLOW_BYTES_32_SIZE;
621 case NETFLOW_FIRST_SWITCHED:
622 *((uint32_t *) p) = htonl(getuptime(&flow->ctime));
623 p += NETFLOW_FIRST_SWITCHED_SIZE;
626 case NETFLOW_LAST_SWITCHED:
627 *((uint32_t *) p) = htonl(getuptime(&flow->mtime));
628 p += NETFLOW_LAST_SWITCHED_SIZE;
/* Ports are captured in network byte order already — no htons. */
631 case NETFLOW_L4_SRC_PORT:
632 *((uint16_t *) p) = flow->sp;
633 p += NETFLOW_L4_SRC_PORT_SIZE;
636 case NETFLOW_L4_DST_PORT:
637 *((uint16_t *) p) = flow->dp;
638 p += NETFLOW_L4_DST_PORT_SIZE;
642 *((uint8_t *) p) = flow->proto;
643 p += NETFLOW_PROT_SIZE;
646 case NETFLOW_SRC_TOS:
647 *((uint8_t *) p) = flow->tos;
648 p += NETFLOW_SRC_TOS_SIZE;
651 case NETFLOW_TCP_FLAGS:
652 *((uint8_t *) p) = flow->tcp_flags;
653 p += NETFLOW_TCP_FLAGS_SIZE;
/* Header-only fields below (flow == NULL on this path). */
656 case NETFLOW_VERSION:
657 *((uint16_t *) p) = htons(netflow->Version);
658 p += NETFLOW_VERSION_SIZE;
662 *((uint16_t *) p) = htons(emit_count);
663 p += NETFLOW_COUNT_SIZE;
667 *((uint32_t *) p) = htonl(getuptime(&emit_time));
668 p += NETFLOW_UPTIME_SIZE;
671 case NETFLOW_UNIX_SECS:
672 *((uint32_t *) p) = htonl(emit_time.sec);
673 p += NETFLOW_UNIX_SECS_SIZE;
676 case NETFLOW_UNIX_NSECS:
677 *((uint32_t *) p) = htonl(emit_time.usec * 1000);
678 p += NETFLOW_UNIX_NSECS_SIZE;
681 case NETFLOW_FLOW_SEQUENCE:
/* NOTE(review): sequence deliberately zeroed (htonl(emit_sequence)
   commented out) — collectors will see seq 0 on every PDU. */
682 //*((uint32_t *) p) = htonl(emit_sequence);
683 *((uint32_t *) p) = 0;
684 p += NETFLOW_FLOW_SEQUENCE_SIZE;
688 /* Unsupported (uint8_t) */
689 case NETFLOW_ENGINE_TYPE:
690 case NETFLOW_ENGINE_ID:
691 case NETFLOW_FLAGS7_1:
692 case NETFLOW_SRC_MASK:
693 case NETFLOW_DST_MASK:
694 *((uint8_t *) p) = 0;
695 p += NETFLOW_PAD8_SIZE;
/* XID support (Sapan): the xid travels in the flow's tos field. */
698 *((uint16_t *) p) = flow->tos;
699 p += NETFLOW_XID_SIZE;
702 /* Unsupported (uint16_t) */
705 case NETFLOW_FLAGS7_2:
706 *((uint16_t *) p) = 0;
707 p += NETFLOW_PAD16_SIZE;
711 /* Unsupported (uint32_t) */
712 case NETFLOW_IPV4_NEXT_HOP:
713 case NETFLOW_ROUTER_SC:
714 *((uint32_t *) p) = 0;
715 p += NETFLOW_PAD32_SIZE;
719 my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
720 format, i, format[i]);
724 #if ((DEBUG) & DEBUG_F)
725 my_log(LOG_DEBUG, "F: return %x", (unsigned) p);
732 Workaround for clone()-based threads
733 Try to change EUID independently of main thread
737 setregid(pw->pw_gid, pw->pw_gid);
738 setreuid(pw->pw_uid, pw->pw_uid);
747 struct timespec timeout;
748 int i, ret, sent = 0, size, delay, peer_rot_cur, peer_rot_work = 0;
750 p = (void *) &emit_packet + netflow->HeaderSize;
756 pthread_mutex_lock(&emit_mutex);
757 while (!flows_emit) {
758 gettimeofday(&now, 0);
759 timeout.tv_sec = now.tv_sec + emit_timeout;
760 /* Do not wait until emit_packet will filled - it may be too long */
761 if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
762 pthread_mutex_unlock(&emit_mutex);
767 flows_emit = flows_emit->next;
768 #if ((DEBUG) & DEBUG_I)
771 pthread_mutex_unlock(&emit_mutex);
775 gettime(&start_time);
776 start_time.sec -= start_time_offset;
779 p = fill(netflow->FlowFields, netflow->FlowFormat, flow, p);
783 printf("Emit count = %d\n", emit_count);
786 if (emit_count == netflow->MaxFlows) {
789 p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
790 size = netflow->HeaderSize + emit_count * netflow->FlowSize;
791 /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
792 if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
794 for (i = 0; i < npeers; i++) {
795 if (peers[i].type == PEER_FILE) {
796 if (netflow->SeqOffset)
797 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
798 peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
799 ret = write(peers[i].write_fd, emit_packet, size);
802 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
803 my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
804 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
808 #if ((DEBUG) & DEBUG_E)
810 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
811 emit_count, i + 1, peers[i].seq);
814 peers[i].seq += emit_count;
817 if (emit_rate_bytes) {
819 delay = sent / emit_rate_bytes;
821 sent %= emit_rate_bytes;
823 timeout.tv_nsec = emit_rate_delay * delay;
824 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
829 if (peers[i].type == PEER_MIRROR) goto sendreal;
831 if (peers[i].type == PEER_ROTATE)
832 if (peer_rot_cur++ == peer_rot_work) {
834 if (netflow->SeqOffset)
835 *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
836 ret = send(peers[i].write_fd, emit_packet, size, 0);
838 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
839 my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
840 i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
843 #if ((DEBUG) & DEBUG_E)
845 my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
846 emit_count, i + 1, peers[i].seq);
849 peers[i].seq += emit_count;
852 if (emit_rate_bytes) {
854 delay = sent / emit_rate_bytes;
856 sent %= emit_rate_bytes;
858 timeout.tv_nsec = emit_rate_delay * delay;
859 while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
864 if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
865 emit_sequence += emit_count;
867 #if ((DEBUG) & DEBUG_I)
/* Worker thread: drains the static circular pending queue filled by
   the capture thread. Waits (with a timeout) on unpending_cond until
   pending_tail is marked FLOW_PENDING, copies it into the hash table
   via put_into(COPY_INTO), then clears the slot and advances the
   tail. Runs under unpending_mutex. */
874 void *unpending_thread()
877 struct timespec timeout;
878 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
885 pthread_mutex_lock(&unpending_mutex);
/* Sleep until the capture thread flags the next queue slot. */
888 while (!(pending_tail->flags & FLOW_PENDING)) {
889 gettimeofday(&now, 0);
890 timeout.tv_sec = now.tv_sec + unpending_timeout;
891 pthread_cond_timedwait(&unpending_cond, &unpending_mutex, &timeout);
894 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
897 if (put_into(pending_tail, COPY_INTO
898 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
902 #if ((DEBUG) & DEBUG_I)
903 pkts_lost_unpending++;
907 #if ((DEBUG) & DEBUG_U)
908 my_log(LOG_DEBUG, "%s%s", "U:", logbuf);
/* Release the slot back to the capture thread and advance. */
911 pending_tail->flags = 0;
912 pending_tail = pending_tail->next;
913 #if ((DEBUG) & DEBUG_I)
921 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
925 struct Flow *flow, **flowpp;
927 struct timespec timeout;
932 pthread_mutex_lock(&scan_mutex);
936 timeout.tv_sec = now.sec + scan_interval;
937 pthread_cond_timedwait(&scan_cond, &scan_mutex, &timeout);
940 #if ((DEBUG) & DEBUG_S)
941 my_log(LOG_DEBUG, "S: %d", now.sec);
943 for (i = 0; i < 1 << HASH_BITS ; i++) {
944 pthread_mutex_lock(&flows_mutex[i]);
948 if (flow->flags & FLOW_FRAG) {
949 /* Process fragmented flow */
950 if ((now.sec - flow->mtime.sec) > frag_lifetime) {
951 /* Fragmented flow expired - put it into special chain */
952 #if ((DEBUG) & DEBUG_I)
956 *flowpp = flow->next;
958 flow->flags &= ~FLOW_FRAG;
959 flow->next = scan_frag_dreg;
960 scan_frag_dreg = flow;
965 /* Flow is not fragmented */
966 if ((now.sec - flow->mtime.sec) > inactive_lifetime
967 || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
969 #if ((DEBUG) & DEBUG_S)
970 my_log(LOG_DEBUG, "S: E %x", flow);
972 #if ((DEBUG) & DEBUG_I)
975 *flowpp = flow->next;
976 pthread_mutex_lock(&emit_mutex);
977 flow->next = flows_emit;
979 #if ((DEBUG) & DEBUG_I)
982 pthread_mutex_unlock(&emit_mutex);
987 flowpp = &flow->next;
990 pthread_mutex_unlock(&flows_mutex[i]);
992 if (flows_emit) pthread_cond_signal(&emit_cond);
994 while (scan_frag_dreg) {
995 flow = scan_frag_dreg;
996 scan_frag_dreg = flow->next;
997 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1000 put_into(flow, MOVE_INTO
1001 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
1005 #if ((DEBUG) & DEBUG_S)
1006 my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
1014 struct ulog_packet_msg *ulog_msg;
1018 int len, off_frag, psize;
1019 #if ((DEBUG) & DEBUG_C)
1027 len = ipulog_read(ulog_handle, cap_buf, CAPTURE_SIZE, 1);
1029 my_log(LOG_ERR, "ipulog_read(): %s", ipulog_strerror(ipulog_errno));
1032 while ((ulog_msg = ipulog_get_packet(ulog_handle, cap_buf, len))) {
1034 #if ((DEBUG) & DEBUG_C)
1035 sprintf(logbuf, "C: %d", ulog_msg->data_len);
1038 nl = (void *) &ulog_msg->payload;
1039 psize = ulog_msg->data_len;
1042 if (psize < (signed) sizeof(struct ip) || nl->ip_v != 4) {
1043 #if ((DEBUG) & DEBUG_C)
1044 strcat(logbuf, " U");
1045 my_log(LOG_DEBUG, "%s", logbuf);
1047 #if ((DEBUG) & DEBUG_I)
1053 if (pending_head->flags) {
1054 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
1056 # if ((DEBUG) & DEBUG_C)
1061 "pending queue full:", "packet lost");
1063 #if ((DEBUG) & DEBUG_I)
1064 pkts_lost_capture++;
1069 #if ((DEBUG) & DEBUG_I)
1073 flow = pending_head;
1075 /* ?FIXME? Add sanity check for ip_len? */
1076 flow->size = ntohs(nl->ip_len);
1077 #if ((DEBUG) & DEBUG_I)
1078 size_total += flow->size;
1081 flow->sip = nl->ip_src;
1082 flow->dip = nl->ip_dst;
1083 if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
1084 my_log(LOG_INFO, "Received test flow to corewars.org");
1086 flow->iif = snmp_index(ulog_msg->indev_name);
1087 flow->oif = snmp_index(ulog_msg->outdev_name);
1088 flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
1089 flow->proto = nl->ip_p;
1091 flow->tcp_flags = 0;
1095 /* Packets captured from the OUTPUT table don't contain a valid timestamp */
1096 if (ulog_msg->timestamp_sec) {
1097 flow->ctime.sec = ulog_msg->timestamp_sec;
1098 flow->ctime.usec = ulog_msg->timestamp_usec;
1099 } else gettime(&flow->ctime);
1100 flow->mtime = flow->ctime;
1102 off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
1105 Offset (from network layer) to transport layer header/IP data
1106 IOW IP header size ;-)
1109 Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
1111 off_tl = nl->ip_hl << 2;
1112 tl = (void *) nl + off_tl;
1114 /* THIS packet data size: data_size = total_size - ip_header_size*4 */
1115 flow->sizeF = ntohs(nl->ip_len) - off_tl;
1117 if ((signed) flow->sizeF < 0) flow->sizeF = 0;
1118 if (psize > (signed) flow->sizeF) psize = flow->sizeF;
1120 if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) {
1121 /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */
1122 #if ((DEBUG) & DEBUG_C)
1123 strcat(logbuf, " F");
1125 #if ((DEBUG) & DEBUG_I)
1126 pkts_total_fragmented++;
1128 flow->flags |= FLOW_FRAG;
1129 flow->id = nl->ip_id;
1131 if (!(ntohs(nl->ip_off) & IP_MF)) {
1132 /* Packet with IP_MF == 0 (the last fragment) tells us the whole datagram size */
1133 flow->flags |= FLOW_LASTFRAG;
1134 /* size = frag_offset*8 + data_size */
1135 flow->sizeP = off_frag + flow->sizeF;
1139 #if ((DEBUG) & DEBUG_C)
1140 sprintf(buf, " %s@%u>", inet_ntoa(flow->sip), flow->iif);
1141 strcat(logbuf, buf);
1142 sprintf(buf, "%s@%u P:%x", inet_ntoa(flow->dip), flow->oif, flow->proto);
1143 strcat(logbuf, buf);
1147 Fortunately most interesting transport layer information fit
1148 into first 8 bytes of IP data field (minimal nonzero size).
1149 Thus we don't need actual packet reassembling to build whole
1150 transport layer data. We only check the fragment offset for
1151 zero value to find packet with this information.
1153 if (!off_frag && psize >= 8) {
1154 switch (flow->proto) {
1157 flow->sp = ((struct udphdr *)tl)->uh_sport;
1158 flow->dp = ((struct udphdr *)tl)->uh_dport;
1163 flow->sp = htons(((struct icmp *)tl)->icmp_type);
1164 flow->dp = htons(((struct icmp *)tl)->icmp_code);
1167 #ifdef ICMP_TRICK_CISCO
1169 flow->dp = *((int32_t *) tl);
1174 /* Unknown transport layer */
1175 #if ((DEBUG) & DEBUG_C)
1176 strcat(logbuf, " U");
1183 #if ((DEBUG) & DEBUG_C)
1184 sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
1185 strcat(logbuf, buf);
1187 flow->flags |= FLOW_TL;
1191 /* Check for tcp flags presence (including CWR and ECE). */
1192 if (flow->proto == IPPROTO_TCP
1194 && psize >= 16 - off_frag) {
1195 flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
1196 #if ((DEBUG) & DEBUG_C)
1197 sprintf(buf, " TCP:%x", flow->tcp_flags);
1198 strcat(logbuf, buf);
1202 #if ((DEBUG) & DEBUG_C)
1203 sprintf(buf, " => %x", (unsigned) flow);
1204 strcat(logbuf, buf);
1205 my_log(LOG_DEBUG, "%s", logbuf);
1208 #if ((DEBUG) & DEBUG_I)
1210 pending_queue_trace_candidate = pkts_pending - pkts_pending_done;
1211 if (pending_queue_trace < pending_queue_trace_candidate)
1212 pending_queue_trace = pending_queue_trace_candidate;
1215 /* Flow complete - inform unpending_thread() about it */
1216 pending_head->flags |= FLOW_PENDING;
1217 pending_head = pending_head->next;
1219 pthread_cond_signal(&unpending_cond);
1225 int main(int argc, char **argv)
1228 char *dhost, *dport, *lhost, *type = 0, *log_suffix = 0, *rule;
1229 int c, i, write_fd, memory_limit = 0;
1230 struct addrinfo hints, *res;
1231 struct sockaddr_in saddr;
1232 pthread_attr_t tattr;
1233 struct sigaction sigact;
1234 static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &cap_thread};
1235 struct timeval timeout;
1237 sched_min = sched_get_priority_min(SCHED);
1238 sched_max = sched_get_priority_max(SCHED);
1240 memset(&saddr, 0 , sizeof(saddr));
1241 memset(&hints, 0 , sizeof(hints));
1242 hints.ai_flags = AI_PASSIVE;
1243 hints.ai_family = AF_INET;
1244 hints.ai_socktype = SOCK_DGRAM;
1246 /* Process command line options */
1249 while ((c = my_getopt(argc, argv, parms)) != -1) {
1259 if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
1260 if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
1261 if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
1262 if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
1263 if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
1264 if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
1265 if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
1266 if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
1267 if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
1268 if (parms[nflag].count) {
1269 switch (atoi(parms[nflag].arg)) {
1271 netflow = &NetFlow1;
1278 netflow = &NetFlow7;
1282 fprintf(stderr, "Illegal %s\n", "NetFlow version");
1286 if (parms[vflag].count) verbosity = atoi(parms[vflag].arg);
1287 if (parms[lflag].count) {
1288 if ((log_suffix = strchr(parms[lflag].arg, ':'))) {
1291 sprintf(errpbuf, "[%s]", log_suffix);
1292 strcat(ident, errpbuf);
1295 if (*parms[lflag].arg) log_dest = atoi(parms[lflag].arg);
1296 if (log_suffix) *--log_suffix = ':';
1298 if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
1300 fprintf(stderr, "malloc(): %s\n", strerror(errno));
1303 sprintf(pidfilepath, "%s/%s.pid", PID_DIR, ident);
1304 if (parms[qflag].count) {
1305 pending_queue_length = atoi(parms[qflag].arg);
1306 if (pending_queue_length < 1) {
1307 fprintf(stderr, "Illegal %s\n", "pending queue length");
1311 if (parms[rflag].count) {
1312 schedp.sched_priority = atoi(parms[rflag].arg);
1313 if (schedp.sched_priority
1314 && (schedp.sched_priority < sched_min
1315 || schedp.sched_priority > sched_max)) {
1316 fprintf(stderr, "Illegal %s\n", "realtime priority");
1320 if (parms[Bflag].count) {
1321 sockbufsize = atoi(parms[Bflag].arg) << 10;
1323 if (parms[bflag].count) {
1324 bulk_quantity = atoi(parms[bflag].arg);
1325 if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) {
1326 fprintf(stderr, "Illegal %s\n", "bulk size");
1330 if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10;
1331 if (parms[Xflag].count) {
1332 for(i = 0; parms[Xflag].arg[i]; i++)
1333 if (parms[Xflag].arg[i] == ':') nsnmp_rules++;
1334 if (!(snmp_rules = malloc(nsnmp_rules * sizeof(struct snmp_rule))))
1336 rule = strtok(parms[Xflag].arg, ":");
1337 for (i = 0; rule; i++) {
1338 snmp_rules[i].len = strlen(rule);
1339 if (snmp_rules[i].len > IFNAMSIZ) {
1340 fprintf(stderr, "Illegal %s\n", "interface basename");
1343 strncpy(snmp_rules[i].basename, rule, snmp_rules[i].len);
1344 if (!*(rule - 1)) *(rule - 1) = ',';
1345 rule = strtok(NULL, ",");
1347 fprintf(stderr, "Illegal %s\n", "SNMP rule");
1350 snmp_rules[i].base = atoi(rule);
1352 rule = strtok(NULL, ":");
1356 if (parms[tflag].count)
1357 sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
1358 if (parms[aflag].count) {
1359 if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
1361 fprintf(stderr, "Illegal %s\n", "source address");
1364 saddr = *((struct sockaddr_in *) res->ai_addr);
1368 if (parms[uflag].count)
1369 if ((pw = getpwnam(parms[uflag].arg)) == NULL) {
1370 fprintf(stderr, "getpwnam(%s): %s\n", parms[uflag].arg, errno ? strerror(errno) : "Unknown user");
1375 /* Process collectors parameters. Brrrr... :-[ */
1377 npeers = argc - optind;
1379 /* Send to remote Netflow collector */
1380 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1381 for (i = optind, npeers = 0; i < argc; i++, npeers++) {
1383 if (!(dport = strchr(dhost, ':'))) goto bad_collector;
1385 if ((write_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
1386 fprintf(stderr, "socket(): %s\n", strerror(errno));
1389 peers[npeers].write_fd = write_fd;
1390 peers[npeers].type = PEER_MIRROR;
1391 peers[npeers].laddr = saddr;
1392 peers[npeers].seq = 0;
1393 if ((lhost = strchr(dport, '/'))) {
1395 if ((type = strchr(lhost, '/'))) {
1403 peers[npeers].type = PEER_ROTATE;
1412 if (getaddrinfo(lhost, 0, &hints, &res)) goto bad_lhost;
1413 peers[npeers].laddr = *((struct sockaddr_in *) res->ai_addr);
1417 if (bind(write_fd, (struct sockaddr *) &peers[npeers].laddr,
1418 sizeof(struct sockaddr_in))) {
1419 fprintf(stderr, "bind(): %s\n", strerror(errno));
1422 if (getaddrinfo(dhost, dport, &hints, &res)) {
1424 fprintf(stderr, "Error in collector #%d parameters\n", npeers + 1);
1427 peers[npeers].addr = *((struct sockaddr_in *) res->ai_addr);
1429 if (connect(write_fd, (struct sockaddr *) &peers[npeers].addr,
1430 sizeof(struct sockaddr_in))) {
1431 fprintf(stderr, "connect(): %s\n", strerror(errno));
1435 /* Restore command line */
1436 if (type) *--type = '/';
1437 if (lhost) *--lhost = '/';
1441 else if (parms[fflag].count) {
1443 if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
1444 if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
1445 strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
1447 peers[npeers].write_fd = START_VALUE;
1448 peers[npeers].type = PEER_FILE;
1449 peers[npeers].seq = 0;
1456 if (!(cap_buf = malloc(CAPTURE_SIZE))) goto err_malloc;
1457 ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
1459 fprintf(stderr, "libipulog initialization error: %s",
1460 ipulog_strerror(ipulog_errno));
1464 if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
1465 &sockbufsize, sizeof(sockbufsize)) < 0)
1466 fprintf(stderr, "setsockopt(): %s", strerror(errno));
1468 /* Daemonize (if log destination stdout-free) */
1470 my_log_open(ident, verbosity, log_dest);
1471 if (!(log_dest & 2)) {
1472 /* Crash-proofing - Sapan*/
1476 fprintf(stderr, "fork(): %s", strerror(errno));
1481 freopen("/dev/null", "r", stdin);
1482 freopen("/dev/null", "w", stdout);
1483 freopen("/dev/null", "w", stderr);
1487 while (wait3(NULL,0,NULL) < 1);
1491 setvbuf(stdout, (char *)0, _IONBF, 0);
1492 setvbuf(stderr, (char *)0, _IONBF, 0);
1496 sprintf(errpbuf, "[%ld]", (long) pid);
1497 strcat(ident, errpbuf);
1499 /* Initialization */
1501 hash_init(); /* Actually for crc16 only */
1502 mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
1503 for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
1506 /* Hope 12 days is enough :-/ */
1507 start_time_offset = 1 << 20;
1509 /* start_time_offset = active_lifetime + inactive_lifetime + scan_interval; */
1511 gettime(&start_time);
1514 Build static pending queue as circular buffer.
1516 if (!(pending_head = mem_alloc())) goto err_mem_alloc;
1517 pending_tail = pending_head;
1518 for (i = pending_queue_length - 1; i--;) {
1519 if (!(pending_tail->next = mem_alloc())) {
1521 my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
1524 pending_tail = pending_tail->next;
1526 pending_tail->next = pending_head;
1527 pending_tail = pending_head;
1529 sigemptyset(&sig_mask);
1530 sigact.sa_handler = &sighandler;
1531 sigact.sa_mask = sig_mask;
1532 sigact.sa_flags = 0;
1533 sigaddset(&sig_mask, SIGTERM);
1534 sigaction(SIGTERM, &sigact, 0);
1535 #if ((DEBUG) & DEBUG_I)
1536 sigaddset(&sig_mask, SIGUSR1);
1537 sigaction(SIGUSR1, &sigact, 0);
1539 if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) {
1540 my_log(LOG_CRIT, "pthread_sigmask(): %s", strerror(errno));
1544 my_log(LOG_INFO, "Starting %s...", VERSION);
1546 if (parms[cflag].count) {
1547 if (chdir(parms[cflag].arg) || chroot(".")) {
1548 my_log(LOG_CRIT, "could not chroot to %s: %s", parms[cflag].arg, strerror(errno));
1553 schedp.sched_priority = schedp.sched_priority - THREADS + 2;
1554 pthread_attr_init(&tattr);
1555 for (i = 0; i < THREADS - 1; i++) {
1556 if (schedp.sched_priority > 0) {
1557 if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
1558 (pthread_attr_setschedparam(&tattr, &schedp))) {
1559 my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
1563 if (pthread_create(&thid, &tattr, threads[i], 0)) {
1564 my_log(LOG_CRIT, "pthread_create(): %s", strerror(errno));
1567 pthread_detach(thid);
1568 schedp.sched_priority++;
1572 if (setgroups(0, NULL)) {
1573 my_log(LOG_CRIT, "setgroups(): %s", strerror(errno));
1576 if (setregid(pw->pw_gid, pw->pw_gid)) {
1577 my_log(LOG_CRIT, "setregid(%u): %s", pw->pw_gid, strerror(errno));
1580 if (setreuid(pw->pw_uid, pw->pw_uid)) {
1581 my_log(LOG_CRIT, "setreuid(%u): %s", pw->pw_uid, strerror(errno));
1586 if (!(pidfile = fopen(pidfilepath, "w")))
1587 my_log(LOG_ERR, "Can't create pid file. fopen(): %s", strerror(errno));
1589 fprintf(pidfile, "%ld\n", (long) pid);
1593 my_log(LOG_INFO, "pid: %d", pid);
1594 my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
1595 "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
1596 ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
1597 netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
1598 memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
1599 emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
1600 parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
1601 for (i = 0; i < nsnmp_rules; i++) {
1602 my_log(LOG_INFO, "SNMP rule #%d %s:%d",
1603 i + 1, snmp_rules[i].basename, snmp_rules[i].base);
1605 for (i = 0; i < npeers; i++) {
1606 switch (peers[i].type) {
1614 snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
1615 my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
1616 inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
1619 pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
1621 timeout.tv_usec = 0;
1623 || (total_elements - free_elements - pending_queue_length)
1625 || pending_tail->flags) {
1628 timeout.tv_sec = scan_interval;
1629 select(0, 0, 0, 0, &timeout);
1632 if (sigs & SIGTERM_MASK && !killed) {
1633 sigs &= ~SIGTERM_MASK;
1634 my_log(LOG_INFO, "SIGTERM received. Emitting flows cache...");
1637 active_lifetime = -1;
1638 inactive_lifetime = -1;
1640 unpending_timeout = 1;
1642 pthread_cond_signal(&scan_cond);
1643 pthread_cond_signal(&unpending_cond);
1646 #if ((DEBUG) & DEBUG_I)
1647 if (sigs & SIGUSR1_MASK) {
1648 sigs &= ~SIGUSR1_MASK;
1653 remove(pidfilepath);
1654 #if ((DEBUG) & DEBUG_I)
1657 my_log(LOG_INFO, "Done.");